⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageconsumerlistener.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   /**   * Called by Session.   */  synchronized void start() throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.start()");    if (status == Status.INIT) {      subscribe(null);      setStatus(Status.RUN);    } else {      // Should not happen      throw new IllegalStateException("Status error");    }  }  private void subscribe(String[] toAck) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.subscribe()");        ConsumerSetListRequest req =       new ConsumerSetListRequest(        targetName,        selector,         queueMode,        toAck,        queueMessageReadMax);        // Change the receive status before sending    // the request. subscribe() is not synchronized    // so the reply can be received before the end    // of this method.    setReceiveStatus(ReceiveStatus.WAIT_FOR_REPLY);    rm.sendRequest(req, this);    requestId = req.getRequestId();  }  /**   * Called by Session.   */  public void close() throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.close()");    synchronized (this) {      while (status == Status.ON_MSG) {        try {          // Wait for the message listener to return from           // onMessage()          wait();        } catch (InterruptedException exc) {}      }            if (status == Status.INIT ||          status == Status.CLOSE) return;            rm.abortRequest(requestId);      // If session ack mode is DUPS_OK      acknowledge(0);      setStatus(Status.CLOSE);    }        if (queueMode) {      // Out of the synchronized block because it could      // lead to a dead lock with      // the connection driver thread calling replyReceived.      ConsumerUnsetListRequest unsetLR = new ConsumerUnsetListRequest(          queueMode);      unsetLR.setTarget(targetName);      unsetLR.setCancelledRequestId(requestId);      rm.sendRequest(unsetLR);    }    // else useless for a topic     // because the subscription    // is deleted (see MessageConsumer.close())  }  private void acknowledge(int threshold) {    try {      synchronized (messagesToAck) {        if (messagesToAck.size() > threshold) {          ConsumerAckRequest ack = new ConsumerAckRequest(              targetName,              queueMode);          for (int i = 0; i < messagesToAck.size(); i++) {            String msgId = (String) messagesToAck.elementAt(i);            ack.addId(msgId);          }          rm.sendRequest(ack);          messagesToAck.clear();        }      }    } catch (JMSException exc) {      if (logger.isLoggable(BasicLevel.ERROR))        logger.log(          BasicLevel.ERROR, "", exc);     }  }    /**   * Called by RequestMultiplexer.   */  public synchronized boolean replyReceived(AbstractJmsReply reply)     throws AbortedRequestException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.replyReceived(" +         reply + ')');        if (status == Status.CLOSE) {      throw new AbortedRequestException();    } else {      if (queueMode) {        // 1- Change the status before pushing the         // messages into the session queue.        setReceiveStatus(ReceiveStatus.CONSUMING_REPLY);      }      try {        ConsumerMessages cm = (ConsumerMessages)reply;        // 2- increment messageCount (synchronized)        messageCount += cm.getMessageCount();                pushMessages(cm);      } catch (StoppedQueueException exc) {        throw new AbortedRequestException();      } catch (JMSException exc) {        throw new AbortedRequestException();      }      if (queueMode) {        return true;      } else {        return false;      }    }  }    /**   * Pushes the received messages.   * Currently two behaviors:   * 1- SingleSessionConsumer pushes the message   * in a single session (standard JMS)   * 2- MultiSessionConsumer pushes the message   * in several session (from a session pool)   *    * @param cm   */  public abstract void pushMessages(ConsumerMessages cm) throws JMSException;    public void replyAborted(int requestId) {    // Nothing to do.  }  public synchronized boolean isClosed() {    return (status == Status.CLOSE);  }    public final MessageListener getMessageListener() {    return listener;  }    public final boolean getQueueMode() {    return queueMode;  }    public final String getTargetName() {    return targetName;  }    protected void activateListener(      Message msg, MessageListener listener, int ackMode)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, "MessageConsumerListener.onMessage(" +         msg + ')');        // Consume one message    decreaseMessageCount(ackMode);    try {      listener.onMessage(msg);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(          BasicLevel.DEBUG, " -> consumer.onMessage(" +           msg + ") returned");    } catch (RuntimeException re) {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(          BasicLevel.DEBUG, "", re);      JMSException exc = new JMSException(re.toString());      exc.setLinkedException(re);      throw exc;    }   }    public abstract void onMessage(      Message msg, MessageListener listener, int ackMode)     throws JMSException;    /**   * Called by Session (standard JMS, mono-threaded   */  public void onMessage(Message msg, int ackMode) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, "MessageConsumerListener.onMessage(" + msg + ')');    if (listener != null) {      try {        synchronized (this) {          if (status == Status.RUN) {            setStatus(Status.ON_MSG);          } else {            throw new javax.jms.IllegalStateException("Message listener closed");          }        }        activateListener(msg, listener, ackMode);      } finally {        synchronized (this) {          setStatus(Status.RUN);          // Notify threads trying to close the listener.          notifyAll();        }      }    } else {      throw new JMSException("Null listener");    }  }  void ack(String msgId, int ackMode)      throws JMSException {    if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) {      // All the operations on messagesToAck are synchronized      // on the vector (see subscribe() and acknowledge()).      messagesToAck.addElement(msgId);      if (! queueMode) {        acknowledge(topicAckBufferMax);      }    } else {      ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode);      ack.addId(msgId);      rm.sendRequest(ack);    }  }  void activateMessageInput() throws JMSException {    rm.sendRequest(      new ActivateConsumerRequest(targetName, true));  }  void passivateMessageInput() throws JMSException {    rm.sendRequest(      new ActivateConsumerRequest(targetName, false));  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -