DBMS_AQ
功能描述
Vastbase G100在Oracle兼容模式下支持内置包DBMS_AQ,其中包含的存储过程负责添加消息到队列、从队列中删除消息、注册或注销PL/SQL回调存储过程。
注意事项
该功能仅在数据库兼容模式为Oracle时支持(即数据库实例初始化时指定DBCOMPATIBILITY='A')。
常量
该内置包包含以下常量:
参数 | 常量 | 说明 |
---|---|---|
dequeue_options_t.dequeue_mode | DBMS_AQ.BROWSE (0) | 读取消息而不锁定,等同于SELECT。 |
DBMS_AQ.LOCKED (1) | 获取锁定之后读取信息,等同于SELECT FOR UPDATE。 | |
DBMS_AQ.REMOVE (2) | 读取之后删除消息,该参数为默认值。 | |
DBMS_AQ.REMOVE_NODATA (3) | 将信息标记为已更新或删除,不立即删除信息。此参数暂不支持。 | |
dequeue_options_t.navigation | DBMS_AQ.FIRST_MESSAGE (0) | 当出队期间有更高优先级的信息到达队列时,则可以优先处理更高优先级的信息。 |
DBMS_AQ.NEXT_MESSAGE (1) | 出队期间将根据第一个出队信息获取的快照检索信息,在第一条信息出队之后,只有处理完队列中所有剩余信息,入队的信息才会被处理。该参数为默认值。 | |
dequeue_options_t.wait | DBMS_AQ.FOREVER (-1) | 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。 |
DBMS_AQ.NO_WAIT (0) | 如果找不到与搜索词匹配的消息,则不等待。 | |
enqueue_options_t.visibility 、 dequeue_options_t.visibility | DBMS_AQ.ON_COMMIT (0) | 默认值,若回退事务则出队项目保留在队列中。 |
enqueue_options_t.delivery_mode | DBMS_AQ.PERSISTENT (0) | 此消息应存储在表中。 |
DBMS_AQ.BUFFERED(1) | 此常量已定义,但功能不支持。 | |
message_properties_t.state | DBMS_AQ.READY(0) | 指定消息已经准备好进行处理。 |
DBMS_AQ.WAITTING (1) | 指定消息正在等待处理。 | |
DBMS_AQ.PROCESSED (2) | 指定消息已处理。 | |
DBMS_AQ.EXPIRED (3) | 指定消息已过期并处于异常队列中。 | |
message_properties_t.delay | DBMS_AQ.NO_DELAY (0) | 信息可用于出队之前经过的秒数。 |
message_properties_t.expiration | DBMS_AQ.NEVER (-1) | 指定信息过期的秒数,NEVER为默认值。 |
DBMS_AQ.aq$_reg_info.namespace | DBMS_AQ.NAMESPACE_AQ (0) | 接收来自DBMS_AQ队列的通知。 |
DBMS_AQ.NAMESPACE_ANONYMOUS (1) | 此常量已定义,但功能不支持。 | |
DBMS_AQ.aq$_reg_info.tgtype | DBMS_AQ.BOTH | 回调函数在出队入队时都执行。 |
DBMS_AQ.ENQUEUE_ONLY | 回调函数仅在入队时执行。 | |
DBMS_AQ.DEQUEUE_ONLY | 回调函数仅在出队时执行。 |
类型
类型 | 描述 |
---|---|
AQ$_REG_INFO | 用于记录注册回调时需要的信息。 |
AQ$_REG_INFO_LIST | AQ$_REG_INFO的列表。 |
AQ$_DESCRIPTOR | 用户定义的回调函数中,必须存在此类型的入参。 |
ENQUEUE_OPTIONS_T | 指定可用于取消排队操作的选项。 |
MESSAGE_PROPERTIES_T | 描述AQ用于传达各个消息的状态的信息。 |
DEQUEUE_OPTIONS_T | 可用于取消排队操作的选项。 |
- 本部分介绍内置包的内置类型,各类型结构已在内置包中定义完成,无需用户手动创建。
- 各类型的结构参见类型定义。
- 类型定义中,DEFAULT表示指定默认值,其后接的内容即为指定的默认值。
AQ$_REG_INFO
类型定义
CREATE TYPE AQ$_REG_INFO AS( name VARCHAR2(128), namespace NUMERIC DEFAULT DBMS_AQ.NAMESPACE_AQ, callback VARCHAR2(4000), context RAW(16), tgtype INTEGER DEFAULT DBMS_AQ.BOTH );
参数说明
name
类型为VARCHAR2(128),表示队列的名称,格式为[schema].queue。
namespace
类型为NUMERIC,唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。
callback
类型为VARCHAR2(4000),描述对通知执行的操作。
目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:
plsql://schema.procedure
其中:
schema:指定存储过程所在的schema。
procedure:指定待通知的存储过程的名称。
context
类型为RAW(16),回调存储过程所需的用户定义的值。
tgtype
类型为INTEGER,标记执行回调的时机。
- BOTH表示出入队都执行。
- ENQUEUE_ONLY表示仅入队时执行。
- DEQUEUE_ONLY表示仅出队时执行。
AQ$_DESCRIPTOR
类型定义
TYPE AQ$_DESCRIPTOR IS RECORD ( queue_name VARCHAR2(261), consumer_name VARCHAR2(512), msg_id RAW(16), ntfnsRecdInGrp NUMBER );
参数说明
queue_name
类型为VARCHAR2(261),表示触发回调操作的队列名。
consumer_name
类型为VARCHAR2(512),目前仅支持单消费者模型,所以忽略此参数。
msg_id
类型为RAW(16),表示触发回调操作信息的msgid。
ntfnsRecdInGrp
类型为NUMBER,为Group相关参数,当前不支持。
ENQUEUE_OPTIONS_T
类型定义
CREATE TYPE DBMS_AQ.ENQUEUE_OPTIONS_T AS( visibility BINARY_INTEGER DEFAULT DBMS_AQ.ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY INTEGER DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode PLS_INTEGER NOT NULL DEFAULT DBMS_AQ.PERSISTENT);
参数说明
visibility
指定排队请求的事务行为。当前仅支持设置为ON_COMMIT。表示排队是当前事务的一部分。当事务提交时,操作完成。
delivery_mode
排队程序指定其希望在排队选项中排队的消息的传递模式。当前仅支持设置为PERSISTENT。
sequence_deviation
指定是否应该在队列中已存在的其他消息之前将正在加入队列的消息从队列中取出。当前仅支持设置为NULL。
transformation
指定在对消息进行排队之前应用转换。转换函数的返回类型必须与队列的类型匹配。当前仅支持设置为NULL。
relative_msgid
指定序列偏差操作中引用的消息的消息标识符。当前仅支持设置为NULL。
MESSAGE_PROPERTIES_T
类型定义
TYPE MESSAGE_PROPERTIES_T IS RECORD ( priority BINARY_INTEGER NOT NULL DEFAULT 1, delay BINARY_INTEGER NOT NULL DEFAULT DBMS_AQ.NO_DELAY, expiration BINARY_INTEGER NOT NULL DEFAULT DBMS_AQ.NEVER, correlation VARCHAR2(128) DEFAULT NULL, attempts BINARY_INTEGER, recipient_list text DEFAULT NULL, exception_queue VARCHAR2(61) DEFAULT NULL, enqueue_time Date, state BINARY_INTEGER, sender_id text DEFAULT NULL, original_msgid RAW(16) DEFAULT NULL, transaction_group VARCHAR2(30) DEFAULT NULL, user_property text DEFAULT NULL, delivery_mode INTEGER NOT NULL DEFAULT DBMS_AQ.PERSISTENT );
参数说明
priority
如果队列表定义包括sort_list并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。
delay
指定消息可用于出队之前将经过的秒数,或者NO_DELAY。
expiration
使用expiration参数指定消息过期的秒数。
correlation
使用correlation指定将与条目关联的消息,暂不支持此参数。
attempts
这是系统维护的值,指定消息出队的尝试次数。不允许用户赋值。
recipient_list
当前暂不支持此参数。
exception_queue
使用exception_queue参数指定异常队列的名称,如果消息过期或者信息出队尝试次数超过限制,则消息将移动到该队列。
enqueue_time
enqueue_time是记录添加到队列的时间。此值由系统提供,不允许用户赋值。
state
此参数由DBMS_AQ维护,状态可以为:
DBMS_AQ.WAITING:队列条目正在等待处理。
DBMS_AQ.READY:队列条目已准备好处理。
DBMS_AQ.PROCESSED:队列条目已处理。
DBMS_AQ.EXPIRED:队列条目已移动到异常队列。
original_msgid
为了实现兼容性而支持此参数,忽略此参数。
transaction_group
为了实现兼容性而支持此参数,忽略此参数。
delivery_mode
指定DBMS_AQ.PERSISTENT的值。不支持此参数
DEQUEUE_OPTIONS_T
类型定义
TYPE DEQUEUE_OPTIONS_T IS RECORD ( consumer_name VARCHAR2(30) DEFAULT NULL, dequeue_mode BINARY_INTEGER DEFAULT DBMS_AQ.REMOVE, navigation BINARY_INTEGER DEFAULT DBMS_AQ.NEXT_MESSAGE, visibility BINARY_INTEGER DEFAULT DBMS_AQ.ON_COMMIT, wait BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER, msgid RAW(16) DEFAULT NULL, correlation VARCHAR2(128) DEFAULT NULL, deq_condition VARCHAR2(4000) DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode INTEGER DEFAULT DBMS_AQ.PERSISTENT );
参数说明
consumer_name
暂不支持,忽略此参数。
dequeue_mode
出队操作的锁定行为。目前支持的有:
DBMS_AQ.BROWSE:读取消息而不获取锁定。
DBMS_AQ.LOCKED:获取写锁定之后读取消息。
DBMS_AQ.REMOVE:读取消息之后删除信息。该参数为默认值。
navigation
标识将检索的消息。必须为如下选项:
NEXT_MESSAGE:第一条信息出队后入队的信息,只有在处理完队列中所有剩余信息后才会被处理。该参数为默认值。
FIRST_MESSAGE:若信息出队过程中有更高优先级信息入队,则优先处理。
visibility
必须为ON_COMMIT,如果用户回退当前事务,出队项目将保持在队列中。
wait
必须为大于0的数字,或者为如下选项:
DBMS_AQ.FOREVER:无限期等待。
DBMS_AQ.NO_WAIT:不等待。
msgid
即将出队消息的ID。若指定此项,即使是waitting状态的信息也可以出队。
correlation
为了实现兼容性而提供的支持,所以忽略此参数。
deq_condition
一个VARCHAR2表达式,所求值为BOOLEAN值,指定消息是否应出队。若指定队列负载相关内容,格式为:tab.user_data.xxx。
transformation
为了实现兼容性而提供的支持,所以忽略此参数。
delivery_mode
必须为PERSISTENT。
子程序
该内置包包含以下存储过程:
存储过程名 描述 REGISTER 使用REGISTER存储过程在消息入队或出队时接收通知。 UNREGISTER 使用UNREGISTER存储过程关闭与入队和出队相关的通知。 ENQUEUE ENQUEUE存储过程将一个条目添加到队列。 DEQUEUE DEQUEUE存储过程让消息出队。 REGISTER
语法格式
REGISTER( reg_list IN SYS.AQ$_REG_INFO_LIST, count IN NUMBER)
参数说明
reg_list
类型为AQ$_REG_INFO_LIST,是AQ$_REG_INFO的列表。用于提供多个订阅相关信息。
count
reg_list列表中元素个数。
注意事项
使用DBMS_AQ.REGISTER注册回调存储过程时,存储过程必须具有以下入参:
create or replace procedure plsqlcallback( context RAW, reginfo DBMS_AQ.AQ$_REG_INFO, descr DBMS_AQ.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER) { IS | AS } plsql_body /
- 也可以使用DBMS_AQ.REGISTER注册回调函数,但函数的返回值无法获取。
- 使用DBMS_AQ.REGISTER注册回调函数时,也应遵守上述入参限制。
示例
1、创建类型和队列表。
CREATE TYPE work_order_1143352 as object (id int,name VARCHAR2(200)); call DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'work_order_1143352_1', queue_payload_type => 'work_order_1143352');
2、创建普通队列。
call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143352', queue_table => 'work_order_1143352_1',max_retries => 3);
3、启动队列。
call dbms_AQADM.start_queue(queue_name => 'workorder_1143352');
4、创建测试表和存储过程。
create temp table t1 (c1 int,c2 text,c3 text,c4 text); create or replace procedure p_1143352(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number) as begin insert into t1 values(c2.tgtype,c2.name,des.msg_id,'SUCCESSFUL!'); end; /
5、使用REGISTER存储过程注册,在消息入队或出队时接收通知。
declare s1 dbms_aq.aq$_reg_info; s11 dbms_aq.aq$_reg_info; s2 dbms_aq.aq$_reg_info_list; BEGIN s1:=dbms_aq.aq$_reg_info('workorder_1143352',dbms_aq.namespace_aq,'plsql://public.p_1143352',1,0); s11:=dbms_aq.aq$_reg_info('workorder_1143352',dbms_aq.namespace_aq,'plsql://public.p_1143352',1,0); s2:=dbms_aq.aq$_reg_info_list(s1,s11); dbms_aq.register(s2,2); end; /
6、执行入队操作。
declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143352; begin o1:=work_order_1143352(1,'message'); dbms_AQ.enqueue(queue_name => 'workorder_1143352',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line('入队成功。'); end; / declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143352; begin o1:=work_order_1143352(1,'message'); dbms_AQ.enqueue(queue_name => 'workorder_1143352',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line('入队成功。'); end; /
7、查询测试表中的数据量。
select count(*) from t1;
返回结果显示为:
count ------- 4 (1 row)
8、查询队列信息。
select count(*) from aq$work_order_1143352_1; declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143352; BEGIN c1.wait:=dbms_AQ.no_wait; c1.dequeue_mode:=dbms_AQ.browse; dbms_AQ.dequeue(queue_name => 'workorder_1143352',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); end; /
结果显示为如下:
count ------- 2 (1 row) ANONYMOUS BLOCK EXECUTE
9、执行出队操作。
declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143352; BEGIN c1.wait:=dbms_AQ.no_wait; dbms_AQ.dequeue(queue_name => 'workorder_1143352',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); --raise info '%',v1; end; /
10、查询队列信息和测试表信息。
select count(*) from aq$work_order_1143352_1; select * from t1;
查询结果显示为如下:
count ------- 1 (1 row) c1 | c2 | c3 | c4 ----+-------------------+----------------------------------+------------- 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! 0 | workorder_1143352 | 1073CF7A32B30C2CA9F96868EE5E9123 | SUCCESSFUL! 0 | workorder_1143352 | 1073CF7A32B30C2CA9F96868EE5E9123 | SUCCESSFUL! 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! 0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL! (8 rows)
UNREGISTER
语法格式
UNREGISTER( reg_list IN AQ$_REG_INFO_LIST, count IN NUMBER)
参数说明
reg_list
类型为AQ$_REG_INFO_LIST,是AQ$_REG_INFO的列表。用于提供多个订阅相关信息。
count
reg_list列表中元素个数。
示例
1、创建类型和队列表。
CREATE TYPE work_order_1143355 as object (id int,name VARCHAR2(200)); call DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'work_order_1143355_1', queue_payload_type => 'work_order_1143355');
2、创建普通队列。
call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143355', queue_table => 'work_order_1143355_1',max_retries => 3); call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143355_1', queue_table => 'work_order_1143355_1',max_retries => 3);
3、启动队列。
call dbms_AQADM.start_queue(queue_name => 'workorder_1143355_1'); call dbms_AQADM.start_queue(queue_name => 'workorder_1143355');
4、创建测试表和存储过程。
create table t1_1143355 (c1 int,c2 text,c3 text,c4 text); create or replace procedure p_1143355(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number) as begin insert into t1_1143355 values(c2.tgtype,c2.name,des.msg_id,'SUCCESSFUL!'); end; / create or replace procedure p_1143355_1(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number) as begin delete from t1_1143355 where c3=des.msg_id; end; /
5、使用REGISTER存储过程注册,在消息入队或出队时接收通知。
declare s1 dbms_aq.aq$_reg_info; s11 dbms_aq.aq$_reg_info; s2 dbms_aq.aq$_reg_info_list; BEGIN s1:=dbms_aq.aq$_reg_info('workorder_1143355',dbms_aq.namespace_aq,'plsql://public.p_1143355',1,0); s11:=dbms_aq.aq$_reg_info('workorder_1143355_1',dbms_aq.namespace_aq,'plsql://public.p_1143355',1,0); s2:=dbms_aq.aq$_reg_info_list(s1,s11); dbms_aq.register(s2,2); end; / declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143355; begin o1:=work_order_1143355(1,'message'); dbms_AQ.enqueue(queue_name => 'workorder_1143355_1',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_AQ.enqueue(queue_name => 'workorder_1143355',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line('入队成功。'); end; /
6、查询测试表和队列信息。
\x
表示开启或关闭列式展示查询结果。select * from t1_1143355; \x select * from aq$work_order_1143355_1; \x select * from vb_queue_triggers;
查询结果显示为:
c1 | c2 | c3 | c4 ----+---------------------+----------------------------------+------------- 0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL! 0 | workorder_1143355 | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL! (2 rows) Expanded display is on. -[ RECORD 1 ]---------+--------------------------------- queue | workorder_1143355_1 msg_id | 2706990B85126A57EBE1DA005EF2F57D corr_id | msg_priority | 1 msg_state | READY delay | 2023-05-26 23:03:15 delay_timestamp | 2023-05-26 23:03:15.217726 expiration | -1 enq_time | 2023-05-26 23:03:15 enq_timestamp | 2023-05-26 23:03:15.217726 enq_user_id | vastbase enq_txn_id | deq_time | deq_timestamp | deq_user_id | deq_txn_id | retry_count | 0 exception_queue_owner | public exception_queue | AQ$_work_order_1143355_1_E user_data | (1,message) original_queue_name | original_queue_owner | expiration_reason | sender_name | sender_address | sender_protocol | original_msgid | -[ RECORD 2 ]---------+--------------------------------- queue | workorder_1143355 msg_id | 195C922E0702B4755AE8D4774B062536 corr_id | msg_priority | 1 msg_state | READY delay | 2023-05-26 23:03:15 delay_timestamp | 2023-05-26 23:03:15.223222 ...skipping one line enq_time | 2023-05-26 23:03:15 enq_timestamp | 2023-05-26 23:03:15.223222 enq_user_id | vastbase enq_txn_id | deq_time | deq_timestamp | deq_user_id | deq_txn_id | retry_count | 0 exception_queue_owner | public exception_queue | AQ$_work_order_1143355_1_E user_data | (1,message) original_queue_name | original_queue_owner | expiration_reason | sender_name | sender_address | sender_protocol | original_msgid | Expanded display is off. qid | q_name | queue_table | tgoid | tgtype | tgcontext | callback -----+---------------------+----------------------+-------+--------+-----------+ -------------------------- 2 | workorder_1143352 | work_order_1143352_1 | 21654 | 0 | 01 | plsql://public.p_1143352 2 | workorder_1143352 | work_order_1143352_1 | 21654 | 0 | 01 | plsql://public.p_1143352 4 | workorder_1143355 | work_order_1143355_1 | 21716 | 0 | 01 | plsql://public.p_1143355 5 | workorder_1143355_1 | work_order_1143355_1 | 21716 | 0 | 01 | plsql://public.p_1143355 (4 rows)
7、使用UNREGISTER存储过程注销。
declare s1 dbms_aq.aq$_reg_info; s11 dbms_aq.aq$_reg_info; s2 dbms_aq.aq$_reg_info_list; BEGIN s1:=dbms_aq.aq$_reg_info('workorder_1143355',dbms_aq.namespace_aq,'plsql://public.p_1143355_1',1,0); s11:=dbms_aq.aq$_reg_info('workorder_1143355_1',dbms_aq.namespace_aq,'plsql://public.p_1143355_1',1,0); s2:=dbms_aq.aq$_reg_info_list(s1,s11); dbms_aq.unregister(s2,2); end; /
8、查询队列信息。
select count(*) from aq$work_order_1143355_1; declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143355; BEGIN c1.wait:=dbms_AQ.no_wait; c1.dequeue_mode:=dbms_AQ.browse; dbms_AQ.dequeue(queue_name => 'workorder_1143355',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); end; /
查询结果显示为:
count ------- 2 (1 row)
9、执行出队操作。
declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1143355; BEGIN v1:='123'; c1.wait:=dbms_AQ.no_wait; dbms_AQ.dequeue(queue_name => 'workorder_1143355_1',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); raise info '%',v1; end; /
10、查询测试表和队列信息。
\x
表示开启或关闭列式展示返回结果。select * from t1_1143355; \x select * from aq$work_order_1143355_1; \x select * from vb_queue_triggers;
查询结果显示为:
c1 | c2 | c3 | c4 ----+---------------------+----------------------------------+------------- 0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL! 0 | workorder_1143355 | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL! 0 | workorder_1143355 | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL! 0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL! (4 rows) Expanded display is on. -[ RECORD 1 ]---------+--------------------------------- queue | workorder_1143355 msg_id | 775C7F7021BFD62301FF887967BC1B23 corr_id | msg_priority | 1 msg_state | READY delay | 2023-05-27 00:58:19 delay_timestamp | 2023-05-27 00:58:19.036037 expiration | -1 enq_time | 2023-05-27 00:58:19 enq_timestamp | 2023-05-27 00:58:19.036037 enq_user_id | vastbase enq_txn_id | deq_time | deq_timestamp | deq_user_id | deq_txn_id | retry_count | 0 exception_queue_owner | public exception_queue | AQ$_work_order_1143355_1_E user_data | (1,message) original_queue_name | original_queue_owner | expiration_reason | sender_name | sender_address | sender_protocol | original_msgid | Expanded display is off. qid | q_name | queue_table | tgoid | tgtype | tgcontext | callback -----+---------------------+----------------------+-------+--------+-----------+ -------------------------- 2 | workorder_1143352 | work_order_1143352_1 | 21654 | 0 | 01 | plsql://public.p_1143352 2 | workorder_1143352 | work_order_1143352_1 | 21654 | 0 | 01 | plsql://public.p_1143352 4 | workorder_1143355 | work_order_1143355_1 | 21716 | 0 | 01 | plsql://public.p_1143355 5 | workorder_1143355_1 | work_order_1143355_1 | 21716 | 0 | 01 | plsql://public.p_1143355 (4 rows)
ENQUEUE
语法格式
ENQUEUE( queue_name IN VARCHAR2, enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T, message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T, payload IN <type_name>, msgid OUT RAW)
参数说明
queue_name
现有队列的名称。
enqueue_options
类型为enqueue_options_t,指定可用于入队操作的选项。
message_properties
类型为message_properties_t,并描述AQ用于传达各个消息的状态的信息。它们在入队时设置,其值在出队时返回。
payload
使用payload参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配。
msgid
使用msgid参数检索唯一(系统生成的)消息标识符。
示例
1、创建类型和队列表。
CREATE TYPE work_order_1142336 as object (id int,name VARCHAR2(200)); call DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'work_order_1142336_1', queue_payload_type => 'work_order_1142336', storage_clause =>'TABLESPACE pg_default', comment =>'sknn');
2、创建普通队列并查看队列表。
call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1142336', queue_table => 'work_order_1142336_1', comment => 'This queue contains pending work orders.',queue_type =>DBMS_AQADM.NORMAL_QUEUE,max_retries=>3,retry_delay => 2); select retention,name,max_retries,retry_delay from all_dequeue_queues where queue_table='work_order_1142336_1' order by 1;
查询结果返回如下:
create_queue -------------- (1 row) retention | name | max_retries | retry_delay -----------+----------------------------+-------------+------------- 0 | aq$_work_order_1142336_1_e | 0 | 0 0 | workorder_1142336 | 3 | 2 (2 rows)
3、未启动队列时入队。
declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142336; BEGIN o1:=work_order_1142336(1,'message'); dbms_AQ.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line(o1.name); end; /
结果显示为如下,提示入队失败:
ERROR: call dbms_aq.enqueue failed. ECODE: P0001. REASON: Message enqueue failed! enqueue ability of queue workorder_1142336 has disabled. CONTEXT: SQL statement "CALL dbms_aq.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid=>v1)" PL/pgSQL function inline_code_block line 8 at SQL statement
入队(DBMS_AQ.enqueue)操作应在开启队列(DBMS_AQADM.start_QUEUE)后执行。
4、启动队列。
call DBMS_AQADM.start_QUEUE(queue_name => 'workorder_1142336');
5、启动队列后入队。
declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142336; BEGIN o1:=work_order_1142336(1,'message'); dbms_AQ.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line(o1.name); end; /
6、查询队列信息。
select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142336_1; select count(*) from aq$work_order_1142336_1;
结果显示为如下,表示入队成功:
queue | msg_priority | msg_state | user_data | expiration -------------------+--------------+-----------+-------------+------------ workorder_1142336 | 1 | READY | (1,message) | -1 (1 row) count ------- 1 (1 row)
DEQUEUE
语法格式
DEQUEUE( queue_name IN VARCHAR2, dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T, message_properties IN OUT DBMS_AQ.MESSAGE_PROPERTIES_T, payload IN OUT anyelement, msgid OUT RAW)
参数说明
queue_name
现有队列的名称。
dequeue_options
类型为dequeue_options_t,指定可用于出队操作的选项。
message_properties
类型为message_properties_t,并描述 AQ 用于传达各个消息的状态的信息。它们在入队时设置,其值在出队时返回。
payload
使用payload参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配。
msgid
使用msgid 参数检索唯一(系统生成的)消息标识符。
示例
1、创建类型和队列表。
CREATE TYPE work_order_1142859 as object (id int,name VARCHAR2(200)); call DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'work_order_1142859_1', queue_payload_type => 'work_order_1142859', storage_clause =>'TABLESPACE pg_default', sort_list => 'enq_time,PRIORITY', comment =>'sknn');
2、创建普通队列,并启动队列。
call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1142859', queue_table => 'work_order_1142859_1', comment => 'This queue contains pending work orders.',dependency_tracking=>false,auto_commit => true); call dbms_AQADM.start_queue(queue_name => 'workorder_1142859');
3、支持入队操作。
declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142859; begin o1:=work_order_1142859(1,'message'); c2.priority :=1; c2.delay= dbms_aq.no_delay; dbms_AQ.enqueue(queue_name => 'workorder_1142859',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); commit; dbms_output.put_line('入队成功。'); end; / declare c1 dbms_AQ.enqueue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142859; begin c2.priority :=0; c2.delay= 0; o1:=work_order_1142859(1,'message_2'); dbms_AQ.enqueue(queue_name => 'workorder_1142859',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line('入队成功1。'); end; /
4、查询队列信息。
select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1; select count(*) from aq$work_order_1142859_1;
查询结果显示为:
queue | msg_priority | msg_state | user_data | expiration -------------------+--------------+-----------+---------------+------------ workorder_1142859 | 1 | READY | (1,message) | -1 workorder_1142859 | 0 | READY | (1,message_2) | -1 (2 rows) count ------- 2 (1 row)
5、按照先进先出的模式出队第一条。
declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142859; BEGIN c1.wait:=dbms_AQ.no_wait; dbms_AQ.dequeue(queue_name => 'workorder_1142859',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); end; /
6、查询队列信息。
select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1; select count(*) from aq$work_order_1142859_1;
结果返回为如下:
queue | msg_priority | msg_state | user_data | expiration -------------------+--------------+-----------+---------------+------------ workorder_1142859 | 0 | READY | (1,message_2) | -1 (1 row) count ------- 1 (1 row)
7、按照先进先出的模式出队第二条
declare c1 dbms_AQ.dequeue_options_t; c2 dbms_AQ.message_properties_t; v1 raw(16); o1 work_order_1142859; BEGIN c1.wait:=dbms_AQ.no_wait; dbms_AQ.dequeue(queue_name => 'workorder_1142859',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1); dbms_output.put_line(o1.name); end; /
8、查询队列的信息。
select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1; select count(*) from aq$work_order_1142859_1;
结果显示为如下:
queue | msg_priority | msg_state | user_data | expiration -------+--------------+-----------+-----------+------------ (0 rows) count ------- 0 (1 row)
相关链接