VastbaseG100

基于openGauss内核开发的企业级关系型数据库。

Menu

DBMS_PIPE

  • DBMS_PIPE 用于在同一个实例中，通过管道实现会话内或会话之间的消息传递。
    函数名 参数类型 结果类型 描述
    create_pipe varchar,integer,bool null 使用指定的名称显式地创建一个公有或私有的管道
    next_item_type null integer 返回消息缓存中下一个数据成员的数据类型
    pack_message number/text/date/timestamptz/bytea/record null 用于在本地缓存中构建消息
    purge varchar null 用于清除指定的管道中的消息
    receive_message varchar,integer integer 用于从指定的管道中获取一条消息
    remove_pipe varchar integer 用于删除一个私有或公有的管道
    reset_buffer null null 用于清除本地缓存中的内容
    send_message varchar,integer,integer integer 用于将消息发送到指定的管道中
    unique_session_name null varchar 返回一个唯一的会话名称
    unpack_message null number/text/date/timestamptz/bytea/record 用于从本地缓存中获取消息
    list_pipes null record 用于列出所有的管道

视图名 描述
db_pipes 用于查询数据库中所有的管道

--创建公有管道
vastbase=# CREATE OR REPLACE FUNCTION test_create_pipe() RETURNS VOID AS
$$
-- Example wrapper: explicitly creates a pipe named 'message' using only the
-- name argument (the public-pipe form of create_pipe).
-- CREATE OR REPLACE keeps the example re-runnable and matches the other
-- examples on this page.
BEGIN
    PERFORM dbms_pipe.create_pipe('message');
END;
$$
LANGUAGE plpgsql;
--创建私有管道
vastbase=# CREATE OR REPLACE FUNCTION test_create_pipe() RETURNS VOID AS
$$
-- Example wrapper: creates the pipe 'message' with the three-argument
-- (varchar, integer, bool) form of create_pipe, i.e. the private-pipe form.
-- CREATE OR REPLACE is required here: a function with this exact name and
-- signature is already defined by the public-pipe example, so a plain
-- CREATE FUNCTION would fail with "function already exists".
BEGIN
    -- NOTE(review): presumably 32 is the pipe limit and true marks the pipe
    -- as private -- confirm against the create_pipe reference.
    PERFORM dbms_pipe.create_pipe('message',32,true);
END;
$$
LANGUAGE plpgsql;
-- Calls dbms_output.serveroutput(true); presumably this enables display of the
-- dbms_output.put_line output used by the examples that follow -- confirm.
vastbase=# SELECT dbms_output.serveroutput(TRUE);
--调用next_item_type
vastbase=# CREATE OR REPLACE FUNCTION test_next_item_type() RETURNS void AS
$$
-- Sender half of the next_item_type example: builds one message holding a
-- number, a text and a date item, then sends it to the 'datatypes' pipe.
DECLARE
        date_item DATE := date'2020-12-14';
        text_item TEXT := 'Character data';
        num_item NUMBER := 123;
BEGIN
        -- Pack order matters: the receiver unpacks items in this same order.
        PERFORM dbms_pipe.pack_message(num_item);
        PERFORM dbms_pipe.pack_message(text_item);
        PERFORM dbms_pipe.pack_message(date_item);

        -- Push the locally buffered message into the pipe.
        dbms_pipe.send_message('datatypes');
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_next_item_type();
vastbase=# CREATE OR REPLACE FUNCTION test_next_item_type1() RETURNS void AS
$$
-- Receiver half of the next_item_type example: fetches one message from the
-- 'datatypes' pipe and, before unpacking each item, prints the type code
-- reported by next_item_type.
DECLARE
        item_type INTEGER;
        num_item NUMBER;
        text_item TEXT;
        ts_item TIMESTAMPTZ;
BEGIN
        dbms_pipe.receive_message('datatypes');

        -- First item: the original example expects type code 9 here.
        item_type := dbms_pipe.next_item_type();
        dbms_output.put_line('next_item_type:' || item_type);
        num_item := dbms_pipe.unpack_message_number();
        dbms_output.put_line('number item:' || num_item);

        -- Second item: the original example expects type code 11 here.
        item_type := dbms_pipe.next_item_type();
        dbms_output.put_line('next_item_type:' || item_type);
        text_item := dbms_pipe.unpack_message_text();
        dbms_output.put_line('varchar item:' || text_item);

        -- Third item: the original example expects type code 13 here.
        -- NOTE(review): the sender packs a DATE while this side unpacks a
        -- timestamp; presumably DATE maps to a timestamp type in this
        -- database -- confirm.
        item_type := dbms_pipe.next_item_type();
        dbms_output.put_line('next_item_type:' || item_type);
        ts_item := dbms_pipe.unpack_message_timestamp();
        dbms_output.put_line('timestamptz item:' || ts_item);

        -- Probe once more after every packed item has been unpacked.
        item_type := dbms_pipe.next_item_type();
        dbms_output.put_line('next_item_type:' || item_type);
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_next_item_type1();
---调用pack_message
vastbase=# CREATE OR REPLACE FUNCTION test_pack_message1() RETURNS void AS
$$
-- pack_message example: buffers a number and a text item, sends them to the
-- 'datatypes' pipe and prints the integer status returned by send_message.
DECLARE
        send_status INTEGER;
        text_item TEXT := 'Character data';
        num_item NUMBER := 123;
BEGIN
        PERFORM dbms_pipe.pack_message(num_item);
        PERFORM dbms_pipe.pack_message(text_item);

        send_status := dbms_pipe.send_message('datatypes');
        dbms_output.put_line('send_message status:' || send_status);
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_pack_message1();
---调用purge
CREATE OR REPLACE FUNCTION test_create_pipe2() RETURNS void AS
$$
-- Setup for the purge example: creates the 'message' pipe and sends two
-- separate one-item messages ('test1', then 'test2') into it.
BEGIN
        PERFORM dbms_pipe.create_pipe('message');

        -- First message.
        PERFORM dbms_pipe.pack_message('test1');
        dbms_pipe.send_message('message');

        -- Second message.
        PERFORM dbms_pipe.pack_message('test2');
        dbms_pipe.send_message('message');
END;
$$
LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION test_purge() RETURNS void AS
$$
-- Reader for the purge example: receives one message from the 'message'
-- pipe, unpacks its text item and prints it.
DECLARE
        received_text TEXT;
BEGIN
        -- NOTE(review): the second argument (10) is presumably a timeout --
        -- confirm against the receive_message reference.
        dbms_pipe.receive_message('message', 10);

        received_text := dbms_pipe.unpack_message_text();
        dbms_output.put_line('Item:' || received_text);
END;
$$
LANGUAGE plpgsql;
-- Clear the messages held in the 'message' pipe.
SELECT dbms_pipe.purge('message');
---remove_pipe清除已有显式管道
-- Create an explicit pipe, list the pipes through the db_pipes view, then
-- remove the pipe again.
SELECT dbms_pipe.create_pipe('test_remove');
SELECT * FROM dbms_pipe.db_pipes;
SELECT dbms_pipe.remove_pipe('test_remove');
---reset_buffer清除本地缓存中的内容
会话1:
-- Sending side: buffer one text item and send it to the 'reset_buffer' pipe.
SELECT dbms_pipe.pack_message('u r testing reset_buffer');
SELECT dbms_pipe.send_message('reset_buffer');
会话2:
-- Receiving side: fetch the message, clear the local buffer, then try to
-- unpack. Since reset_buffer clears the local buffer contents, the unpack is
-- expected to find nothing (presumably returning NULL -- confirm).
SELECT dbms_pipe.receive_message('reset_buffer');
SELECT dbms_pipe.reset_buffer();
SELECT dbms_pipe.unpack_message_text();
---list_pipes列出所有管道
-- 'pipe1' is created explicitly and left empty; 'pipe2' is never created
-- explicitly and only has a message sent to it.
SELECT dbms_pipe.create_pipe('pipe1');
SELECT dbms_pipe.pack_message('message');
SELECT dbms_pipe.send_message('pipe2');
-- list_pipes() returns record, so the column definition list is required.
SELECT *
FROM dbms_pipe.list_pipes() AS (
    name varchar,
    item integer,
    size integer,
    "limit" integer,
    private bool,
    owner varchar
);
     name     | item | size | limit | private | owner
--------------+------+------+-------+---------+-------
 datatypes    |    1 |   72 |       | f       |
 test_receive |    1 |   64 |       | f       |
 test_remove  |    0 |    0 |       | f       |
 pipe1        |    0 |    0 |       | f       |
 pipe2        |    1 |   40 |       | f       |
(5 rows)
---unique_session_name返回唯一的会话名称
CREATE OR REPLACE FUNCTION test_unique_session_name() RETURNS void AS
$$
-- Prints the value returned by dbms_pipe.unique_session_name().
DECLARE
        session_name TEXT;
BEGIN
        session_name := dbms_pipe.unique_session_name();
        dbms_output.put_line('unique_session_name:' || session_name);
END;
$$
LANGUAGE plpgsql;
SELECT test_unique_session_name();
---db_pipes视图
-- Query the db_pipes view to list the pipes in the database.
SELECT * FROM dbms_pipe.db_pipes;