asynctopicsubscriber.java

来自「100多M的J2EE培训内容」· Java 代码 · 共 176 行

JAVA
176
字号
package bible.jms;



import javax.jms.*;

import javax.naming.*;

import java.util.*;


/**
 * Asynchronous JMS topic subscriber.
 *
 * <p>{@link #init()} looks up a topic connection factory and a topic through
 * JNDI, registers this object as the topic's {@link MessageListener} and starts
 * the connection. {@link #onMessage(Message)} prints every text message it
 * receives; a text message equal to {@code "Stop"} wakes up the thread waiting
 * in {@link #main(String[])}, which then releases all resources and returns.
 *
 * @version %I%, %G%
 */
public class AsyncTopicSubscriber implements MessageListener {

  // JNDI environment and the names the JMS objects are bound under.
  private static final String JNDI_FACTORY            =
    "weblogic.jndi.WLInitialContextFactory";
  private static final String JNDI_PROVIDER_URL       = "t3://localhost:7001";
  private static final String CONNECTION_FACTORY_NAME = "BibleJMSFactory";
  private static final String TOPIC_NAME              = "BibleJMSTopic";

  // Text of the message that asks the subscriber to shut down.
  private static final String STOP_COMMAND            = "Stop";

  // Instance variables.
  // "finished" is only read and written while holding this object's monitor.
  private boolean                finished           = false;
  // Set once init() has completed every step, cleared again on release.
  private boolean                started            = false;
  private Context                ctx                = null;
  private TopicConnectionFactory tConnectionFactory = null;
  private TopicConnection        tConnection        = null;
  private TopicSession           tSession           = null;
  private TopicSubscriber        tSubscriber        = null;
  private Topic                  t                  = null;

  /**
   * Connects to the JMS provider and starts listening on the topic.
   *
   * <p>Failures are reported on standard error and are not rethrown. When any
   * step fails, everything opened so far is released again, so the object is
   * left in the same state as before the call.
   */
  public void init() {

    try {

      // Obtain references to JMS topic components.
      // Properties is a Hashtable, which is what InitialContext expects.
      Properties env = new Properties();

      env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
      env.put(Context.PROVIDER_URL, JNDI_PROVIDER_URL);

      ctx                = new InitialContext(env);
      tConnectionFactory =
        (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_NAME);
      tConnection        = tConnectionFactory.createTopicConnection();
      tSession           = tConnection.createTopicSession(false,
        TopicSession.AUTO_ACKNOWLEDGE);
      t                  = (Topic) ctx.lookup(TOPIC_NAME);
      tSubscriber        = tSession.createSubscriber(t);

      tSubscriber.setMessageListener(this);

      // Must start the connection in order to listen.
      tConnection.start();

      started = true;
    } catch (Exception e) {
      e.printStackTrace();

      // Do not leave a half-initialised connection or context behind.
      releaseResources();
    }
  }

  /**
   * Releases the JMS and JNDI resources opened by {@link #init()}.
   *
   * <p>Safe to call when {@link #init()} failed or was never called, and safe
   * to call more than once. Problems while closing are reported on standard
   * error and do not stop the remaining resources from being closed.
   *
   * @throws JMSException declared for compatibility with existing callers;
   *         this implementation reports close failures instead of throwing
   */
  public void close() throws JMSException {
    releaseResources();
  }

  /**
   * Closes every resource that is currently open, one by one, and clears the
   * corresponding field. Never throws.
   */
  private void releaseResources() {

    started = false;

    if (tSubscriber != null) {
      try {
        tSubscriber.close();
      } catch (Exception e) {
        e.printStackTrace();
      }

      tSubscriber = null;
    }

    if (tSession != null) {
      try {
        tSession.close();
      } catch (Exception e) {
        e.printStackTrace();
      }

      tSession = null;
    }

    if (tConnection != null) {
      try {
        tConnection.close();
      } catch (Exception e) {
        e.printStackTrace();
      }

      tConnection = null;
    }

    if (ctx != null) {
      try {
        ctx.close();
      } catch (Exception e) {
        e.printStackTrace();
      }

      ctx = null;
    }
  }

  /**
   * Handles one message delivered by the JMS provider.
   *
   * <p>Only text messages are acted upon: the text is printed, and a text
   * equal to {@code "Stop"} sets the finished flag and notifies all threads
   * waiting on this object. The other message types are recognised but
   * currently ignored.
   *
   * @param message the message that was delivered
   */
  public void onMessage(Message message) {

    if (message instanceof BytesMessage) {

      // Process bytesMessage here
    } else if (message instanceof MapMessage) {

      // Process mapMessage here
    } else if (message instanceof ObjectMessage) {

      // Process objectMessage here
    } else if (message instanceof StreamMessage) {

      // Process streamMessage here
    } else if (message instanceof TextMessage) {
      try {

        // Process textMessage here
        String msg = ((TextMessage) message).getText();

        System.out.println("Received message: " + msg);

        // Constant first: getText() returns null for a message without a body.
        if (STOP_COMMAND.equals(msg)) {
          synchronized (this) {
            finished = true;

            this.notifyAll();  // Notify main thread to quit
          }
        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Starts a subscriber and blocks until a {@code "Stop"} text message is
   * received or the waiting thread is interrupted, then releases all
   * resources.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String[] args) {

    AsyncTopicSubscriber ts = new AsyncTopicSubscriber();

    try {
      ts.init();

      // Without a running listener nothing could ever end the wait below.
      if (!ts.started) {
        System.err.println("Unable to start the topic subscriber.");

        return;
      }

      System.out.println("Ready to receive messages....");

      // Receive text messages until one is equal to "Stop"
      synchronized (ts) {
        while (!ts.finished) {
          try {
            ts.wait();
          } catch (InterruptedException ie) {

            // Keep the interrupt visible to callers and stop waiting.
            Thread.currentThread().interrupt();

            break;
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
        ts.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}


/*--- Formatted in Bible Style on Thu, Sep 6, '01 ---*/


/*------ Formatted by Jindent 3.24 Gold 1.02 --- http://www.jindent.de ------*/

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?