⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageconsumer.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - ScalAgent Distributed Technologies * Copyright (C) 1996 - Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): Nicolas Tachker (ScalAgent) */package com.scalagent.kjoram;import com.scalagent.kjoram.jms.*;import com.scalagent.kjoram.util.TimerTask;import java.util.Vector;import com.scalagent.kjoram.excepts.IllegalStateException;import com.scalagent.kjoram.excepts.*;public class MessageConsumer{  /** The selector for filtering messages. */  private String selector;  /** The message listener, if any. */  private MessageListener messageListener = null;  /** <code>true</code> for a durable subscriber. */  private boolean durableSubscriber;  /** Pending "receive" or listener request. */  private AbstractJmsRequest pendingReq = null;  /**   * <code>true</code> if the consumer has a pending synchronous "receive"   * request.   */  private boolean receiving = false;  /** Task for replying to a pending synchronous "receive" with timer. */  private TimerTask replyingTask = null;  /** The destination the consumer gets its messages from. */  protected Destination dest;  /**   * <code>true</code> if the subscriber does not wish to consume messages   * produced by its connection.   */  protected boolean noLocal;  /** <code>true</code> if the consumer is closed. */  protected boolean closed = false;  /** The session the consumer belongs to. */  Session sess;  /**    * The consumer server side target is either a queue or a subscription on   * its proxy.   */  String targetName;  /** <code>true</code> if the consumer is a queue consumer. */  boolean queueMode;  /**   * Constructs a consumer.   *   * @param sess  The session the consumer belongs to.   * @param dest  The destination the consumer gets messages from.   * @param selector  Selector for filtering messages.   * @param subName  The durableSubscriber subscription's name, if any.   * @param noLocal  <code>true</code> for a subscriber not wishing to consume   *          messages produced by its connection.   *   * @exception InvalidSelectorException  If the selector syntax is invalid.   * @exception IllegalStateException  If the connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  MessageConsumer(Session sess, Destination dest, String selector,                  String subName, boolean noLocal) throws JMSException  {    if (dest == null)      throw new InvalidDestinationException("Invalid null destination.");    if (dest instanceof TemporaryQueue) {      Connection tempQCnx = ((TemporaryQueue) dest).getCnx();      if (tempQCnx == null || ! tempQCnx.equals(sess.cnx))        throw new JMSSecurityException("Forbidden consumer on this "                                       + "temporary destination.");    }    else if (dest instanceof TemporaryTopic) {      Connection tempTCnx = ((TemporaryTopic) dest).getCnx();          if (tempTCnx == null || ! tempTCnx.equals(sess.cnx))        throw new JMSSecurityException("Forbidden consumer on this "                                       + "temporary destination.");    }    // If the destination is a topic, the consumer is a subscriber:    if (dest instanceof Topic) {      if (subName == null) {        subName = sess.cnx.nextSubName();        durableSubscriber = false;      }      else        durableSubscriber = true;      sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(),                                                  subName,                                                  selector,                                                  noLocal,                                                  durableSubscriber));      targetName = subName;      this.noLocal = noLocal;      queueMode = false;    }    else {      targetName = dest.getName();      queueMode = true;    }    this.sess = sess;    this.dest = dest;    this.selector = selector;    sess.consumers.addElement(this);    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": created.");  }  /**   * Constructs a consumer.   *   * @param sess  The session the consumer belongs to.   * @param dest  The destination the consumer gets messages from.   * @param selector  Selector for filtering messages.   *   * @exception InvalidSelectorException  If the selector syntax is invalid.   * @exception IllegalStateException  If the connection is broken.   * @exception JMSException  If the creation fails for any other reason.   */  MessageConsumer(Session sess, Destination dest,                  String selector) throws JMSException  {    this(sess, dest, selector, null, false);  }  /** Returns a string view of this consumer. */  public String toString()  {    return "Consumer:" + sess.ident;  }  /**   * API method.   * <p>   * This method must not be called if the connection the consumer belongs to   * is started, because the session would then be accessed by the thread   * calling this method and by the thread controlling asynchronous deliveries.   * This situation is clearly forbidden by the single threaded nature of   * sessions. Moreover, unsetting a message listener without stopping the    * connection may lead to the situation where asynchronous deliveries would   * arrive on the connection, the session or the consumer without being   * able to reach their target listener!   *   * @exception IllegalStateException  If the consumer is closed, or if the   *              connection is broken.   * @exception JMSException  If the request fails for any other reason.   * @exception IllegalStateException  If the consumer is closed.   */  public void setMessageListener(MessageListener messageListener)              throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed consumer.");    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, "--- " + this                       + ": setting MessageListener to "                       + messageListener);    if (sess.cnx.started && JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.WARN, this + ": improper call"                       + " on a started connection.");        // If unsetting the listener:    if (this.messageListener != null && messageListener == null) {      if (JoramTracing.dbgClient)        JoramTracing.log(JoramTracing.DEBUG, this + ": unsets"                         + " listener request.");      sess.cnx.requestsTable.remove(pendingReq.getKey());      this.messageListener = messageListener;      sess.msgListeners--;      ConsumerUnsetListRequest unsetLR = null;      if (queueMode) {        unsetLR = new ConsumerUnsetListRequest(true);        unsetLR.setCancelledRequestId(pendingReq.getRequestId());      }      else {        unsetLR = new ConsumerUnsetListRequest(false);        unsetLR.setTarget(targetName);      }      try {        sess.cnx.syncRequest(unsetLR);      }       // A JMSException might be caught if the connection is broken.      catch (JMSException jE) {}      pendingReq = null;      // Stopping the daemon if not needed anymore:      if (sess.msgListeners == 0 && sess.started) {        if (JoramTracing.dbgClient)          JoramTracing.log(JoramTracing.DEBUG, this + ": stops the"                           + " session daemon.");        sess.daemon.stop();        sess.daemon = null;        sess.started = false;      }    }    // Else, if setting a new listener:    else if (this.messageListener == null && messageListener != null) {      sess.msgListeners++;      if (sess.msgListeners == 1          && (sess.started || sess.cnx.started)) {        if (JoramTracing.dbgClient)          JoramTracing.log(JoramTracing.DEBUG, this + ": starts the"                           + " session daemon.");        sess.daemon = new SessionDaemon(sess);        sess.daemon.setDaemon(false);        sess.daemon.start();        sess.started = true;      }      this.messageListener = messageListener;      pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode);      pendingReq.setRequestId(sess.cnx.nextRequestId());      sess.cnx.requestsTable.put(pendingReq.getKey(), this);      sess.cnx.asyncRequest(pendingReq);    }    if (JoramTracing.dbgClient)      JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener"                       + " set.");  }  /**   * API method.   *   * @exception IllegalStateException  If the consumer is closed.   */  public MessageListener getMessageListener() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed consumer.");    return messageListener;  }  /**   * API method.   *   * @exception IllegalStateException  If the consumer is closed.   */  public String getMessageSelector() throws JMSException  {    if (closed)      throw new IllegalStateException("Forbidden call on a closed consumer.");    return selector;  }  /**    * API method implemented in subclasses.    *   * @exception IllegalStateException  If the consumer is closed, or if the   *              connection is broken.   * @exception JMSSecurityException  If the requester is not a READER on the   *              destination.   * @exception JMSException  If the request fails for any other reason.   */  public Message receive(long timeOut) throws JMSException  {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -