⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageconsumer.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    // NOTE(review): the lines down to the first closing method brace are the
    // tail of the timed receive method, whose header is outside this excerpt.
    // They are reproduced unchanged, only re-flowed onto separate lines.

    // Synchronizing with a possible "close".
    synchronized(this) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, "--- " + this
                         + ": requests to receive a message.");

      if (closed)
        throw new IllegalStateException("Forbidden call on a closed consumer.");

      if (messageListener != null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Improper call as a"
                           + " listener exists for this consumer.");
      }
      else if (sess.msgListeners > 0) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Improper call as"
                           + " asynchronous consumers have already"
                           + " been set on the session.");
      }

      pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut,
                                              queueMode);
      pendingReq.setRequestId(sess.cnx.nextRequestId());
      receiving = true;

      // In case of a timer, scheduling the receive:
      if (timeOut > 0) {
        replyingTask = new ConsumerReplyTask(pendingReq);
        sess.schedule(replyingTask, timeOut);
      }
    }

    // Expecting an answer:
    ConsumerMessages reply =
     (ConsumerMessages) sess.cnx.syncRequest(pendingReq);

    // Synchronizing again with a possible "close":
    synchronized(this) {
      receiving = false;
      pendingReq = null;
      if (replyingTask != null)
        replyingTask.cancel();

      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, this + ": received a"
                         + " reply.");

      Vector msgs = reply.getMessages();
      if (msgs != null && ! msgs.isEmpty()) {
        // Only the first message of the reply is delivered.
        com.scalagent.kjoram.messages.Message msg =
          (com.scalagent.kjoram.messages.Message) msgs.elementAt(0);
        String msgId = msg.getIdentifier();

        // Auto ack: acknowledging the message:
        if (sess.autoAck)
          sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
                                                       queueMode));
        // Session ack: passing the id for later ack or deny:
        else
          sess.prepareAck(targetName, msgId, queueMode);

        return Message.wrapMomMessage(sess, msg);
      }
      else
        return null;
    }
  }

  /**
   * API method; delegates to the timed receive with a time-out of 0, for
   * which no expiry timer is scheduled (a timer is only set up when the
   * time-out is strictly positive).
   *
   * @return  The received message, or <code>null</code> if the reply carried
   *          no message.
   * @exception IllegalStateException  If the consumer is closed, or if the
   *              connection is broken.
   * @exception JMSSecurityException  If the requester is not a READER on the
   *              destination.
   * @exception JMSException  If the request fails for any other reason.
   */
  public Message receive() throws JMSException
  {
    return receive(0);
  }

  /**
   * API method; delegates to the timed receive with a time-out of -1.
   * NOTE(review): -1 is presumably interpreted by the server as "do not
   * wait"; that interpretation is not visible in this excerpt.
   *
   * @return  The received message, or <code>null</code> if the reply carried
   *          no message.
   * @exception IllegalStateException  If the consumer is closed, or if the
   *              connection is broken.
   * @exception JMSSecurityException  If the requester is not a READER on the
   *              destination.
   * @exception JMSException  If the request fails for any other reason.
   */
  public Message receiveNoWait() throws JMSException
  {
    return receive(-1);
  }

  /**
   * API method; closes this consumer.
   * <p>
   * The pending request (if any) is removed from the connection's requests
   * table, the consumer is removed from its session, the listener and/or
   * subscription are unset on the server on a best-effort basis, and a
   * blocked "receive" is released with an empty reply.
   *
   * @exception JMSException  Actually never thrown.
   */
  public void close() throws JMSException
  {
    // Ignoring the call if consumer is already closed:
    if (closed)
      return;

    if (JoramTracing.dbgClient)
      JoramTracing.log(JoramTracing.DEBUG, "--- " + this
                       + ": closing...");

    // Synchronizing with a possible receive() or onMessage() ongoing process.
    syncro();

    // Removing this resource's reference from everywhere:
    Object lock = null;
    if (pendingReq != null)
      lock = sess.cnx.requestsTable.remove(pendingReq.getKey());
    sess.consumers.removeElement(this);

    // Unsetting the listener, if any:
    try {
      if (messageListener != null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener.");

        if (queueMode) {
          // NOTE(review): pendingReq is dereferenced without a null check
          // here; presumably a queue listener always has a pending request
          // -- confirm against the listener-setting code.
          ConsumerUnsetListRequest unsetLR =
            new ConsumerUnsetListRequest(true);
          unsetLR.setCancelledRequestId(pendingReq.getRequestId());
          sess.cnx.syncRequest(unsetLR);
        }
      }
      if (durableSubscriber)
        sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
      else if (! queueMode)
        sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName));
    }
    // A JMSException might be caught if the connection is broken. Closing
    // goes on anyway; the failure is only traced instead of being silently
    // dropped.
    catch (JMSException jE) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.WARN, this + ": request failed while"
                         + " closing: " + jE);
    }

    // In the case of a pending "receive" request, replying by a null to it:
    if (lock != null && receiving) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, "Replying to the"
                         + " pending receive "
                         + pendingReq.getRequestId()
                         + " with a null message.");

      sess.cnx.repliesTable.put(pendingReq.getKey(), new ConsumerMessages());

      synchronized(lock) {
        lock.notify();
      }
    }

    // Synchronizing again:
    syncro();

    closed = true;

    if (JoramTracing.dbgClient)
      JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
  }

  /**
   * Returns when the consumer isn't busy executing a "onMessage" or a
   * "receive" anymore; method called for synchronization purposes.
   * <p>
   * The body is intentionally empty: acquiring and releasing this object's
   * monitor is the whole effect.
   */
  synchronized void syncro() {}

  /**
   * Method called by the session daemon for passing an asynchronous message
   * delivery to the listener.
   * <p>
   * If no listener is set the message is denied. Otherwise it is handed to
   * the listener, then acknowledged (auto ack) or registered for a later
   * ack/deny (session ack). In queue mode a new listener request is sent
   * once the delivery has been processed.
   *
   * @param message  The MOM message to deliver.
   */
  synchronized void onMessage(com.scalagent.kjoram.messages.Message message)
  {
    String msgId = message.getIdentifier();

    try {
      // If the listener has been unset without having stopped the
      // connection, this case might happen:
      if (messageListener == null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, this + ": an"
                           + " asynchronous delivery arrived"
                           + " for an improperly unset listener:"
                           + " denying the message.");
        sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                     queueMode, true));
      }
      else {
        // In session ack mode, preparing later ack or deny:
        if (! sess.autoAck)
          sess.prepareAck(targetName, msgId, queueMode);

        try {
          messageListener.onMessage(Message.wrapMomMessage(sess, message));

          // Auto ack: acknowledging the message:
          if (sess.autoAck)
            sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
                                                         queueMode));
        }
        // Catching a JMSException means that the building of the Joram
        // message went wrong: denying as expected by the spec:
        catch (JMSException jE) {
          JoramTracing.log(JoramTracing.ERROR, this
                           + ": error while processing the"
                           + " received message: " + jE);

          if (queueMode)
            sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                         queueMode));
          else
            sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                          queueMode));
        }
        // Catching a RuntimeException means that the client onMessage() code
        // is incorrect; denying as expected by the JMS spec:
        catch (RuntimeException rE) {
          JoramTracing.log(JoramTracing.ERROR, this
                           + ": RuntimeException thrown"
                           + " by the listener: " + rE);

          // The deny is only sent in auto ack mode.
          if (sess.autoAck && queueMode)
            sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                         queueMode));
          else if (sess.autoAck && ! queueMode)
            sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                          queueMode));
        }

        // Sending a new request if queue mode:
        if (queueMode) {
          pendingReq = new ConsumerSetListRequest(targetName, selector, true);
          pendingReq.setRequestId(sess.cnx.nextRequestId());
          sess.cnx.requestsTable.put(pendingReq.getKey(), this);
          sess.cnx.asyncRequest(pendingReq);
        }
      }
    }
    // Catching an IllegalStateException means that the acknowledgement or
    // denying went wrong because the connection has been lost. Nothing more
    // can be done here.
    catch (JMSException jE) {
      JoramTracing.log(JoramTracing.ERROR, this + ": " + jE);
    }
  }

  /**
   * The <code>ConsumerReplyTask</code> class is used by "receive" requests
   * with timer for taking care of answering them if the timer expires.
   */
  private class ConsumerReplyTask extends TimerTask
  {
    /** The request to answer. */
    private AbstractJmsRequest request;
    /** The reply to put in the connection's table. */
    private ConsumerMessages nullReply;

    /**
     * Constructs a <code>ConsumerReplyTask</code> instance and prepares the
     * reply to be used on expiry, built from the request's identifier, the
     * consumer's target name and its queue mode.
     *
     * @param request  The request to answer.
     */
    ConsumerReplyTask(AbstractJmsRequest request)
    {
      this.request = request;
      this.nullReply = new ConsumerMessages(request.getRequestId(),
                                            targetName,
                                            queueMode);
    }

    /**
     * Method called when the timer expires, actually putting a null answer
     * in the replies table and unlocking the requester.
     * <p>
     * Does nothing if the request is no longer registered in the
     * connection's requests table.
     */
    public void run()
    {
      try {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Receive request" +
                           " answered because timer expired");

        Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey());

        if (lock == null)
          return;

        synchronized (lock) {
          sess.cnx.repliesTable.put(request.getKey(), nullReply);
          lock.notify();
        }
      }
      // Nothing may escape from the timer task; the failure is traced
      // rather than silently dropped.
      catch (Exception e) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Receive timer task"
                           + " failed: " + e);
      }
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -