⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    throws JMSException {    for (int j = 0; j < consumers.size(); j++) {      MessageConsumer cons =         (MessageConsumer) consumers.elementAt(j);      if (agentId.equals(cons.dest.agentId)) {        throw new JMSException(          "Consumers still exist for this temp queue.");      }    }  }  /**   * Called here and by sub-classes.   */  protected void addProducer(MessageProducer mp) {    producers.addElement(mp);  }  /**   * Called by MessageProducer.   */  synchronized void closeProducer(MessageProducer mp) {    producers.removeElement(mp);  }  /**   * Called by Queue browser.   */  synchronized void closeBrowser(QueueBrowser qb) {    browsers.removeElement(qb);  }  /**   * Called by MessageConsumer   */  synchronized MessageConsumerListener addMessageListener(    MessageConsumerListener mcl) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.addMessageListener(" + mcl + ')');    checkClosed();    checkThreadOfControl();    checkSessionMode(SessionMode.LISTENER);    mcl.start();        if (status == Status.START &&        listenerCount == 0) {      doStart();    }    listenerCount++;    return mcl;  }  /**   * Called by MessageConsumer. The thread of control and the status   * must be checked if the call results from a setMessageListener   * but not from a close.   */  void removeMessageListener(    MessageConsumerListener mcl,    boolean check) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.removeMessageListener(" +         mcl + ',' + check + ')');    if (check) {      checkClosed();      checkThreadOfControl();    }        // This may block if a message listener    // is currently receiving a message (onMessage is called)    // so we have to be out of the synchronized block.    
mcl.close();        synchronized (this) {      listenerCount--;      if (status == Status.START && listenerCount == 0) {        try {          repliesIn.stop();        } catch (InterruptedException iE) {        }        // All the message listeners have been closed        // so we can call doStop() in a synchronized        // block. No deadlock possible.        doStop();      }    }  }  /**   * Called by MessageConsumerListener (demultiplexer thread   * from RequestMultiplexer) in order to distribute messages    * to a message consumer.   * Not synchronized because a concurrent close   * can be done.   *   * @exception    */  void pushMessages(SingleSessionConsumer consumerListener,                    ConsumerMessages messages) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.pushMessages(" +         consumerListener + ',' + messages + ')');    repliesIn.push(      new MessageListenerContext(        consumerListener, messages));  }  /**   * Called by ConnectionConsumer in order   * to distribute a message through the    * method run().   * (session mode is APP_SERVER)   */  void onMessage(org.objectweb.joram.shared.messages.Message momMsg) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(BasicLevel.DEBUG, "Session.onMessage(" + momMsg + ')');    repliesIn.push(momMsg);  }  /**   * Called by:   * - method run (application server thread) synchronized   */  private void ackMessage(String targetName,                           String msgId,                          boolean queueMode)     throws JMSException {    ConsumerAckRequest ack = new ConsumerAckRequest(      targetName, queueMode);    ack.addId(msgId);    mtpx.sendRequest(ack);  }  /**   * Called by:   * - method run (application server thread) synchronized   * - method onMessage (SessionDaemon thread) not synchronized   * but no concurrent call except a close which first stops   * SessionDaemon.   
*/  private void denyMessage(String targetName,                            String msgId,                           boolean queueMode)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.denyMessage(" +         targetName + ',' +         msgId + ',' +         queueMode + ')');    ConsumerDenyRequest cdr = new ConsumerDenyRequest(      targetName, msgId, queueMode);    if (queueMode) {      requestor.request(cdr);    } else {      mtpx.sendRequest(cdr, null);    }  }    /**   * Called by SessionDaemon.   * Not synchronized but no concurrent call except    * a close which first stops SessionDaemon.   */  private void onMessages(MessageListenerContext ctx) throws JMSException {    Vector msgs = ctx.messages.getMessages();    for (int i = 0; i < msgs.size(); i++) {      onMessage(        (org.objectweb.joram.shared.messages.Message)msgs.elementAt(i),        ctx.consumerListener);    }  }  /**   * Called by onMessage()   */  private Message prepareMessage(    org.objectweb.joram.shared.messages.Message momMsg,    String targetName,    boolean queueMode) throws JMSException {    if (! 
autoAck) {      prepareAck(targetName,                  momMsg.getIdentifier(),                  queueMode);    }        Message msg;    try {      return Message.wrapMomMessage(this, momMsg);          } catch (JMSException jE) {      // Catching a JMSException means that the building of the Joram      // message went wrong: denying the message:      if (autoAck) {        denyMessage(targetName,                     momMsg.getIdentifier(),                     queueMode);      }      return null;    }  }    /**   * Called by onMessages()   */  void onMessage(    org.objectweb.joram.shared.messages.Message momMsg,    MessageConsumerListener consumerListener) throws JMSException {        Message msg = prepareMessage(      momMsg,       consumerListener.getTargetName(),      consumerListener.getQueueMode());        if (msg == null) return;        try {      if (messageListener == null) {        // Standard JMS (MessageConsumer)        consumerListener.onMessage(msg, acknowledgeMode);      } else {        // ASF (ConnectionConsumer)        consumerListener.onMessage(msg, messageListener, acknowledgeMode);      }    } catch (JMSException exc) {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(          BasicLevel.DEBUG, "", exc);      if (autoAck || consumerListener.isClosed()) {        denyMessage(consumerListener.getTargetName(),                     momMsg.getIdentifier(),                     consumerListener.getQueueMode());      }      return;    }        if (recover) {      // The session has been recovered by the      // listener thread.      if (autoAck) {        denyMessage(consumerListener.getTargetName(),                     momMsg.getIdentifier(),                     consumerListener.getQueueMode());      } else {        doRecover();        recover = false;      }    } else {      if (autoAck) {        consumerListener.ack(            momMsg.getIdentifier(),             acknowledgeMode);      }    }  }  /**   * Called by MessageProducer.   
*/  synchronized void send(Destination dest,                          javax.jms.Message message,                         int deliveryMode,                          int priority,                         long timeToLive,                         boolean timestampDisabled) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,        "Session.send(" +         dest + ',' +        message + ',' +         deliveryMode + ',' +         priority + ',' +         timeToLive + ',' +         timestampDisabled + ')');        checkClosed();    checkThreadOfControl();    // Updating the message property fields:    String msgID = cnx.nextMessageId();    message.setJMSMessageID(msgID);    message.setJMSDeliveryMode(deliveryMode);    message.setJMSDestination(dest);    if (timeToLive == 0) {      message.setJMSExpiration(0);    } else {      message.setJMSExpiration(System.currentTimeMillis() + timeToLive);    }     message.setJMSPriority(priority);    if (! 
timestampDisabled) {      message.setJMSTimestamp(System.currentTimeMillis());    }        org.objectweb.joram.shared.messages.Message momMsg = null;    if (message instanceof org.objectweb.joram.client.jms.Message) {      // If the message to send is a proprietary one, getting the MOM message      // it wraps:      momMsg = ((Message) message).getMomMessage();    } else if (message instanceof javax.jms.Message) {      // If the message to send is a non proprietary JMS message, building      // a proprietary message and then getting the MOM message it wraps:      try {        Message joramMessage = Message.convertJMSMessage(message);        momMsg = joramMessage.getMomMessage();      } catch (JMSException jE) {        MessageFormatException mE = new MessageFormatException("Message to"                                                               + " send is"                                                               + " invalid.");        mE.setLinkedException(jE);        throw mE;      }    } else {      // If not, building a new request and sending it:      MessageFormatException mE = new MessageFormatException("Message to"                                                             + " send is"                                                             + " invalid.");      throw mE;    }    if (transacted) {      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Buffering the message.");      // If the session is transacted, keeping the request for later delivery:      prepareSend(        dest,        (org.objectweb.joram.shared.messages.Message) momMsg.clone());    } else {      ProducerMessages pM =         new ProducerMessages(dest.getName(),                             (org.objectweb.joram.shared.messages.Message) momMsg.clone());            if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(BasicLevel.DEBUG, "Sending " + momMsg);            if (asyncSend || (! 
momMsg.getPersistent())) {        // Asynchronous sending        pM.setAsyncSend(true);             mtpx.sendRequest(pM);      } else {        requestor.request(pM);      }    }  }  /**   * Called by MessageConsumer. The requestor raises an   * exception if it is called during another request.   * This cannot happen as a session is monothreaded.   * A concurrent close first aborts the current request   * so it releases the requestor for a subsequent use.   */  synchronized AbstractJmsReply syncRequest(    AbstractJmsRequest request)     throws JMSException {    return requestor.request(request);  }  final Connection getConnection() {    return cnx;  }  final String getId() {    return ident;  }  final RequestMultiplexer getRequestMultiplexer() {    return mtpx;  }  public final boolean isAutoAck() {    return autoAck;  }  private void activateMessageInput() throws JMSException {    for (int i = 0; i < consumers.size(); i++) {      MessageConsumer cons =         (MessageConsumer) consumers.elementAt(i);      cons.activateMessageInput();    }    passiveMsgInput = false;  }  private void passivateMessageInput() throws JMSException {    for (int i = 0; i < consumers.size(); i++) {      MessageConsumer cons =         (MessageConsumer) consumers.elementAt(i);      cons.passivateMessageInput();    }    passiveMsgInput = true;  }  /**   * Set asyncSend for this Session.   *    * @param b   */  public void setAsyncSend(boolean b) {    asyncSend = b;  }    /**   * Set queueMessageReadMax for this Session.   
*    * @param i   */  public void setQueueMessageReadMax(int i) {    queueMessageReadMax = i;  }    public final int getQueueMessageReadMax() {    return queueMessageReadMax;  }    public final int getTopicAckBufferMax() {    return topicAckBufferMax;  }    public void setTopicAckBufferMax(int i) {    topicAckBufferMax = i;  }    public final int getTopicActivationThreshold() {    return topicActivationThreshold;  }    public void setTopicActivationThreshold(int i) {    topicActivationThreshold = i;  }    public final int getTopicPassivationThreshold() {    return topicPassivationThreshold;  }    public void setTopicPassivationThreshold(int i) {    topicPassivationThreshold = i;  }    /**   * The <code>SessionCloseTask</code> class is used by non-XA transacted   * sessions for taking care of closing them if they tend to be pending,   * and if a transaction timer has been set.   */  private class SessionCloseTask extends TimerTask {    private long txPendingTimer;    SessionCloseTask(long txPendingTimer) {      this.txPendingTimer = txPendingTimer;    }    /** Method called when the timer expires, actually closing the session. */    public void run() {      try {        if (logger.isLoggable(BasicLevel.WARN))          logger.log(BasicLevel.WARN, "Session closed "                                     + "because of pending transaction");        close();      } catch (Exception e) {}    }    public void start() {      try {        mtpx.schedule(this, txPendingTimer);      } catch (Exception e) {}    }  }  /**   * This thread controls the session in mode LISTENER.   
*/
  private class SessionDaemon extends fr.dyade.aaa.util.Daemon {
    /** Names the daemon after the connection and the session identifier. */
    SessionDaemon() {
      super("Connection#" + cnx + " - Session#" + ident);
    }

    /**
     * Loops while <code>running</code> is set: takes the next context
     * from <code>repliesIn</code>, removes it from there, then delivers
     * its messages through <code>onMessages</code>. An interruption
     * while waiting ends the loop; a <code>JMSException</code> raised
     * by the delivery is only traced.
     */
    public void run() {
      while (running) {
        // NOTE(review): canStop is inherited; presumably it tells the
        // stopping code whether the thread may be interrupted - confirm.
        canStop = true;

        MessageListenerContext context;
        try {
          context = (MessageListenerContext) repliesIn.get();
          repliesIn.pop();
        } catch (InterruptedException interruption) {
          if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "", interruption);
          }
          return;
        }

        canStop = false;

        try {
          onMessages(context);
        } catch (JMSException deliveryFailure) {
          if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "", deliveryFailure);
          }
        }
      }
    }

    /** Returns the inherited <code>thread</code> field. */
    Thread getThread() {
      return thread;
    }

    /** Nothing to do. */
    protected void shutdown() {
    }

    /** Nothing to do. */
    protected void close() {
    }
  }

  /**
   * Context used to associate a message consumer with
   * a set of messages to consume.
   */
  private static class MessageListenerContext {
    /** The listener the messages are meant for. */
    SingleSessionConsumer consumerListener;

    /** The messages to hand to the listener. */
    ConsumerMessages messages;

    MessageListenerContext(
      SingleSessionConsumer consumerListener,
      ConsumerMessages messages) {
      this.consumerListener = consumerListener;
      this.messages = messages;
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -