📄 messageconsumerlistener.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import org.objectweb.joram.shared.client.AbstractJmsReply;import org.objectweb.joram.shared.client.ConsumerCloseSubRequest;import org.objectweb.joram.shared.client.ConsumerMessages;import org.objectweb.joram.shared.client.ConsumerSetListRequest;import org.objectweb.joram.shared.client.ConsumerUnsetListRequest;import org.objectweb.joram.shared.client.ConsumerAckRequest;import org.objectweb.joram.shared.client.ActivateConsumerRequest;import org.objectweb.joram.shared.client.ConsumerUnsubRequest;import org.objectweb.joram.client.jms.connection.ReplyListener;import org.objectweb.joram.client.jms.connection.AbortedRequestException;import org.objectweb.joram.client.jms.connection.RequestMultiplexer;import fr.dyade.aaa.util.Debug;import fr.dyade.aaa.util.StoppedQueueException;import javax.jms.MessageListener;import javax.jms.JMSException;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;/** * This class listens to replies * asynchronously returned by the user proxy for * a message consumer. */abstract class MessageConsumerListener implements ReplyListener { public static Logger logger = Debug.getLogger(MessageConsumerListener.class.getName()); /** * Status of the message consumer listener. */ protected static class Status { public static final int INIT = 0; public static final int RUN = 1; public static final int ON_MSG = 2; public static final int CLOSE = 3; private static final String[] names = { "INIT", "RUN", "ON_MSG", "CLOSE"}; public static String toString(int status) { return names[status]; } } private static class ReceiveStatus { public static final int INIT = 0; public static final int WAIT_FOR_REPLY = 1; public static final int CONSUMING_REPLY = 2; private static final String[] names = { "INIT", "WAIT_FOR_REPLY", "CONSUMING_REPLY" }; public static String toString(int status) { return names[status]; } } private boolean queueMode; private boolean durable; private String selector; private String targetName; /** * The identifier of the subscription request. */ private volatile int requestId; private int status; private Vector messagesToAck; /** * The number of messages which * are in queue (Session.qin) * waiting for being consumed. */ private volatile int messageCount; /** * The receve status of this message * listener: * WAIT_FOR_REPLY if a reply is expected * from the destination * CONSUMING_REPLY if a reply is being * consumed and no new request has been * sent */ private volatile int receiveStatus; /** * Indicates whether the topic message * input has been passivated or not. */ private boolean topicMsgInputPassivated; private int queueMessageReadMax; private RequestMultiplexer rm; private int topicActivationThreshold; private int topicPassivationThreshold; private int topicAckBufferMax; private MessageListener listener; MessageConsumerListener(boolean queueMode, boolean durable, String selector, String targetName, MessageListener listener, int queueMessageReadMax, int topicActivationThreshold, int topicPassivationThreshold, int topicAckBufferMax, RequestMultiplexer reqMultiplexer) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "MessageConsumerListener(" + queueMode + ',' + durable + ',' + selector + ',' + targetName + ',' + listener + ',' + queueMessageReadMax + ',' + topicActivationThreshold + ',' + topicPassivationThreshold + ',' + topicAckBufferMax + ',' + reqMultiplexer + ')'); this.queueMode = queueMode; this.durable = durable; this.selector = selector; this.targetName = targetName; this.listener = listener; this.queueMessageReadMax = queueMessageReadMax; this.topicActivationThreshold = topicActivationThreshold; this.topicPassivationThreshold = topicPassivationThreshold; rm = reqMultiplexer; messagesToAck = new Vector(0); requestId = -1; messageCount = 0; topicMsgInputPassivated = false; setStatus(Status.INIT); setReceiveStatus(ReceiveStatus.INIT); } protected final int getStatus() { return status; } protected void setStatus(int status) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "MessageConsumerListener.setStatus(" + Status.toString(status) + ')'); this.status = status; } private void setReceiveStatus(int s) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "MessageConsumerListener.setReceiveStatus(" + ReceiveStatus.toString(s) + ')'); receiveStatus = s; } /** * Decrease the message count. * Synchronized with the method replyReceived() * that increments the * messageCount += cm.getMessageCount(); * * @return the decreased value */ private int decreaseMessageCount(int ackMode) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "MessageConsumerListener.decreaseMessageCount()"); synchronized (this) { messageCount--; } if (queueMode) { boolean subscribe = false; String[] toAck = null; synchronized (this) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, " -> messageCount = " + messageCount); // Consume in advance (default is one message in advance) if (messageCount < queueMessageReadMax && receiveStatus == ReceiveStatus.CONSUMING_REPLY) { subscribe = true; if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) { synchronized (messagesToAck) { if (messagesToAck.size() > 0) { toAck = new String[messagesToAck.size()]; messagesToAck.copyInto(toAck); messagesToAck.clear(); } } } } } if (subscribe) { // out of the synchronized block subscribe(toAck); } } else { synchronized (this) { if (topicMsgInputPassivated) { if (messageCount < topicActivationThreshold) { activateMessageInput(); topicMsgInputPassivated = false; } } else { if (messageCount > topicPassivationThreshold) { passivateMessageInput(); topicMsgInputPassivated = true; } } } } if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE && messageCount == 0) { // Need to acknowledge the received messages // if we are in lazy mode (DUPS_OK) acknowledge(0); } return messageCount; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -