DBMS_PIPE
- 用于在同一个实例中,通过一个管道来实现会话内或会话之间的消息传递。
函数名 |
参数类型 |
结果类型 |
描述 |
create_pipe |
varchar,integer,bool |
null |
使用指定的名称显式地创建一个公有或私有的管道 |
next_item_type |
null |
integer |
返回消息缓存中下一个数据成员的数据类型 |
pack_message |
number/text/date/timestamptz/bytea/record |
null |
用于在本地缓存中构建消息 |
purge |
varchar |
null |
用于清除指定的管道中的消息 |
receive_message |
varchar,integer |
integer |
用于从指定的管道中获取一条消息 |
remove_pipe |
varchar |
integer |
用于删除一个私有或公有的管道 |
reset_buffer |
null |
null |
用于清除本地缓存中的内容 |
send_message |
varchar,integer,integer |
integer |
用于将消息发送到指定的管道中 |
unique_session_name |
null |
varchar |
返回一个唯一的会话名称 |
unpack_message |
null |
number/text/date/timestamptz/bytea/record |
用于从本地缓存中获取消息 |
list_pipes |
null |
record |
用于列出所有的管道 |
视图名 |
描述 |
db_pipes |
用于查询数据库中所有的管道 |
--创建公有管道
vastbase=# CREATE OR REPLACE FUNCTION test_create_pipe() RETURNS VOID AS
$$
BEGIN
    -- Explicitly create the pipe named 'message'. Only the name is passed;
    -- the heading of this example describes the result as a public pipe.
    -- OR REPLACE keeps the example re-runnable, matching the other
    -- function examples in this document.
    PERFORM dbms_pipe.create_pipe('message');
END;
$$
LANGUAGE plpgsql;
--创建私有管道
vastbase=# CREATE OR REPLACE FUNCTION test_create_pipe() RETURNS VOID AS
$$
BEGIN
    -- Explicitly create the pipe named 'message' as a private pipe (third
    -- argument true). The second argument (32) is presumably the pipe's
    -- size/message limit -- TODO confirm against the create_pipe reference.
    -- OR REPLACE is required here: the public-pipe example uses the same
    -- function name and signature, so a plain CREATE FUNCTION would fail
    -- with "function already exists" when both examples are run in order.
    PERFORM dbms_pipe.create_pipe('message', 32, true);
END;
$$
LANGUAGE plpgsql;
-- Switch server output on for this session (presumably needed so that the
-- dbms_output.put_line text in the examples is displayed -- confirm).
vastbase=# SELECT dbms_output.serveroutput(true);
--调用next_item_type
-- Sender side of the next_item_type example: packs a number, a text and a
-- date into the local buffer and sends them on pipe 'datatypes'.
vastbase=# CREATE OR REPLACE FUNCTION test_next_item_type() RETURNS void AS
$$
DECLARE
    num_item  NUMBER := 123;
    text_item TEXT   := 'Character data';
    date_item DATE   := DATE '2020-12-14';
BEGIN
    -- Items are buffered in this order: number, text, date.
    PERFORM dbms_pipe.pack_message(num_item);
    PERFORM dbms_pipe.pack_message(text_item);
    PERFORM dbms_pipe.pack_message(date_item);

    -- Send the buffered items; the integer status returned by
    -- send_message is not inspected here.
    dbms_pipe.send_message('datatypes');
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_next_item_type();
-- Receiver side of the next_item_type example: receives one message from
-- pipe 'datatypes' and, before unpacking each item, prints the type code
-- reported by next_item_type.
vastbase=# CREATE OR REPLACE FUNCTION test_next_item_type1() RETURNS void AS
$$
DECLARE
    item_type INTEGER;
    num_item  NUMBER;
    text_item TEXT;
    ts_item   TIMESTAMPTZ;
BEGIN
    -- Fetch a message from the pipe into the local buffer; the integer
    -- status returned by receive_message is not inspected here.
    dbms_pipe.receive_message('datatypes');

    -- Returns 9
    item_type := dbms_pipe.next_item_type();
    dbms_output.put_line('next_item_type:' || item_type);
    num_item := dbms_pipe.unpack_message_number();
    dbms_output.put_line('number item:' || num_item);

    -- Returns 11
    item_type := dbms_pipe.next_item_type();
    dbms_output.put_line('next_item_type:' || item_type);
    text_item := dbms_pipe.unpack_message_text();
    dbms_output.put_line('varchar item:' || text_item);

    -- Returns 13
    item_type := dbms_pipe.next_item_type();
    dbms_output.put_line('next_item_type:' || item_type);
    ts_item := dbms_pipe.unpack_message_timestamp();
    dbms_output.put_line('timestamptz item:' || ts_item);

    -- All three packed items have been unpacked at this point; this last
    -- call presumably reports that no item is left -- TODO confirm the code.
    item_type := dbms_pipe.next_item_type();
    dbms_output.put_line('next_item_type:' || item_type);
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_next_item_type1();
---调用pack_message
-- pack_message example: buffers a number and a text item, sends them on
-- pipe 'datatypes' and prints the status returned by send_message.
vastbase=# CREATE OR REPLACE FUNCTION test_pack_message1() RETURNS void AS
$$
DECLARE
    send_status INTEGER;
    num_item    NUMBER := 123;
    text_item   TEXT   := 'Character data';
BEGIN
    -- Build the message in the local buffer, one item per call.
    PERFORM dbms_pipe.pack_message(num_item);
    PERFORM dbms_pipe.pack_message(text_item);

    send_status := dbms_pipe.send_message('datatypes');
    dbms_output.put_line('send_message status:' || send_status);
END;
$$
LANGUAGE plpgsql;
vastbase=# SELECT test_pack_message1();
---调用purge
-- Setup for the purge example: creates pipe 'message' and sends two
-- separate messages ('test1', then 'test2') to it.
CREATE OR REPLACE FUNCTION test_create_pipe2() RETURNS void AS
$$
BEGIN
    PERFORM dbms_pipe.create_pipe('message');

    -- First message.
    PERFORM dbms_pipe.pack_message('test1');
    dbms_pipe.send_message('message');

    -- Second message.
    PERFORM dbms_pipe.pack_message('test2');
    dbms_pipe.send_message('message');
END;
$$
LANGUAGE plpgsql;
-- Reader for the purge example: receives one message from pipe 'message'
-- and prints its text item.
CREATE OR REPLACE FUNCTION test_purge() RETURNS void AS
$$
DECLARE
    received_text TEXT;
BEGIN
    -- The second argument (10) is presumably a receive timeout -- TODO confirm.
    dbms_pipe.receive_message('message', 10);
    received_text := dbms_pipe.unpack_message_text();
    dbms_output.put_line('Item:' || received_text);
END;
$$
LANGUAGE plpgsql;

-- Clear the messages held in pipe 'message'.
SELECT dbms_pipe.purge('message');
---remove_pipe清除已有显式管道
-- Create a pipe explicitly, list the pipes through the db_pipes view,
-- then remove the pipe that was just created.
SELECT dbms_pipe.create_pipe('test_remove');
SELECT *
FROM dbms_pipe.db_pipes;
SELECT dbms_pipe.remove_pipe('test_remove');
---reset_buffer清除本地缓存中的内容
会话1:
-- Sender: buffer one text item and send it on pipe 'reset_buffer'.
SELECT dbms_pipe.pack_message('u r testing reset_buffer');
SELECT dbms_pipe.send_message('reset_buffer');
会话2:
-- Receiver: fetch the message into the local buffer, clear the buffer with
-- reset_buffer, then attempt to unpack a text item from the cleared buffer.
SELECT dbms_pipe.receive_message('reset_buffer');
SELECT dbms_pipe.reset_buffer();
SELECT dbms_pipe.unpack_message_text();
---list_pipes列出所有管道
-- 'pipe1' is created explicitly; the message is sent to 'pipe2', for which
-- this example issues no create_pipe call (the sample output lists both).
SELECT dbms_pipe.create_pipe('pipe1');
SELECT dbms_pipe.pack_message('message');
SELECT dbms_pipe.send_message('pipe2');

-- list_pipes() returns record, so a column definition list is supplied.
SELECT *
FROM dbms_pipe.list_pipes() AS (
    name    varchar,
    item    integer,
    size    integer,
    "limit" integer,
    private bool,
    owner   varchar
);
name | item | size | limit | private | owner
--------------+------+------+-------+---------+-------
datatypes | 1 | 72 | | f |
test_receive | 1 | 64 | | f |
test_remove | 0 | 0 | | f |
pipe1 | 0 | 0 | | f |
pipe2 | 1 | 40 | | f |
(5 rows)
---unique_session_name返回唯一的会话名称
-- Prints the value returned by dbms_pipe.unique_session_name().
CREATE OR REPLACE FUNCTION test_unique_session_name() RETURNS void AS
$$
DECLARE
    session_name TEXT;
BEGIN
    session_name := dbms_pipe.unique_session_name();
    dbms_output.put_line('unique_session_name:' || session_name);
END;
$$
LANGUAGE plpgsql;
SELECT test_unique_session_name();
---db_pipes视图
-- Query the db_pipes view to see the pipes in the database.
SELECT *
FROM dbms_pipe.db_pipes;