VastbaseG100

基于openGauss内核开发的企业级关系型数据库。

Menu

DBMS_AQ

功能描述

Vastbase G100在Oracle兼容模式下支持内置包DBMS_AQ,其中包含的存储过程负责添加消息到队列、从队列中删除消息、注册或注销PL/SQL回调存储过程。

注意事项

该功能仅在数据库兼容模式为Oracle时能够使用(即创建DB时DBCOMPATIBILITY='A'),在其他数据库兼容模式下不能使用该特性。

常量

该内置包包含以下常量:

参数 常量 说明
dequeue_options_t.dequeue_mode DBMS_AQ.BROWSE (0) 读取消息而不锁定,等同于SELECT。
DBMS_AQ.LOCKED (1) 获取锁定之后读取信息,等同于SELECT FOR UPDATE。
DBMS_AQ.REMOVE (2) 读取之后删除消息,该参数为默认值。
DBMS_AQ.REMOVE_NODATA (3) 将信息标记为已更新或删除,不立即删除信息。此参数暂不支持。
dequeue_options_t.navigation DBMS_AQ.FIRST_MESSAGE (0) 当出队期间有更高优先级的信息到达队列时,则可以优先处理更高优先级的信息。
DBMS_AQ.NEXT_MESSAGE (1) 出队期间将根据第一个出队信息获取的快照检索信息,在第一条信息出队之后,只有处理完队列中所有剩余信息,入队的信息才会被处理。该参数为默认值。
dequeue_options_t.wait DBMS_AQ.FOREVER (-1) 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。
DBMS_AQ.NO_WAIT (0) 如果找不到与搜索词匹配的消息,则不等待。
enqueue_options_t.visibility 、 dequeue_options_t.visibility DBMS_AQ.ON_COMMIT (0) 默认值,若回退事务则出队项目保留在队列中。
enqueue_options_t.delivery_mode DBMS_AQ.PERSISTENT (0) 此消息应存储在表中。
DBMS_AQ.BUFFERED(1) 此常量已定义,但功能不支持。
message_properties_t.state DBMS_AQ.READY(0) 指定消息已经准备好进行处理。
DBMS_AQ.WAITTING (1) 指定消息正在等待处理。
DBMS_AQ.PROCESSED (2) 指定消息已处理。
DBMS_AQ.EXPIRED (3) 指定消息已过期并处于异常队列中。
message_properties_t.delay DBMS_AQ.NO_DELAY (0) 信息可用于出队之前经过的秒数。
message_properties_t.expiration DBMS_AQ.NEVER (-1) 指定信息过期的秒数,NEVER为默认值。
DBMS_AQ.aq$_reg_info.namespace DBMS_AQ.NAMESPACE_AQ (0) 接收来自DBMS_AQ队列的通知。
DBMS_AQ.NAMESPACE_ANONYMOUS (1) 此常量已定义,但功能不支持。
DBMS_AQ.aq$_reg_info.tgtype DBMS_AQ.BOTH 回调函数在出队入队时都执行。
DBMS_AQ.ENQUEUE_ONLY 回调函数仅在入队时执行。
DBMS_AQ.DEQUEUE_ONLY 回调函数仅在出队时执行。

类型

类型 描述
AQ$_REG_INFO 用于记录注册回调时需要的信息。
AQ$_REG_INFO_LIST AQ$_REG_INFO的列表。
AQ$_DESCRIPTOR 用户定义的回调函数中,必须存在此类型的入参。
ENQUEUE_OPTIONS_T 指定可用于取消排队操作的选项。
MESSAGE_PROPERTIES_T 描述AQ用于传达各个消息的状态的信息。
DEQUEUE_OPTIONS_T 可用于取消排队操作的选项。
  • 本部分介绍内置包的内置类型,各类型结构已在内置包中定义完成,无需用户手动创建。
  • 各类型的结构参见类型定义。
  • 类型定义中,DEFAULT表示指定默认值,其后接的内容即为指定的默认值。

AQ$_REG_INFO

类型定义

CREATE  TYPE  AQ$_REG_INFO  AS(
   name        VARCHAR2(128),
   namespace   NUMERIC         DEFAULT  DBMS_AQ.NAMESPACE_AQ,
   callback    VARCHAR2(4000),
   context     RAW(16),
   tgtype      INTEGER         DEFAULT  DBMS_AQ.BOTH
  );

参数说明

  • name

    类型为VARCHAR2(128),表示队列的名称,格式为[schema].queue。

  • namespace

    类型为NUMERIC,唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。

  • callback

    类型为VARCHAR2(4000),描述对通知执行的操作。

    目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:

    plsql://schema.procedure
    

    其中:

    • schema:指定存储过程所在的schema。

    • procedure:指定待通知的存储过程的名称。

  • context

    类型为RAW(16),回调存储过程所需的用户定义的值。

  • tgtype

    类型为INTEGER,标记执行回调的时机。

    • BOTH表示出入队都执行。
    • ENQUEUE_ONLY表示仅入队时执行。
    • DEQUEUE_ONLY表示仅出队时执行。

AQ$_DESCRIPTOR

类型定义

TYPE AQ$_DESCRIPTOR IS RECORD ( 
        queue_name          VARCHAR2(261),    
        consumer_name       VARCHAR2(512),          
        msg_id              RAW(16),      
        ntfnsRecdInGrp      NUMBER              
    );

参数说明

  • queue_name

    类型为VARCHAR2(261),表示触发回调操作的队列名。

  • consumer_name

    类型为VARCHAR2(512),目前仅支持单消费者模型,所以忽略此参数。

  • msg_id

    类型为RAW(16),表示触发回调操作信息的msgid。

  • ntfnsRecdInGrp

    类型为NUMBER,为Group相关参数,当前不支持。

ENQUEUE_OPTIONS_T

类型定义

CREATE TYPE DBMS_AQ.ENQUEUE_OPTIONS_T  AS(
  visibility            BINARY_INTEGER          DEFAULT DBMS_AQ.ON_COMMIT,
  relative_msgid        RAW(16)                 DEFAULT NULL,
  sequence_deviation    BINARY INTEGER          DEFAULT NULL,
  transformation        VARCHAR2(61)            DEFAULT NULL,
  delivery_mode         PLS_INTEGER NOT NULL    DEFAULT DBMS_AQ.PERSISTENT);

参数说明

  • visibility

    指定排队请求的事务行为。当前仅支持设置为ON_COMMIT。表示排队是当前事务的一部分。当事务提交时,操作完成。

  • delivery_mode

    排队程序指定其希望在排队选项中排队的消息的传递模式。当前仅支持设置为PERSISTENT。

  • sequence_deviation

    指定是否应该在队列中已存在的其他消息之前将正在加入队列的消息从队列中取出。当前仅支持设置为NULL。

  • transformation

    指定在对消息进行排队之前应用转换。转换函数的返回类型必须与队列的类型匹配。当前仅支持设置为NULL。

  • relative_msgid

    指定序列偏差操作中引用的消息的消息标识符。当前仅支持设置为NULL。

MESSAGE_PROPERTIES_T

类型定义

TYPE MESSAGE_PROPERTIES_T IS RECORD (
        priority            BINARY_INTEGER      NOT NULL DEFAULT 1, 
        delay               BINARY_INTEGER      NOT NULL DEFAULT DBMS_AQ.NO_DELAY,
        expiration          BINARY_INTEGER      NOT NULL DEFAULT DBMS_AQ.NEVER,
        correlation         VARCHAR2(128)       DEFAULT NULL,    
        attempts            BINARY_INTEGER,                        
        recipient_list      text                DEFAULT NULL,       
        exception_queue     VARCHAR2(61)        DEFAULT NULL,       
        enqueue_time        Date,                                   
        state               BINARY_INTEGER,                        
        sender_id           text                DEFAULT NULL,       
        original_msgid      RAW(16)             DEFAULT NULL,       
        transaction_group   VARCHAR2(30)        DEFAULT NULL,      
        user_property       text                DEFAULT NULL,  
        delivery_mode       INTEGER             NOT NULL DEFAULT DBMS_AQ.PERSISTENT
    );

参数说明

  • priority

    如果队列表定义包括sort_list并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。

  • delay

    指定消息可用于出队之前将经过的秒数,或者NO_DELAY。

  • expiration

    使用expiration参数指定消息过期的秒数。

  • correlation

    使用correlation指定将与条目关联的消息,暂不支持此参数。

  • attempts

    这是系统维护的值,指定消息出队的尝试次数。不允许用户赋值。

  • recipient_list

    当前暂不支持此参数。

  • exception_queue

    使用exception_queue参数指定异常队列的名称,如果消息过期或者信息出队尝试次数超过限制,则消息将移动到该队列。

  • enqueue_time

    enqueue_time是记录添加到队列的时间。此值由系统提供,不允许用户赋值。

  • state

    此参数由DBMS_AQ维护,状态可以为:

    • DBMS_AQ.WAITING:队列条目正在等待处理。

    • DBMS_AQ.READY:队列条目已准备好处理。

    • DBMS_AQ.PROCESSED:队列条目已处理。

    • DBMS_AQ.EXPIRED:队列条目已移动到异常队列。

  • original_msgid

    为了实现兼容性而支持此参数,忽略此参数。

  • transaction_group

    为了实现兼容性而支持此参数,忽略此参数。

  • delivery_mode

    指定DBMS_AQ.PERSISTENT的值。不支持此参数

DEQUEUE_OPTIONS_T

类型定义

TYPE DEQUEUE_OPTIONS_T IS RECORD (
        consumer_name       VARCHAR2(30)        DEFAULT NULL,     
        dequeue_mode        BINARY_INTEGER      DEFAULT DBMS_AQ.REMOVE,          
        navigation          BINARY_INTEGER      DEFAULT DBMS_AQ.NEXT_MESSAGE,         
        visibility          BINARY_INTEGER      DEFAULT DBMS_AQ.ON_COMMIT,         
        wait                BINARY_INTEGER      DEFAULT DBMS_AQ.FOREVER,         
        msgid               RAW(16)             DEFAULT NULL,    
        correlation         VARCHAR2(128)       DEFAULT NULL,      
        deq_condition       VARCHAR2(4000)      DEFAULT NULL,      
        transformation      VARCHAR2(61)        DEFAULT NULL,       
        delivery_mode       INTEGER             DEFAULT DBMS_AQ.PERSISTENT           
    );

参数说明

  • consumer_name

    暂不支持,忽略此参数。

  • dequeue_mode

    出队操作的锁定行为。目前支持的有:

    • DBMS_AQ.BROWSE:读取消息而不获取锁定。

    • DBMS_AQ.LOCKED:获取写锁定之后读取消息。

    • DBMS_AQ.REMOVE:读取消息之后删除信息。该参数为默认值。

  • navigation

    标识将检索的消息。必须为如下选项:

    • NEXT_MESSAGE:第一条信息出队后入队的信息,只有在处理完队列中所有剩余信息后才会被处理。该参数为默认值。

    • FIRST_MESSAGE:若信息出队过程中有更高优先级信息入队,则优先处理。

  • visibility

    必须为ON_COMMIT,如果用户回退当前事务,出队项目将保持在队列中。

  • wait

    必须为大于0的数字,或者为如下选项:

    • DBMS_AQ.FOREVER:无限期等待。

    • DBMS_AQ.NO_WAIT:不等待。

  • msgid

    即将出队消息的ID。若指定此项,即使是waitting状态的信息也可以出队。

  • correlation

    为了实现兼容性而提供的支持,所以忽略此参数。

  • deq_condition

    一个VARCHAR2表达式,所求值为BOOLEAN值,指定消息是否应出队。若指定队列负载相关内容,格式为:tab.user_data.xxx。

  • transformation

    为了实现兼容性而提供的支持,所以忽略此参数。

  • delivery_mode

    必须为PERSISTENT。

子程序

该内置包包含以下存储过程:

存储过程名 描述
REGISTER 使用REGISTER存储过程在消息入队或出队时接收通知。
UNREGISTER 使用UNREGISTER存储过程关闭与入队和出队相关的通知。
ENQUEUE ENQUEUE存储过程将一个条目添加到队列。
DEQUEUE DEQUEUE存储过程让消息出队。

REGISTER

语法格式

REGISTER( 
reg_list  IN  SYS.AQ$_REG_INFO_LIST,
count     IN  NUMBER)

参数说明

  • reg_list

    类型为AQ$_REG_INFO_LIST,是AQ$_REG_INFO的列表。用于提供多个订阅相关信息。

  • count

    reg_list列表中元素个数。

注意事项

使用DBMS_AQ.REGISTER注册回调存储过程时,存储过程必须具有以下入参:

create or replace procedure plsqlcallback(
  context    RAW, 
  reginfo    DBMS_AQ.AQ$_REG_INFO, 
  descr      DBMS_AQ.AQ$_DESCRIPTOR, 
  payload    RAW,
  payloadl   NUMBER)
  { IS | AS } plsql_body
/
  • 也可以使用DBMS_AQ.REGISTER注册回调函数,但函数的返回值无法获取。
  • 使用DBMS_AQ.REGISTER注册回调函数时,也应遵守上述入参限制。

示例

1、创建类型和队列表。

CREATE TYPE work_order_1143352 as object (id int,name VARCHAR2(200));

call DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'work_order_1143352_1',
queue_payload_type => 'work_order_1143352');

2、创建普通队列。

call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143352', queue_table => 'work_order_1143352_1',max_retries => 3);

3、启动队列。

call dbms_AQADM.start_queue(queue_name => 'workorder_1143352');

4、创建测试表和存储过程。

create temp table t1 (c1 int,c2 text,c3 text,c4 text);
create or replace procedure p_1143352(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number)
as
begin
insert into t1 values(c2.tgtype,c2.name,des.msg_id,'SUCCESSFUL!');
end;
/

5、使用REGISTER存储过程注册,在消息入队或出队时接收通知。

declare
s1 dbms_aq.aq$_reg_info;
s11 dbms_aq.aq$_reg_info;
s2 dbms_aq.aq$_reg_info_list;
BEGIN
s1:=dbms_aq.aq$_reg_info('workorder_1143352',dbms_aq.namespace_aq,'plsql://public.p_1143352',1,0);
s11:=dbms_aq.aq$_reg_info('workorder_1143352',dbms_aq.namespace_aq,'plsql://public.p_1143352',1,0);
s2:=dbms_aq.aq$_reg_info_list(s1,s11);
dbms_aq.register(s2,2);
end;
/

6、执行入队操作。

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143352;
begin
o1:=work_order_1143352(1,'message');
dbms_AQ.enqueue(queue_name => 'workorder_1143352',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line('入队成功。');
end;
/

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143352;
begin
o1:=work_order_1143352(1,'message');
dbms_AQ.enqueue(queue_name => 'workorder_1143352',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line('入队成功。');
end;
/

7、查询测试表中的数据量。

select count(*) from t1;

返回结果显示为:

 count
-------
     4
(1 row)

8、查询队列信息。

select count(*) from aq$work_order_1143352_1;
declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143352;
BEGIN
c1.wait:=dbms_AQ.no_wait;
c1.dequeue_mode:=dbms_AQ.browse;
dbms_AQ.dequeue(queue_name => 'workorder_1143352',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
end;
/

结果显示为如下:

 count
-------
     2
(1 row)


ANONYMOUS BLOCK EXECUTE

9、执行出队操作。

declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143352;
BEGIN
c1.wait:=dbms_AQ.no_wait;
dbms_AQ.dequeue(queue_name => 'workorder_1143352',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
--raise info '%',v1;
end;
/

10、查询队列信息和测试表信息。

select count(*) from aq$work_order_1143352_1;
select * from t1;

查询结果显示为如下:

 count
-------
     1
(1 row)


 c1 |        c2         |                c3                |     c4
----+-------------------+----------------------------------+-------------
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
  0 | workorder_1143352 | 1073CF7A32B30C2CA9F96868EE5E9123 | SUCCESSFUL!
  0 | workorder_1143352 | 1073CF7A32B30C2CA9F96868EE5E9123 | SUCCESSFUL!
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
  0 | workorder_1143352 | AEAB1A4F00D7C83A4A0C1F40D156A11D | SUCCESSFUL!
(8 rows)

UNREGISTER

语法格式

UNREGISTER(
reg_list  IN  AQ$_REG_INFO_LIST,
count  IN  NUMBER)

参数说明

  • reg_list

    类型为AQ$_REG_INFO_LIST,是AQ$_REG_INFO的列表。用于提供多个订阅相关信息。

  • count

    reg_list列表中元素个数。

示例

1、创建类型和队列表。

CREATE TYPE work_order_1143355 as object (id int,name VARCHAR2(200));
call DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'work_order_1143355_1',
queue_payload_type => 'work_order_1143355');

2、创建普通队列。

call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143355', queue_table => 'work_order_1143355_1',max_retries => 3);
call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1143355_1', queue_table => 'work_order_1143355_1',max_retries => 3);

3、启动队列。

call dbms_AQADM.start_queue(queue_name => 'workorder_1143355_1');
call dbms_AQADM.start_queue(queue_name => 'workorder_1143355');

4、创建测试表和存储过程。

create table t1_1143355 (c1 int,c2 text,c3 text,c4 text);
create or replace procedure p_1143355(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number)
as
begin
insert into t1_1143355 values(c2.tgtype,c2.name,des.msg_id,'SUCCESSFUL!');
end;
/

create or replace procedure p_1143355_1(context raw, c2 dbms_aq.aq$_reg_info,des dbms_aq.aq$_descriptor, payload raw, playload1 number)
as
begin
delete from t1_1143355 where c3=des.msg_id;
end;
/

5、使用REGISTER存储过程注册,在消息入队或出队时接收通知。

declare
s1 dbms_aq.aq$_reg_info;
s11 dbms_aq.aq$_reg_info;
s2 dbms_aq.aq$_reg_info_list;
BEGIN
s1:=dbms_aq.aq$_reg_info('workorder_1143355',dbms_aq.namespace_aq,'plsql://public.p_1143355',1,0);
s11:=dbms_aq.aq$_reg_info('workorder_1143355_1',dbms_aq.namespace_aq,'plsql://public.p_1143355',1,0);
s2:=dbms_aq.aq$_reg_info_list(s1,s11);
dbms_aq.register(s2,2);
end;
/

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143355;
begin
o1:=work_order_1143355(1,'message');
dbms_AQ.enqueue(queue_name => 'workorder_1143355_1',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_AQ.enqueue(queue_name => 'workorder_1143355',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line('入队成功。');
end;
/

6、查询测试表和队列信息。\x表示开启或关闭列式展示查询结果。

select * from t1_1143355;
\x
select * from aq$work_order_1143355_1;
\x
select * from vb_queue_triggers;

查询结果显示为:

 c1 |         c2          |                c3                |     c4
----+---------------------+----------------------------------+-------------
  0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL!
  0 | workorder_1143355   | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL!
(2 rows)

Expanded display is on.
-[ RECORD 1 ]---------+---------------------------------
queue                 | workorder_1143355_1
msg_id                | 2706990B85126A57EBE1DA005EF2F57D
corr_id               |
msg_priority          | 1
msg_state             | READY
delay                 | 2023-05-26 23:03:15
delay_timestamp       | 2023-05-26 23:03:15.217726
expiration            | -1
enq_time              | 2023-05-26 23:03:15
enq_timestamp         | 2023-05-26 23:03:15.217726
enq_user_id           | vastbase
enq_txn_id            |
deq_time              |
deq_timestamp         |
deq_user_id           |
deq_txn_id            |
retry_count           | 0
exception_queue_owner | public
exception_queue       | AQ$_work_order_1143355_1_E
user_data             | (1,message)
original_queue_name   |
original_queue_owner  |
expiration_reason     |
sender_name           |
sender_address        |
sender_protocol       |
original_msgid        |
-[ RECORD 2 ]---------+---------------------------------
queue                 | workorder_1143355
msg_id                | 195C922E0702B4755AE8D4774B062536
corr_id               |
msg_priority          | 1
msg_state             | READY
delay                 | 2023-05-26 23:03:15
delay_timestamp       | 2023-05-26 23:03:15.223222

...skipping one line
enq_time              | 2023-05-26 23:03:15
enq_timestamp         | 2023-05-26 23:03:15.223222
enq_user_id           | vastbase
enq_txn_id            |
deq_time              |
deq_timestamp         |
deq_user_id           |
deq_txn_id            |
retry_count           | 0
exception_queue_owner | public
exception_queue       | AQ$_work_order_1143355_1_E
user_data             | (1,message)
original_queue_name   |
original_queue_owner  |
expiration_reason     |
sender_name           |
sender_address        |
sender_protocol       |
original_msgid        |

Expanded display is off.

 qid |       q_name        |     queue_table      | tgoid | tgtype | tgcontext |
         callback
-----+---------------------+----------------------+-------+--------+-----------+
--------------------------
   2 | workorder_1143352   | work_order_1143352_1 | 21654 |      0 | 01        |
 plsql://public.p_1143352
   2 | workorder_1143352   | work_order_1143352_1 | 21654 |      0 | 01        |
 plsql://public.p_1143352
   4 | workorder_1143355   | work_order_1143355_1 | 21716 |      0 | 01        |
 plsql://public.p_1143355
   5 | workorder_1143355_1 | work_order_1143355_1 | 21716 |      0 | 01        |
 plsql://public.p_1143355
(4 rows)

7、使用UNREGISTER存储过程注销。

declare
s1 dbms_aq.aq$_reg_info;
s11 dbms_aq.aq$_reg_info;
s2 dbms_aq.aq$_reg_info_list;
BEGIN
s1:=dbms_aq.aq$_reg_info('workorder_1143355',dbms_aq.namespace_aq,'plsql://public.p_1143355_1',1,0);
s11:=dbms_aq.aq$_reg_info('workorder_1143355_1',dbms_aq.namespace_aq,'plsql://public.p_1143355_1',1,0);
s2:=dbms_aq.aq$_reg_info_list(s1,s11);
dbms_aq.unregister(s2,2);
end;
/

8、查询队列信息。

select count(*) from aq$work_order_1143355_1;
declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143355;
BEGIN
c1.wait:=dbms_AQ.no_wait;
c1.dequeue_mode:=dbms_AQ.browse;
dbms_AQ.dequeue(queue_name => 'workorder_1143355',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
end;
/

查询结果显示为:

 count
-------
     2
(1 row)

9、执行出队操作。

declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1143355;
BEGIN
v1:='123';
c1.wait:=dbms_AQ.no_wait;
dbms_AQ.dequeue(queue_name => 'workorder_1143355_1',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
raise info '%',v1;
end;
/

10、查询测试表和队列信息。\x表示开启或关闭列式展示返回结果。

select * from t1_1143355;
\x
select * from aq$work_order_1143355_1;
\x
select * from vb_queue_triggers; 

查询结果显示为:

 c1 |         c2          |                c3                |     c4
----+---------------------+----------------------------------+-------------
  0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL!
  0 | workorder_1143355   | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL!
  0 | workorder_1143355   | 195C922E0702B4755AE8D4774B062536 | SUCCESSFUL!
  0 | workorder_1143355_1 | 2706990B85126A57EBE1DA005EF2F57D | SUCCESSFUL!
(4 rows)

Expanded display is on.

-[ RECORD 1 ]---------+---------------------------------
queue                 | workorder_1143355
msg_id                | 775C7F7021BFD62301FF887967BC1B23
corr_id               |
msg_priority          | 1
msg_state             | READY
delay                 | 2023-05-27 00:58:19
delay_timestamp       | 2023-05-27 00:58:19.036037
expiration            | -1
enq_time              | 2023-05-27 00:58:19
enq_timestamp         | 2023-05-27 00:58:19.036037
enq_user_id           | vastbase
enq_txn_id            |
deq_time              |
deq_timestamp         |
deq_user_id           |
deq_txn_id            |
retry_count           | 0
exception_queue_owner | public
exception_queue       | AQ$_work_order_1143355_1_E
user_data             | (1,message)
original_queue_name   |
original_queue_owner  |
expiration_reason     |
sender_name           |
sender_address        |
sender_protocol       |
original_msgid        |

Expanded display is off.

 qid |       q_name        |     queue_table      | tgoid | tgtype | tgcontext |
         callback
-----+---------------------+----------------------+-------+--------+-----------+
--------------------------
   2 | workorder_1143352   | work_order_1143352_1 | 21654 |      0 | 01        |
 plsql://public.p_1143352
   2 | workorder_1143352   | work_order_1143352_1 | 21654 |      0 | 01        |
 plsql://public.p_1143352
   4 | workorder_1143355   | work_order_1143355_1 | 21716 |      0 | 01        |
 plsql://public.p_1143355
   5 | workorder_1143355_1 | work_order_1143355_1 | 21716 |      0 | 01        |
 plsql://public.p_1143355
(4 rows)

ENQUEUE

语法格式

ENQUEUE(
   queue_name  IN  VARCHAR2,
   enqueue_options  IN  DBMS_AQ.ENQUEUE_OPTIONS_T,
   message_properties  IN  DBMS_AQ.MESSAGE_PROPERTIES_T,
   payload IN <type_name>,
   msgid  OUT  RAW)

参数说明

  • queue_name

    现有队列的名称。

  • enqueue_options

    类型为enqueue_options_t,指定可用于入队操作的选项。

  • message_properties

    类型为message_properties_t,并描述AQ用于传达各个消息的状态的信息。它们在入队时设置,其值在出队时返回。

  • payload

    使用payload参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配。

  • msgid

    使用msgid参数检索唯一(系统生成的)消息标识符。

示例

1、创建类型和队列表。

CREATE TYPE work_order_1142336 as object (id int,name VARCHAR2(200));

call DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'work_order_1142336_1',
queue_payload_type => 'work_order_1142336',
storage_clause =>'TABLESPACE pg_default',
comment =>'sknn');

2、创建普通队列并查看队列表。

call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1142336', queue_table => 'work_order_1142336_1', comment => 'This queue contains pending work orders.',queue_type =>DBMS_AQADM.NORMAL_QUEUE,max_retries=>3,retry_delay => 2);
select retention,name,max_retries,retry_delay from all_dequeue_queues where queue_table='work_order_1142336_1' order by 1;

查询结果返回如下:

 create_queue
--------------

(1 row)

 retention |            name            | max_retries | retry_delay
-----------+----------------------------+-------------+-------------
 0         | aq$_work_order_1142336_1_e |           0 |           0
 0         | workorder_1142336          |           3 |           2
(2 rows)

3、未启动队列时入队。

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142336;
BEGIN
o1:=work_order_1142336(1,'message');
dbms_AQ.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line(o1.name);
end;
/

结果显示为如下,提示入队失败:

ERROR:  call dbms_aq.enqueue failed.
ECODE:  P0001.
REASON: Message enqueue failed! enqueue ability of queue workorder_1142336 has disabled.
CONTEXT:  SQL statement "CALL dbms_aq.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid=>v1)"
PL/pgSQL function inline_code_block line 8 at SQL statement

入队(DBMS_AQ.enqueue)操作应在开启队列(DBMS_AQADM.start_QUEUE)后执行。

4、启动队列。

call DBMS_AQADM.start_QUEUE(queue_name => 'workorder_1142336');

5、启动队列后入队。

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142336;
BEGIN
o1:=work_order_1142336(1,'message');
dbms_AQ.enqueue(queue_name => 'workorder_1142336',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line(o1.name);
end;
/

6、查询队列信息。

select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142336_1;
select count(*) from aq$work_order_1142336_1;

结果显示为如下,表示入队成功:

       queue       | msg_priority | msg_state |  user_data  | expiration
-------------------+--------------+-----------+-------------+------------
 workorder_1142336 |            1 | READY     | (1,message) |         -1
(1 row)

 count
-------
     1
(1 row)

DEQUEUE

语法格式

DEQUEUE(
   queue_name IN  VARCHAR2,
   dequeue_options IN  DBMS_AQ.DEQUEUE_OPTIONS_T,
   message_properties  IN OUT  DBMS_AQ.MESSAGE_PROPERTIES_T,
   payload   IN OUT  anyelement,
   msgid OUT RAW)

参数说明

  • queue_name

    现有队列的名称。

  • dequeue_options

    类型为dequeue_options_t,指定可用于出队操作的选项。

  • message_properties

    类型为message_properties_t,并描述 AQ 用于传达各个消息的状态的信息。它们在入队时设置,其值在出队时返回。

  • payload

    使用payload参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配。

  • msgid

    使用msgid 参数检索唯一(系统生成的)消息标识符。

示例

1、创建类型和队列表。

CREATE TYPE work_order_1142859 as object (id int,name VARCHAR2(200));
call DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'work_order_1142859_1',
queue_payload_type => 'work_order_1142859',
storage_clause =>'TABLESPACE pg_default',
sort_list => 'enq_time,PRIORITY',
comment =>'sknn');

2、创建普通队列,并启动队列。

call DBMS_AQADM.CREATE_QUEUE ( queue_name => 'workorder_1142859', queue_table => 'work_order_1142859_1', comment => 'This queue contains pending work orders.',dependency_tracking=>false,auto_commit => true);
call dbms_AQADM.start_queue(queue_name => 'workorder_1142859');

3、支持入队操作。

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142859;
begin
o1:=work_order_1142859(1,'message');
c2.priority :=1;
c2.delay= dbms_aq.no_delay;
dbms_AQ.enqueue(queue_name => 'workorder_1142859',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
commit;
dbms_output.put_line('入队成功。');
end;
/

declare
c1 dbms_AQ.enqueue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142859;
begin
c2.priority :=0;
c2.delay= 0;
o1:=work_order_1142859(1,'message_2');
dbms_AQ.enqueue(queue_name => 'workorder_1142859',enqueue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line('入队成功1。');
end;
/

4、查询队列信息。

select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1;
select count(*) from aq$work_order_1142859_1;

查询结果显示为:

       queue       | msg_priority | msg_state |   user_data   | expiration
-------------------+--------------+-----------+---------------+------------
 workorder_1142859 |            1 | READY     | (1,message)   |         -1
 workorder_1142859 |            0 | READY     | (1,message_2) |         -1
(2 rows)

 count
-------
     2
(1 row)

5、按照先进先出的模式出队第一条。

declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142859;
BEGIN
c1.wait:=dbms_AQ.no_wait;
dbms_AQ.dequeue(queue_name => 'workorder_1142859',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
end;
/

6、查询队列信息。

select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1;
select count(*) from aq$work_order_1142859_1;

结果返回为如下:

       queue       | msg_priority | msg_state |   user_data   | expiration
-------------------+--------------+-----------+---------------+------------
 workorder_1142859 |            0 | READY     | (1,message_2) |         -1
(1 row)

 count
-------
     1
(1 row)

7、按照先进先出的模式出队第二条

declare
c1 dbms_AQ.dequeue_options_t;
c2 dbms_AQ.message_properties_t;
v1 raw(16);
o1 work_order_1142859;
BEGIN
c1.wait:=dbms_AQ.no_wait;
dbms_AQ.dequeue(queue_name => 'workorder_1142859',dequeue_options => c1,message_properties => c2,payload => o1,msgid => v1);
dbms_output.put_line(o1.name);
end;
/

8、查询队列的信息。

select queue,msg_priority,msg_state,user_data,expiration from aq$work_order_1142859_1 order by 1;
select count(*) from aq$work_order_1142859_1;

结果显示为如下:

 queue | msg_priority | msg_state | user_data | expiration
-------+--------------+-----------+-----------+------------
(0 rows)

 count
-------
     0
(1 row)

相关链接

DBMA_AQADM