📄 messageconsumer.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - ScalAgent Distributed Technologies * Copyright (C) 1996 - Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): Nicolas Tachker (ScalAgent) */package com.scalagent.kjoram;import com.scalagent.kjoram.jms.*;import com.scalagent.kjoram.util.TimerTask;import java.util.Vector;import com.scalagent.kjoram.excepts.IllegalStateException;import com.scalagent.kjoram.excepts.*;public class MessageConsumer{ /** The selector for filtering messages. */ private String selector; /** The message listener, if any. */ private MessageListener messageListener = null; /** <code>true</code> for a durable subscriber. */ private boolean durableSubscriber; /** Pending "receive" or listener request. */ private AbstractJmsRequest pendingReq = null; /** * <code>true</code> if the consumer has a pending synchronous "receive" * request. */ private boolean receiving = false; /** Task for replying to a pending synchronous "receive" with timer. */ private TimerTask replyingTask = null; /** The destination the consumer gets its messages from. */ protected Destination dest; /** * <code>true</code> if the subscriber does not wish to consume messages * produced by its connection. */ protected boolean noLocal; /** <code>true</code> if the consumer is closed. */ protected boolean closed = false; /** The session the consumer belongs to. */ Session sess; /** * The consumer server side target is either a queue or a subscription on * its proxy. */ String targetName; /** <code>true</code> if the consumer is a queue consumer. */ boolean queueMode; /** * Constructs a consumer. * * @param sess The session the consumer belongs to. * @param dest The destination the consumer gets messages from. * @param selector Selector for filtering messages. * @param subName The durableSubscriber subscription's name, if any. * @param noLocal <code>true</code> for a subscriber not wishing to consume * messages produced by its connection. * * @exception InvalidSelectorException If the selector syntax is invalid. * @exception IllegalStateException If the connection is broken. * @exception JMSException If the creation fails for any other reason. */ MessageConsumer(Session sess, Destination dest, String selector, String subName, boolean noLocal) throws JMSException { if (dest == null) throw new InvalidDestinationException("Invalid null destination."); if (dest instanceof TemporaryQueue) { Connection tempQCnx = ((TemporaryQueue) dest).getCnx(); if (tempQCnx == null || ! tempQCnx.equals(sess.cnx)) throw new JMSSecurityException("Forbidden consumer on this " + "temporary destination."); } else if (dest instanceof TemporaryTopic) { Connection tempTCnx = ((TemporaryTopic) dest).getCnx(); if (tempTCnx == null || ! tempTCnx.equals(sess.cnx)) throw new JMSSecurityException("Forbidden consumer on this " + "temporary destination."); } // If the destination is a topic, the consumer is a subscriber: if (dest instanceof Topic) { if (subName == null) { subName = sess.cnx.nextSubName(); durableSubscriber = false; } else durableSubscriber = true; sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(), subName, selector, noLocal, durableSubscriber)); targetName = subName; this.noLocal = noLocal; queueMode = false; } else { targetName = dest.getName(); queueMode = true; } this.sess = sess; this.dest = dest; this.selector = selector; sess.consumers.addElement(this); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": created."); } /** * Constructs a consumer. * * @param sess The session the consumer belongs to. * @param dest The destination the consumer gets messages from. * @param selector Selector for filtering messages. * * @exception InvalidSelectorException If the selector syntax is invalid. * @exception IllegalStateException If the connection is broken. * @exception JMSException If the creation fails for any other reason. */ MessageConsumer(Session sess, Destination dest, String selector) throws JMSException { this(sess, dest, selector, null, false); } /** Returns a string view of this consumer. */ public String toString() { return "Consumer:" + sess.ident; } /** * API method. * <p> * This method must not be called if the connection the consumer belongs to * is started, because the session would then be accessed by the thread * calling this method and by the thread controlling asynchronous deliveries. * This situation is clearly forbidden by the single threaded nature of * sessions. Moreover, unsetting a message listener without stopping the * connection may lead to the situation where asynchronous deliveries would * arrive on the connection, the session or the consumer without being * able to reach their target listener! * * @exception IllegalStateException If the consumer is closed, or if the * connection is broken. * @exception JMSException If the request fails for any other reason. * @exception IllegalStateException If the consumer is closed. */ public void setMessageListener(MessageListener messageListener) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed consumer."); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": setting MessageListener to " + messageListener); if (sess.cnx.started && JoramTracing.dbgClient) JoramTracing.log(JoramTracing.WARN, this + ": improper call" + " on a started connection."); // If unsetting the listener: if (this.messageListener != null && messageListener == null) { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": unsets" + " listener request."); sess.cnx.requestsTable.remove(pendingReq.getKey()); this.messageListener = messageListener; sess.msgListeners--; ConsumerUnsetListRequest unsetLR = null; if (queueMode) { unsetLR = new ConsumerUnsetListRequest(true); unsetLR.setCancelledRequestId(pendingReq.getRequestId()); } else { unsetLR = new ConsumerUnsetListRequest(false); unsetLR.setTarget(targetName); } try { sess.cnx.syncRequest(unsetLR); } // A JMSException might be caught if the connection is broken. catch (JMSException jE) {} pendingReq = null; // Stopping the daemon if not needed anymore: if (sess.msgListeners == 0 && sess.started) { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": stops the" + " session daemon."); sess.daemon.stop(); sess.daemon = null; sess.started = false; } } // Else, if setting a new listener: else if (this.messageListener == null && messageListener != null) { sess.msgListeners++; if (sess.msgListeners == 1 && (sess.started || sess.cnx.started)) { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": starts the" + " session daemon."); sess.daemon = new SessionDaemon(sess); sess.daemon.setDaemon(false); sess.daemon.start(); sess.started = true; } this.messageListener = messageListener; pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode); pendingReq.setRequestId(sess.cnx.nextRequestId()); sess.cnx.requestsTable.put(pendingReq.getKey(), this); sess.cnx.asyncRequest(pendingReq); } if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener" + " set."); } /** * API method. * * @exception IllegalStateException If the consumer is closed. */ public MessageListener getMessageListener() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed consumer."); return messageListener; } /** * API method. * * @exception IllegalStateException If the consumer is closed. */ public String getMessageSelector() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed consumer."); return selector; } /** * API method implemented in subclasses. * * @exception IllegalStateException If the consumer is closed, or if the * connection is broken. * @exception JMSSecurityException If the requester is not a READER on the * destination. * @exception JMSException If the request fails for any other reason. */ public Message receive(long timeOut) throws JMSException {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -