⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aq.pkg

📁 OReilly Oracle PL SQL Programming第4版源代码
💻 PKG
字号:
/* Formatted on 2001/09/10 16:08 (RevealNet Formatter v4.4.1) */
CREATE OR REPLACE PACKAGE aq
IS
   /*
   || Utilities for working with Oracle AQ: shared datatypes, named
   || exceptions, programs that create and remove queue objects, and
   || programs that report on queues and their messages.
   */

   /* ---- Datatypes ----
      Each subtype is anchored to the variable declared immediately
      before it. */

   v_msgid   RAW (16);
   SUBTYPE msgid_type IS v_msgid%TYPE;

   v_name   VARCHAR2 (49);
   SUBTYPE name_type IS v_name%TYPE;

   /* ---- Named exceptions ----
      Names for some Oracle AQ error numbers. The list is not
      exhaustive; see the book chapter for others worth adding. */

   dequeue_disabled   EXCEPTION;
   PRAGMA exception_init (dequeue_disabled, -25226);

   dequeue_timeout   EXCEPTION;
   PRAGMA exception_init (dequeue_timeout, -25228);

   end_of_message_group   EXCEPTION;
   PRAGMA exception_init (end_of_message_group, -25235);

   /* ---- Create and remove queue objects ---- */

   /* Creates queue table qtable (if missing) and queue qname (if
      missing), then starts the queue. When prioritize is NULL the
      queue table is sorted by ENQ_TIME. */
   PROCEDURE create_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := NULL,
      qtype          IN   INTEGER := DBMS_AQADM.normal_queue
   );

   /* Same as create_queue, but the sort list defaults to
      'PRIORITY,ENQ_TIME'. */
   PROCEDURE create_priority_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := 'PRIORITY,ENQ_TIME'
   );

   /* Adds a subscriber to a queue; hides the need for an agent object. */
   PROCEDURE add_subscriber (
      qname        IN   VARCHAR2,
      subscriber   IN   VARCHAR2,
      address      IN   VARCHAR2 := NULL
   );

   /* Stops every queue of qtable whose name matches the LIKE pattern
      qname. When both enqueue and dequeue are TRUE each stopped queue
      is also dropped, and the queue table itself is dropped once none
      of its queues was skipped. */
   PROCEDURE stop_and_drop (
      qtable    IN   VARCHAR2,
      qname     IN   VARCHAR2 := '%',
      enqueue   IN   BOOLEAN := TRUE,
      dequeue   IN   BOOLEAN := TRUE,
      wait      IN   BOOLEAN := TRUE
   );

   /* Drops the queue table with force => TRUE. */
   PROCEDURE killqt (qtable IN VARCHAR2);

   /* ---- Retrieve queue information ---- */

   /* TRUE when USER_QUEUES has a queue with this (upper-cased) name. */
   FUNCTION queue_exists (qname IN VARCHAR2)
      RETURN BOOLEAN;

   /* TRUE when USER_QUEUE_TABLES has a queue table with this
      (upper-cased) name. */
   FUNCTION qtable_exists (qtable IN VARCHAR2)
      RETURN BOOLEAN;

   /* Number of rows in view AQ$<qtable> that belong to queue qname. */
   FUNCTION msgcount (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   )
      RETURN INTEGER;

   /* Value of the expression data_in, selected from AQ$<qtable_in>
      for the message whose msg_id equals msgid_in. */
   FUNCTION msgdata (
      qtable_in   IN   VARCHAR2,
      msgid_in    IN   RAW,
      data_in     IN   VARCHAR2
   )
      RETURN VARCHAR2;

   /* Prints priority, state, retry count and correlation id of the
      messages in queue qname through DBMS_OUTPUT. */
   PROCEDURE showmsgs (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   );
END;
/
CREATE OR REPLACE PACKAGE BODY aq
IS
   g_dyncur   PLS_INTEGER;

   
/* Private program */

   /* Ensure the package-level handle g_dyncur refers to an open
      DBMS_SQL cursor, opening a new one when it does not. */
   PROCEDURE initcur
   IS
      cursor_ready   BOOLEAN;
   BEGIN
      cursor_ready := DBMS_SQL.is_open (g_dyncur);

      IF NOT cursor_ready
      THEN
         g_dyncur := DBMS_SQL.open_cursor;
      END IF;
   EXCEPTION
      /* On INVALID_CURSOR, replace the handle with a newly opened cursor. */
      WHEN INVALID_CURSOR
      THEN
         g_dyncur := DBMS_SQL.open_cursor;
   END;

   
/* Check status of AQ objects */

   /* Report whether USER_QUEUES contains a queue named qname.
      The name is upper-cased before the comparison.
      Returns TRUE when at least one row matches, FALSE otherwise. */
   FUNCTION queue_exists (
      qname   IN   VARCHAR2
   )
      RETURN BOOLEAN
   IS
   BEGIN
      /* The first fetched row is proof enough; leaving the loop through
         RETURN also closes the implicit cursor. */
      FOR hit IN (SELECT 'x' AS marker
                    FROM user_queues
                   WHERE name = UPPER (qname))
      LOOP
         RETURN TRUE;
      END LOOP;

      RETURN FALSE;
   END;

   /* Report whether USER_QUEUE_TABLES contains a queue table named
      qtable. The name is upper-cased before the comparison.
      Returns TRUE when a row is found, FALSE otherwise. */
   FUNCTION qtable_exists (
      qtable   IN   VARCHAR2
   )
      RETURN BOOLEAN
   IS
      CURSOR q_cur
      IS
         SELECT 'x'
           FROM user_queue_tables
          WHERE queue_table =
                         UPPER (qtable);

      q_rec    q_cur%ROWTYPE;
      retval   BOOLEAN;
   BEGIN
      OPEN q_cur;
      FETCH q_cur INTO q_rec;
      /* Capture the result first so the cursor can be closed explicitly
         before returning, the same way queue_exists does, instead of
         relying on scope exit to release it. */
      retval := q_cur%FOUND;
      CLOSE q_cur;
      RETURN retval;
   END;

   /* Create (when missing) the queue table and the queue, then start
      the queue.
        qtable       - queue table name
        payload_type - payload type passed to create_queue_table
        qname        - queue name
        prioritize   - sort list for a new queue table; NULL means 'ENQ_TIME'
        qtype        - queue type passed to DBMS_AQADM.create_queue */
   PROCEDURE create_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2
            := NULL,
      qtype          IN   INTEGER
            := DBMS_AQADM.normal_queue
   )
   IS
      /* Enqueue is switched on for every queue type except the
         exception queue type. */
      allow_enqueue   BOOLEAN := qtype != DBMS_AQADM.exception_queue;
   BEGIN
      /* Step 1: the queue table. */
      IF NOT qtable_exists (qtable)
      THEN
         DBMS_AQADM.create_queue_table (
            queue_table          => qtable,
            queue_payload_type   => payload_type,
            sort_list            => NVL (prioritize, 'ENQ_TIME')
         );
      END IF;

      /* Step 2: the queue inside that table. */
      IF NOT queue_exists (qname)
      THEN
         DBMS_AQADM.create_queue (
            queue_name    => qname,
            queue_table   => qtable,
            queue_type    => qtype
         );
      END IF;

      /* Step 3: start the queue, whether or not it was just created. */
      DBMS_AQADM.start_queue (
         queue_name   => qname,
         enqueue      => allow_enqueue
      );
   END;

   /* Convenience form of create_queue whose sort list defaults to
      'PRIORITY,ENQ_TIME'. The queue type is left at create_queue's
      own default. */
   PROCEDURE create_priority_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2
            := 'PRIORITY,ENQ_TIME'
   )
   IS
   BEGIN
      create_queue (
         qtable         => qtable,
         payload_type   => payload_type,
         qname          => qname,
         prioritize     => prioritize
      );
   END;

   /* Register a subscriber on queue qname. The sys.aq$_agent object
      that DBMS_AQADM.add_subscriber expects is built here from the
      subscriber name and address; its third attribute is passed as NULL. */
   PROCEDURE add_subscriber (
      qname        IN   VARCHAR2,
      subscriber   IN   VARCHAR2,
      address      IN   VARCHAR2 := NULL
   )
   IS
   BEGIN
      DBMS_AQADM.add_subscriber (
         qname,
         sys.aq$_agent (subscriber, address, NULL)
      );
   END;

   
