⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageconsumerlistener.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies */package org.objectweb.joram.client.jms;import org.objectweb.joram.shared.client.AbstractJmsReply;import org.objectweb.joram.shared.client.ConsumerCloseSubRequest;import org.objectweb.joram.shared.client.ConsumerMessages;import org.objectweb.joram.shared.client.ConsumerSetListRequest;import org.objectweb.joram.shared.client.ConsumerUnsetListRequest;import org.objectweb.joram.shared.client.ConsumerAckRequest;import org.objectweb.joram.shared.client.ActivateConsumerRequest;import org.objectweb.joram.shared.client.ConsumerUnsubRequest;import org.objectweb.joram.client.jms.connection.ReplyListener;import org.objectweb.joram.client.jms.connection.AbortedRequestException;import org.objectweb.joram.client.jms.connection.RequestMultiplexer;import fr.dyade.aaa.util.Debug;import fr.dyade.aaa.util.StoppedQueueException;import javax.jms.MessageListener;import javax.jms.JMSException;import java.util.Vector;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;/** * This class listens to replies  * asynchronously returned by the user proxy for * a message consumer. */abstract class MessageConsumerListener implements ReplyListener {    public static Logger logger =     Debug.getLogger(MessageConsumerListener.class.getName());  /**   * Status of the message consumer listener.   */  protected static class Status {    public static final int INIT = 0;    public static final int RUN = 1;    public static final int ON_MSG = 2;    public static final int CLOSE = 3;    private static final String[] names = {      "INIT", "RUN", "ON_MSG", "CLOSE"};    public static String toString(int status) {      return names[status];    }  }    private static class ReceiveStatus {    public static final int INIT = 0;        public static final int WAIT_FOR_REPLY = 1;    public static final int CONSUMING_REPLY = 2;    private static final String[] names = {         "INIT", "WAIT_FOR_REPLY", "CONSUMING_REPLY" };    public static String toString(int status) {      return names[status];    }  }  private boolean queueMode;    private boolean durable;    private String selector;    private String targetName;  /**   * The identifier of the subscription request.   */   private volatile int requestId;  private int status;  private Vector messagesToAck;    /**   * The number of messages which   * are in queue (Session.qin)   * waiting for being consumed.   */  private volatile int messageCount;    /**   * The receve status of this message   * listener:   * WAIT_FOR_REPLY if a reply is expected   * from the destination   * CONSUMING_REPLY if a reply is being   * consumed and no new request has been   * sent   */  private volatile int receiveStatus;    /**   * Indicates whether the topic message   * input has been passivated or not.   */  private boolean topicMsgInputPassivated;    private int queueMessageReadMax;    private RequestMultiplexer rm;    private int topicActivationThreshold;    private int topicPassivationThreshold;    private int topicAckBufferMax;    private MessageListener listener;    MessageConsumerListener(boolean queueMode,      					  boolean durable,                          String selector,                          String targetName,      					  MessageListener listener,                          int queueMessageReadMax,                          int topicActivationThreshold,                          int topicPassivationThreshold,                          int topicAckBufferMax,                          RequestMultiplexer reqMultiplexer) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG,           "MessageConsumerListener(" + queueMode +          ',' + durable + ',' + selector + ',' + targetName + ',' +          listener + ',' + queueMessageReadMax + ',' +          topicActivationThreshold + ',' + topicPassivationThreshold + ',' +          topicAckBufferMax + ',' + reqMultiplexer + ')');    this.queueMode = queueMode;    this.durable = durable;    this.selector = selector;    this.targetName = targetName;    this.listener = listener;    this.queueMessageReadMax = queueMessageReadMax;    this.topicActivationThreshold = topicActivationThreshold;    this.topicPassivationThreshold = topicPassivationThreshold;    rm = reqMultiplexer;    messagesToAck = new Vector(0);    requestId = -1;    messageCount = 0;    topicMsgInputPassivated = false;    setStatus(Status.INIT);    setReceiveStatus(ReceiveStatus.INIT);  }    protected final int getStatus() {    return status;  }  protected void setStatus(int status) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.setStatus(" +        Status.toString(status) + ')');    this.status = status;  }    private void setReceiveStatus(int s) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.setReceiveStatus(" +        ReceiveStatus.toString(s) + ')');    receiveStatus = s;  }    /**   * Decrease the message count.   * Synchronized with the method replyReceived()   * that increments the    * messageCount += cm.getMessageCount();   *    * @return the decreased value   */  private int decreaseMessageCount(int ackMode) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.decreaseMessageCount()");        synchronized (this) {      messageCount--;    }        if (queueMode) {      boolean subscribe = false;      String[] toAck = null;      synchronized (this) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(BasicLevel.DEBUG, " -> messageCount = " + messageCount);        // Consume in advance (default is one message in advance)        if (messageCount < queueMessageReadMax            && receiveStatus == ReceiveStatus.CONSUMING_REPLY) {          subscribe = true;          if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) {            synchronized (messagesToAck) {              if (messagesToAck.size() > 0) {                toAck = new String[messagesToAck.size()];                messagesToAck.copyInto(toAck);                messagesToAck.clear();              }            }          }        }      }      if (subscribe) {        // out of the synchronized block        subscribe(toAck);      }    } else {      synchronized (this) {        if (topicMsgInputPassivated) {          if (messageCount < topicActivationThreshold) {            activateMessageInput();            topicMsgInputPassivated = false;          }        } else {          if (messageCount > topicPassivationThreshold) {            passivateMessageInput();            topicMsgInputPassivated = true;          }        }      }    }        if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE        && messageCount == 0) {      // Need to acknowledge the received messages      // if we are in lazy mode (DUPS_OK)      acknowledge(0);    }        return messageCount;  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -