-- File: aq.pkg
-- (code-viewer control: font size)
/* Formatted on 2001/09/10 16:08 (RevealNet Formatter v4.4.1) */
CREATE OR REPLACE PACKAGE aq
IS
   /* Convenience layer over Oracle Advanced Queuing: shared datatypes,
      named exceptions, queue administration (via DBMS_AQADM) and simple
      reporting against the AQ$<queue table> views. */

   /* ------------------------------------------------------------------
      Datatypes. The v_* variables serve as %TYPE anchors for the
      subtypes that follow them.
      ------------------------------------------------------------------ */
   v_msgid   RAW (16);
   SUBTYPE msgid_type IS v_msgid%TYPE;

   v_name    VARCHAR2 (49);
   SUBTYPE name_type IS v_name%TYPE;

   /* ------------------------------------------------------------------
      Named exceptions, each bound to an Oracle error number so callers
      can trap it by name. Add further ones as the need arises.
      ------------------------------------------------------------------ */
   dequeue_disabled       EXCEPTION;
   PRAGMA EXCEPTION_INIT (dequeue_disabled, -25226);

   dequeue_timeout        EXCEPTION;
   PRAGMA EXCEPTION_INIT (dequeue_timeout, -25228);

   end_of_message_group   EXCEPTION;
   PRAGMA EXCEPTION_INIT (end_of_message_group, -25235);

   /* ------------------------------------------------------------------
      Creating and removing queue objects
      ------------------------------------------------------------------ */

   /* Creates the queue table and the queue if they are missing, then
      starts the queue. A NULL prioritize means sort by ENQ_TIME. */
   PROCEDURE create_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := NULL,
      qtype          IN   INTEGER := DBMS_AQADM.normal_queue
   );

   /* Same as create_queue, but the sort list defaults to
      'PRIORITY,ENQ_TIME'. */
   PROCEDURE create_priority_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := 'PRIORITY,ENQ_TIME'
   );

   /* Adds a subscriber to a queue; the caller passes plain strings and
      the agent object is built internally. */
   PROCEDURE add_subscriber (
      qname        IN   VARCHAR2,
      subscriber   IN   VARCHAR2,
      address      IN   VARCHAR2 := NULL
   );

   /* Stops the queues of qtable whose names match the LIKE pattern
      qname. When both enqueue and dequeue are TRUE the matching queues
      are dropped as well, and the queue table is dropped too provided
      every one of its queues matched. */
   PROCEDURE stop_and_drop (
      qtable    IN   VARCHAR2,
      qname     IN   VARCHAR2 := '%',
      enqueue   IN   BOOLEAN := TRUE,
      dequeue   IN   BOOLEAN := TRUE,
      wait      IN   BOOLEAN := TRUE
   );

   /* Force-drops a queue table. */
   PROCEDURE killqt (qtable IN VARCHAR2);

   /* ------------------------------------------------------------------
      Retrieving queue information
      ------------------------------------------------------------------ */

   /* TRUE when USER_QUEUES lists a queue with this name. */
   FUNCTION queue_exists (qname IN VARCHAR2)
      RETURN BOOLEAN;

   /* TRUE when USER_QUEUE_TABLES lists a queue table with this name. */
   FUNCTION qtable_exists (qtable IN VARCHAR2)
      RETURN BOOLEAN;

   /* Number of rows the AQ$<qtable> view shows for queue qname. */
   FUNCTION msgcount (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   )
      RETURN INTEGER;

   /* Value of the select-list expression data_in for message msgid_in,
      read from the AQ$<qtable_in> view. */
   FUNCTION msgdata (
      qtable_in   IN   VARCHAR2,
      msgid_in    IN   RAW,
      data_in     IN   VARCHAR2
   )
      RETURN VARCHAR2;

   /* Prints priority, state, retry count and correlation id of every
      message in queue qname through DBMS_OUTPUT. */
   PROCEDURE showmsgs (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   );
END aq;
/
CREATE OR REPLACE PACKAGE BODY aq
IS
g_dyncur PLS_INTEGER;
/* Private program */
PROCEDURE initcur
IS
   /* Makes sure the package-level DBMS_SQL cursor g_dyncur is usable:
      if DBMS_SQL does not report it as open, or probing it raises
      INVALID_CURSOR, a new cursor is opened and stored in g_dyncur. */
   v_already_open   BOOLEAN;

   /* Opens a new DBMS_SQL cursor and remembers its handle. */
   PROCEDURE allocate
   IS
   BEGIN
      g_dyncur := DBMS_SQL.open_cursor;
   END allocate;
BEGIN
   v_already_open := DBMS_SQL.is_open (g_dyncur);

   IF NOT v_already_open
   THEN
      allocate;
   END IF;
EXCEPTION
   WHEN INVALID_CURSOR
   THEN
      allocate;
END initcur;
/* Check status of AQ objects */
FUNCTION queue_exists (
   qname IN VARCHAR2
)
   RETURN BOOLEAN
IS
   /* Reports whether USER_QUEUES contains a queue whose name equals the
      upper-cased qname. */
BEGIN
   /* The first row found is proof enough; leaving the cursor FOR loop
      through RETURN closes the implicit cursor. */
   FOR hit IN (SELECT 'x'
                 FROM user_queues
                WHERE name = UPPER (qname))
   LOOP
      RETURN TRUE;
   END LOOP;

   RETURN FALSE;
END queue_exists;
FUNCTION qtable_exists (
   qtable IN VARCHAR2
)
   RETURN BOOLEAN
IS
   /* Reports whether USER_QUEUE_TABLES contains a queue table whose name
      equals the upper-cased qtable.

      Returns TRUE when a matching row is found, FALSE otherwise. */
   CURSOR q_cur
   IS
      SELECT 'x'
        FROM user_queue_tables
       WHERE queue_table = UPPER (qtable);

   q_rec    q_cur%ROWTYPE;
   retval   BOOLEAN;
BEGIN
   OPEN q_cur;
   FETCH q_cur INTO q_rec;
   /* Capture the result before closing: %FOUND cannot be read from a
      closed cursor. */
   retval := q_cur%FOUND;
   /* Close explicitly, as queue_exists does, instead of returning with
      the cursor still open. */
   CLOSE q_cur;
   RETURN retval;
END qtable_exists;
PROCEDURE create_queue (
   qtable IN VARCHAR2,
   payload_type IN VARCHAR2,
   qname IN VARCHAR2,
   prioritize IN VARCHAR2
      := NULL,
   qtype IN INTEGER
      := DBMS_AQADM.normal_queue
)
IS
   /* Creates whatever is missing (queue table, then queue) and starts
      the queue.

      qtable        name of the queue table
      payload_type  payload type handed to DBMS_AQADM.create_queue_table
      qname         name of the queue
      prioritize    sort list for a new queue table; NULL means ENQ_TIME
      qtype         queue type handed to DBMS_AQADM.create_queue */
   c_default_sort    CONSTANT VARCHAR2 (8) := 'ENQ_TIME';
   v_allow_enqueue   BOOLEAN;
BEGIN
   IF NOT qtable_exists (qtable)
   THEN
      DBMS_AQADM.create_queue_table (
         qtable,
         payload_type,
         sort_list => NVL (prioritize, c_default_sort)
      );
   END IF;

   IF NOT queue_exists (qname)
   THEN
      DBMS_AQADM.create_queue (qname, qtable, qtype);
   END IF;

   /* Enqueueing is enabled for every queue type except an exception
      queue. */
   v_allow_enqueue := qtype <> DBMS_AQADM.exception_queue;
   DBMS_AQADM.start_queue (qname, v_allow_enqueue);
END create_queue;
PROCEDURE create_priority_queue (
   qtable IN VARCHAR2,
   payload_type IN VARCHAR2,
   qname IN VARCHAR2,
   prioritize IN VARCHAR2
      := 'PRIORITY,ENQ_TIME'
)
IS
   /* Thin wrapper around create_queue: identical except that the sort
      list defaults to 'PRIORITY,ENQ_TIME'. The queue type is left at
      create_queue's own default. */
BEGIN
   create_queue (
      qtable       => qtable,
      payload_type => payload_type,
      qname        => qname,
      prioritize   => prioritize
   );
END create_priority_queue;
PROCEDURE add_subscriber (
   qname IN VARCHAR2,
   subscriber IN VARCHAR2,
   address IN VARCHAR2 := NULL
)
IS
   /* Registers a subscriber on queue qname. The sys.aq$_agent object
      that DBMS_AQADM expects is assembled here from the subscriber name
      and address; its third attribute is passed as NULL. */
   v_recipient   sys.aq$_agent
                    := sys.aq$_agent (subscriber, address, NULL);
BEGIN
   DBMS_AQADM.add_subscriber (
      queue_name => qname,
      subscriber => v_recipient
   );
END add_subscriber;
/* Stop and Drop Utilities */
PROCEDURE stop_and_drop (
   qtable IN VARCHAR2,
   qname IN VARCHAR2 := '%',
   enqueue IN BOOLEAN := TRUE,
   dequeue IN BOOLEAN := TRUE,
   wait IN BOOLEAN := TRUE
)
IS
   /* Stops, and where possible drops, the queues of a queue table.

      qtable   queue table whose queues are examined
      qname    LIKE pattern (upper-cased here) selecting the queues
      enqueue  passed to DBMS_AQADM.stop_queue
      dequeue  passed to DBMS_AQADM.stop_queue
      wait     passed to DBMS_AQADM.stop_queue

      A queue is dropped only when both enqueue and dequeue are TRUE.
      The queue table itself is dropped only when, in addition, every
      queue of the table matched the pattern. Progress is reported
      through DBMS_OUTPUT. */
   CURSOR q_cur
   IS
      SELECT name
        FROM user_queues
       WHERE queue_table = UPPER (qtable);

   /* Dropping is only allowed after a complete stop. */
   c_full_stop   CONSTANT BOOLEAN := enqueue AND dequeue;
   all_dropped   BOOLEAN := c_full_stop;
BEGIN
   FOR q_rec IN q_cur
   LOOP
      IF q_rec.name LIKE UPPER (qname)
      THEN
         /* The former debug call p.l (q_rec.name) was removed: it tied
            this package to an external package and only repeated the
            trace line written below. */
         DBMS_AQADM.stop_queue (
            q_rec.name,
            enqueue,
            dequeue,
            wait
         );
         DBMS_OUTPUT.put_line ('stopping ' || q_rec.name);

         IF c_full_stop
         THEN
            DBMS_AQADM.drop_queue (q_rec.name);
            DBMS_OUTPUT.put_line ('dropping ' || q_rec.name);
         END IF;
      ELSE
         /* A queue outside the pattern survives, so the table must
            stay. */
         all_dropped := FALSE;
      END IF;
   END LOOP;

   IF all_dropped
      AND qtable_exists (qtable)
   THEN
      DBMS_AQADM.drop_queue_table (
         qtable,
         force => TRUE
      );
      DBMS_OUTPUT.put_line ('dropping ' || qtable);
   END IF;
END stop_and_drop;
PROCEDURE killqt (qtable IN VARCHAR2)
IS
   /* Drops the queue table qtable, asking DBMS_AQADM to force the
      drop. */
BEGIN
   DBMS_AQADM.drop_queue_table (
      queue_table => qtable,
      force       => TRUE
   );
END killqt;
/* Retrieve information about queues */
/* TODO: enhance msgcount so that the caller supplies only the queue name
and the queue table name is looked up from it. */
FUNCTION msgcount (
   qtable IN VARCHAR2,
   qname IN VARCHAR2
)
   RETURN INTEGER
IS
   /* Counts the rows that the AQ$<qtable> view shows for a queue.

      qtable  queue table name; it becomes part of the view name in the
              dynamic SQL, so it must be a simple SQL name
      qname   queue name; upper-cased and passed as a bind variable

      Returns the COUNT(*) result.
      Raises whatever DBMS_ASSERT.simple_sql_name raises (ORA-44003)
      when qtable is not a simple SQL name. */
   fdbk     PLS_INTEGER;
   retval   PLS_INTEGER;
BEGIN
   initcur;
   /* An identifier cannot be bound, so it is validated before being
      concatenated into the statement text. */
   DBMS_SQL.parse (
      g_dyncur,
      'SELECT COUNT(*) FROM AQ$'
      || DBMS_ASSERT.simple_sql_name (qtable)
      || ' WHERE queue = :qname',
      DBMS_SQL.native
   );
   /* The literal 1 only tells DBMS_SQL that column 1 is numeric. */
   DBMS_SQL.define_column (g_dyncur, 1, 1);
   DBMS_SQL.bind_variable (g_dyncur, 'qname', UPPER (qname));
   fdbk := DBMS_SQL.execute_and_fetch (g_dyncur);
   DBMS_SQL.column_value (g_dyncur, 1, retval);
   RETURN retval;
END msgcount;
FUNCTION msgdata (
   qtable_in IN VARCHAR2,
   msgid_in IN RAW,
   data_in IN VARCHAR2
)
   RETURN VARCHAR2
IS
   /* Fetches one value for one message from the AQ$<qtable_in> view.

      qtable_in  queue table name; it becomes part of the view name in
                 the dynamic SQL, so it must be a simple SQL name
      msgid_in   message id; passed as a RAW bind variable
      data_in    select-list expression to evaluate for that message

      Returns the value as a string of up to 2000 characters, or NULL
      when no row matches msgid_in.
      Raises whatever DBMS_ASSERT.simple_sql_name raises (ORA-44003)
      when qtable_in is not a simple SQL name. */
   fdbk     PLS_INTEGER;
   retval   VARCHAR2 (2000);
BEGIN
   initcur;
   /* NOTE(review): data_in is concatenated into the statement as-is
      because it is an expression, not a value, and so can neither be
      bound nor checked as a simple name. Callers must never pass
      untrusted text here. */
   DBMS_SQL.parse (
      g_dyncur,
      'SELECT '
      || data_in
      || ' FROM AQ$'
      || DBMS_ASSERT.simple_sql_name (qtable_in)
      || ' WHERE msg_id = :msgid',
      DBMS_SQL.native
   );
   /* The literal 'a' only tells DBMS_SQL that column 1 is a string. */
   DBMS_SQL.define_column (g_dyncur, 1, 'a', 2000);
   DBMS_SQL.bind_variable_raw (g_dyncur, 'msgid', msgid_in);
   fdbk := DBMS_SQL.execute_and_fetch (g_dyncur);

   /* Read the column only when exactly one row was fetched; otherwise
      retval keeps its initial NULL. */
   IF fdbk = 1
   THEN
      DBMS_SQL.column_value (g_dyncur, 1, retval);
   END IF;

   RETURN retval;
END msgdata;
PROCEDURE showmsgs (
   qtable IN VARCHAR2,
   qname IN VARCHAR2
)
IS
   /* Lists the messages of a queue through DBMS_OUTPUT: one line per
      message with priority, state, retry count and correlation id,
      preceded by a two-line header. Nothing is printed when the queue
      has no messages.

      qtable  queue table name; it becomes part of the view name in the
              dynamic SQL, so it must be a simple SQL name
      qname   queue name; upper-cased and passed as a bind variable

      Raises whatever DBMS_ASSERT.simple_sql_name raises (ORA-44003)
      when qtable is not a simple SQL name. */
   fdbk             PLS_INTEGER;
   v_msg_priority   PLS_INTEGER;
   v_msg_state      VARCHAR2 (30);
   v_retry_count    PLS_INTEGER;
   v_correlation    VARCHAR2 (30);
   /* Explicit flag instead of testing v_msg_state for NULL, so the
      header is printed exactly once even if a fetched state is NULL. */
   v_header_shown   BOOLEAN := FALSE;
BEGIN
   initcur;
   /* An identifier cannot be bound, so it is validated before being
      concatenated into the statement text. */
   DBMS_SQL.parse (
      g_dyncur,
      'SELECT msg_priority, msg_state, retry_count, corr_id FROM AQ$'
      || DBMS_ASSERT.simple_sql_name (qtable)
      || ' WHERE queue = :qname',
      DBMS_SQL.native
   );
   /* The literals 1 and 'a' only convey each column's datatype. */
   DBMS_SQL.define_column (g_dyncur, 1, 1);
   DBMS_SQL.define_column (g_dyncur, 2, 'a', 30);
   DBMS_SQL.define_column (g_dyncur, 3, 1);
   DBMS_SQL.define_column (g_dyncur, 4, 'a', 30);
   DBMS_SQL.bind_variable (g_dyncur, 'qname', UPPER (qname));
   fdbk := DBMS_SQL.EXECUTE (g_dyncur);

   WHILE DBMS_SQL.fetch_rows (g_dyncur) > 0
   LOOP
      IF NOT v_header_shown
      THEN
         /* Display the header */
         DBMS_OUTPUT.put_line (
            'Priority State Retries Correlation ID'
         );
         DBMS_OUTPUT.put_line (
            '--------------- ---------- ------- ------------------------------'
         );
         v_header_shown := TRUE;
      END IF;

      DBMS_SQL.column_value (g_dyncur, 1, v_msg_priority);
      DBMS_SQL.column_value (g_dyncur, 2, v_msg_state);
      DBMS_SQL.column_value (g_dyncur, 3, v_retry_count);
      DBMS_SQL.column_value (g_dyncur, 4, v_correlation);
      DBMS_OUTPUT.put_line (
         RPAD (v_msg_priority, 16)
         || RPAD (v_msg_state, 11)
         || RPAD (v_retry_count, 8)
         || RPAD (v_correlation, 30)
      );
   END LOOP;
END showmsgs;
END;
/
/*======================================================================
| Supplement to the third edition of Oracle PL/SQL Programming by Steven
| Feuerstein with Bill Pribyl, Copyright (c) 1997-2002 O'Reilly &
| Associates, Inc. To submit corrections or find more code samples visit
| http://www.oreilly.com/catalog/oraclep3/
*/
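-- Keyboard shortcuts (help text of the code viewer, not part of aq.pkg):
--   Copy code:            Ctrl + C
--   Search code:          Ctrl + F
--   Full-screen mode:     F11
--   Toggle theme:         Ctrl + Shift + D
--   Show shortcuts:       ?
--   Increase font size:   Ctrl + =
--   Decrease font size:   Ctrl + -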