/* Stop and Drop Utilities */

   /* Stop, and optionally drop, the queues of a queue table.
        qtable  - queue table whose queues are examined
        qname   - LIKE pattern (upper-cased) selecting the queues to stop
        enqueue - passed to DBMS_AQADM.stop_queue
        dequeue - passed to DBMS_AQADM.stop_queue
        wait    - passed to DBMS_AQADM.stop_queue
      A stopped queue is dropped only when both enqueue and dequeue are
      TRUE. The queue table is dropped afterwards only if that was the
      case and no queue of the table was skipped by the pattern. */
   PROCEDURE stop_and_drop (
      qtable    IN   VARCHAR2,
      qname     IN   VARCHAR2 := '%',
      enqueue   IN   BOOLEAN := TRUE,
      dequeue   IN   BOOLEAN := TRUE,
      wait      IN   BOOLEAN := TRUE
   )
   IS
      drop_requested   BOOLEAN := enqueue AND dequeue;
      all_dropped      BOOLEAN := drop_requested;
   BEGIN
      FOR q_rec IN (SELECT name
                      FROM user_queues
                     WHERE queue_table = UPPER (qtable))
      LOOP
         IF q_rec.name LIKE UPPER (qname)
         THEN
            /* p.l is defined outside this file; presumably it prints
               a line -- TODO confirm. */
            p.l (q_rec.name);
            DBMS_AQADM.stop_queue (
               queue_name   => q_rec.name,
               enqueue      => enqueue,
               dequeue      => dequeue,
               wait         => wait
            );
            DBMS_OUTPUT.put_line ('stopping ' || q_rec.name);

            IF drop_requested
            THEN
               DBMS_AQADM.drop_queue (queue_name => q_rec.name);
               DBMS_OUTPUT.put_line ('dropping ' || q_rec.name);
            END IF;
         ELSE
            /* A queue that did not match stays behind, so the queue
               table must be kept. */
            all_dropped := FALSE;
         END IF;
      END LOOP;

      IF all_dropped AND qtable_exists (qtable)
      THEN
         DBMS_AQADM.drop_queue_table (
            queue_table   => qtable,
            force         => TRUE
         );
         DBMS_OUTPUT.put_line ('dropping ' || qtable);
      END IF;
   END;

   /* Drop queue table qtable, passing force => TRUE to
      DBMS_AQADM.drop_queue_table. */
   PROCEDURE killqt (qtable IN VARCHAR2)
   IS
   BEGIN
      DBMS_AQADM.drop_queue_table (
         queue_table   => qtable,
         force         => TRUE
      );
   END;

   
/* Retrieve information about queues */

   /* TODO: Enhance msgcount so that the caller provides just the queue
      name and the queue table name is looked up from that. */

   /* Count the messages of one queue.
        qtable - queue table; the count is taken from view AQ$<qtable>
        qname  - queue name, upper-cased and bound as :qname
      Returns the COUNT(*) of matching rows.
      Raises ORA-00942 when qtable is not a queue table listed in
      USER_QUEUE_TABLES. */
   FUNCTION msgcount (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   )
      RETURN INTEGER
   IS
      table_not_found   EXCEPTION;
      PRAGMA EXCEPTION_INIT (table_not_found, -942);

      fdbk     PLS_INTEGER;
      retval   PLS_INTEGER;
   BEGIN
      /* qtable becomes part of the statement text and cannot be bound,
         so accept it only when it names an existing queue table. The
         error raised is the one a missing table or view produces at
         parse time. */
      IF NOT qtable_exists (qtable)
      THEN
         RAISE table_not_found;
      END IF;

      initcur;
      DBMS_SQL.parse (
         g_dyncur,
            'SELECT COUNT(*) FROM AQ$'
         || qtable
         || ' WHERE queue = :qname',
         DBMS_SQL.native
      );
      /* The literal 1 only tells DBMS_SQL that column 1 is numeric. */
      DBMS_SQL.define_column (
         g_dyncur,
         1,
         1
      );
      DBMS_SQL.bind_variable (
         g_dyncur,
         'qname',
         UPPER (qname)
      );
      fdbk :=
         DBMS_SQL.execute_and_fetch (
            g_dyncur
         );
      DBMS_SQL.column_value (
         g_dyncur,
         1,
         retval
      );
      RETURN retval;
   END;

   /* Fetch one value for a single message.
        qtable_in - queue table; the row is read from view AQ$<qtable_in>
        msgid_in  - message id, bound as :msgid
        data_in   - expression placed in the SELECT list
      Returns the selected value (fetched as up to 2000 characters), or
      NULL when no row matches.
      Raises ORA-00942 when qtable_in is not a queue table listed in
      USER_QUEUE_TABLES. */
   FUNCTION msgdata (
      qtable_in   IN   VARCHAR2,
      msgid_in    IN   RAW,
      data_in     IN   VARCHAR2
   )
      RETURN VARCHAR2
   IS
      table_not_found   EXCEPTION;
      PRAGMA EXCEPTION_INIT (table_not_found, -942);

      fdbk     PLS_INTEGER;
      retval   VARCHAR2 (2000);
   BEGIN
      /* qtable_in becomes part of the statement text and cannot be
         bound, so accept it only when it names an existing queue table.
         The error raised is the one a missing table or view produces at
         parse time. */
      IF NOT qtable_exists (qtable_in)
      THEN
         RAISE table_not_found;
      END IF;

      initcur;
      /* NOTE(review): data_in is spliced into the SELECT list as SQL
         text and cannot be bound or checked here; callers must pass
         only trusted expressions. */
      DBMS_SQL.parse (
         g_dyncur,
            'SELECT '
         || data_in
         || ' FROM AQ$'
         || qtable_in
         || ' WHERE msg_id = :msgid',
         DBMS_SQL.native
      );
      /* The literal 'a' only tells DBMS_SQL that column 1 is a string. */
      DBMS_SQL.define_column (
         g_dyncur,
         1,
         'a',
         2000
      );
      DBMS_SQL.bind_variable_raw (
         g_dyncur,
         'msgid',
         msgid_in
      );
      fdbk :=
         DBMS_SQL.execute_and_fetch (
            g_dyncur
         );

      /* Read the column only when a row was fetched; otherwise retval
         keeps its initial NULL. */
      IF fdbk = 1
      THEN
         DBMS_SQL.column_value (
            g_dyncur,
            1,
            retval
         );
      END IF;

      RETURN retval;
   END;

   /* Print the messages of one queue through DBMS_OUTPUT.
        qtable - queue table; rows are read from view AQ$<qtable>
        qname  - queue name, upper-cased and bound as :qname
      For each row the priority, state, retry count and correlation id
      are printed under a two-line header. Nothing is printed when the
      query returns no rows.
      Raises ORA-00942 when qtable is not a queue table listed in
      USER_QUEUE_TABLES. */
   PROCEDURE showmsgs (
      qtable   IN   VARCHAR2,
      qname    IN   VARCHAR2
   )
   IS
      table_not_found   EXCEPTION;
      PRAGMA EXCEPTION_INIT (table_not_found, -942);

      fdbk             PLS_INTEGER;
      v_msg_priority   PLS_INTEGER;
      v_msg_state      VARCHAR2 (30);
      v_retry_count    PLS_INTEGER;
      v_correlation    VARCHAR2 (30);
      header_shown     BOOLEAN := FALSE;
   BEGIN
      /* qtable becomes part of the statement text and cannot be bound,
         so accept it only when it names an existing queue table. The
         error raised is the one a missing table or view produces at
         parse time. */
      IF NOT qtable_exists (qtable)
      THEN
         RAISE table_not_found;
      END IF;

      initcur;
      DBMS_SQL.parse (
         g_dyncur,
            'SELECT msg_priority, msg_state, retry_count, corr_id FROM AQ$'
         || qtable
         || ' WHERE queue = :qname',
         DBMS_SQL.native
      );
      /* The literals 1 and 'a' only tell DBMS_SQL each column's datatype. */
      DBMS_SQL.define_column (
         g_dyncur,
         1,
         1
      );
      DBMS_SQL.define_column (
         g_dyncur,
         2,
         'a',
         30
      );
      DBMS_SQL.define_column (
         g_dyncur,
         3,
         1
      );
      DBMS_SQL.define_column (
         g_dyncur,
         4,
         'a',
         30
      );
      DBMS_SQL.bind_variable (
         g_dyncur,
         'qname',
         UPPER (qname)
      );
      fdbk :=
            DBMS_SQL.EXECUTE (g_dyncur);

      WHILE DBMS_SQL.fetch_rows (
               g_dyncur
            ) > 0
      LOOP
         /* Print the header once, before the first row. A dedicated
            flag is used so that a row with a NULL msg_state cannot
            cause the header to be printed again. */
         IF NOT header_shown
         THEN
            DBMS_OUTPUT.put_line (
               'Priority        State      Retries Correlation ID'
            );
            DBMS_OUTPUT.put_line (
               '--------------- ---------- ------- ------------------------------'
            );
            header_shown := TRUE;
         END IF;

         DBMS_SQL.column_value (
            g_dyncur,
            1,
            v_msg_priority
         );
         DBMS_SQL.column_value (
            g_dyncur,
            2,
            v_msg_state
         );
         DBMS_SQL.column_value (
            g_dyncur,
            3,
            v_retry_count
         );
         DBMS_SQL.column_value (
            g_dyncur,
            4,
            v_correlation
         );
         DBMS_OUTPUT.put_line (
               RPAD (v_msg_priority, 16)
            || RPAD (v_msg_state, 11)
            || RPAD (v_retry_count, 8)
            || RPAD (v_correlation, 30)
         );
      END LOOP;
   END;
END;
/



/*======================================================================
| Supplement to the third edition of Oracle PL/SQL Programming by Steven
| Feuerstein with Bill Pribyl, Copyright (c) 1997-2002 O'Reilly &
| Associates, Inc. To submit corrections or find more code samples visit
| http://www.oreilly.com/catalog/oraclep3/
*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -