⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 session.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    checkThreadOfControl();        MessageConsumer cons;    if (consumers != null) {      for (int i = 0; i < consumers.size(); i++) {        cons = (MessageConsumer) consumers.get(i);        if (! cons.queueMode && cons.targetName.equals(name))          throw new JMSException("Can't delete durable subscription " + name                                 + " as long as an active subscriber exists.");      }    }    syncRequest(new ConsumerUnsubRequest(name));  }  /**   * API method.   *   * @exception JMSException   */  public void close() throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.close()");    closer.close();  }  /**   * This class synchronizes the close.   * Close can't be synchronized with 'this'    * because the Session must be accessed   * concurrently during its closure. So   * we need a second lock.   */  class Closer {    synchronized void close()       throws JMSException {      doClose();    }  }  void doClose() throws JMSException {    synchronized (this) {      if (status == Status.CLOSE) return;    }        // Don't synchronize the consumer closure because    // it could deadlock with message listeners or    // client threads still using the session.    Vector consumersToClose = (Vector)consumers.clone();    consumers.clear();    for (int i = 0; i < consumersToClose.size(); i++) {      MessageConsumer mc =         (MessageConsumer)consumersToClose.elementAt(i);      try {        mc.close();      } catch (JMSException exc) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(            BasicLevel.DEBUG, "", exc);      }    }        Vector browsersToClose = (Vector)browsers.clone();    browsers.clear();    for (int i = 0; i < browsersToClose.size(); i++) {      QueueBrowser qb =         (QueueBrowser)browsersToClose.elementAt(i);      try {        qb.close();      } catch (JMSException exc) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(            BasicLevel.DEBUG, "", exc);      }    }        Vector producersToClose = (Vector)producers.clone();    producers.clear();    for (int i = 0; i < producersToClose.size(); i++) {      MessageProducer mp =         (MessageProducer)producersToClose.elementAt(i);      try {        mp.close();      } catch (JMSException exc) {        if (logger.isLoggable(BasicLevel.DEBUG))          logger.log(            BasicLevel.DEBUG, "", exc);      }    }        // This is now in removeMessageListener    // called by MessageConsumer.close()    // (see above)//     try {//       repliesIn.stop();//     } catch (InterruptedException iE) {}          stop();    // The requestor must be closed because    // it could be used by a concurrent receive    // as it is not synchronized (see receive()).    receiveRequestor.close();          // Denying the non acknowledged messages:    if (transacted) {      rollback();    } else {      deny();    }    cnx.closeSession(this);          synchronized (this) {      setStatus(Status.CLOSE);    }  }  /**   * Starts the asynchronous deliveries in the session.   * <p>   * This method is called by a started connection.   */  synchronized void start() {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.start()");    if (status == Status.CLOSE) return;    if (status == Status.START) return;    if (listenerCount > 0) {      doStart();    }    setStatus(Status.START);  }  private void doStart() {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.doStart()");    repliesIn.start();    daemon = new SessionDaemon();    daemon.setDaemon(false);    daemon.start();    singleThreadOfControl = daemon.getThread();  }  /**   * Stops the asynchronous deliveries processing in the session.   * <p>   * This method must be carefully used. When the session is stopped, the   * connection might very well going on pushing deliveries in the   * session's queue. If the session is never re-started, these deliveries   * will never be poped out, and this may lead to a situation of consumed   * but never acknowledged messages.   * <p>   * This fatal situation never occurs as the <code>stop()</code> method is   * either called by he <code>Session.close()</code>   * and <code>Connection.stop()</code> methods, which first empties the   * session's deliveries and forbid any further push.   */  synchronized void stop() {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,        "Session.stop()");    if (status == Status.STOP ||        status == Status.CLOSE) return;    // DF: According to JMS 1.1 java doc    // the method stop "blocks until receives in progress have completed."     // But the JMS 1.1 specification doesn't mention this point.     // So we don't implement it: a stop doesn't block until     // receives have completed.//     while (requestStatus != RequestStatus.NONE) {//       try {//         wait();//       } catch (InterruptedException exc) {}//     }    doStop();    setStatus(Status.STOP);  }  private void doStop() {    if (daemon != null) {      daemon.stop();      daemon = null;      singleThreadOfControl = null;    }  }  /**    * Method called by message producers when producing a message for   * preparing the session to later commit it.   *   * @param dest  The destination the message is destinated to.   * @param msg  The message.   */  private void prepareSend(    Destination dest,     org.objectweb.joram.shared.messages.Message msg)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,        "Session.prepareSend(" + dest + ',' + msg + ')');    checkClosed();    checkThreadOfControl();        // If the transaction was scheduled, cancelling:    if (scheduled)      closingTask.cancel();    ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName());    if (pM == null) {      pM = new ProducerMessages(dest.getName());      sendings.put(dest.getName(), pM);    }    pM.addMessage(msg);    // If the transaction was scheduled, re-scheduling it:    if (scheduled)      closingTask.start();  }  /**    * Method called by message consumers when receiving a message for   * preparing the session to later acknowledge or deny it.   *   * @param name  Name of the destination or of the proxy subscription    *          the message comes from.   * @param id  Identifier of the consumed message.   * @param queueMode  <code>true</code> if the message consumed comes from   *          a queue.   */  private void prepareAck(String name,                           String id,                           boolean queueMode) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,        "Session.prepareAck(" +         name + ',' + id + ',' + queueMode + ')');    // If the transaction was scheduled, cancelling:    if (scheduled)      closingTask.cancel();    MessageAcks acks = (MessageAcks) deliveries.get(name);    if (acks == null) {      acks = new MessageAcks(queueMode);      deliveries.put(name, acks);    }    acks.addId(id);    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG, " -> acks = " + acks);    // If the transaction must be scheduled, scheduling it:    if (closingTask != null) {      scheduled = true;      closingTask.start();    }  }  /**   * Method acknowledging the received messages.   * Called by Message.   */  synchronized void acknowledge() throws JMSException {    checkClosed();    if (transacted ||        acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE) {      return;    }    doAcknowledge();  }  /**   * Method acknowledging the received messages.   */  private void doAcknowledge() throws JMSException {    Enumeration targets = deliveries.keys();    while (targets.hasMoreElements()) {      String target = (String) targets.nextElement();      MessageAcks acks = (MessageAcks) deliveries.remove(target);      mtpx.sendRequest(        new SessAckRequest(          target,           acks.getIds(),          acks.getQueueMode()));    }  }  /**    * Method denying the received messages.   *   * Called from:   * - rollback -> synchronized client thread   * - recover -> synchronized client thread   * - close -> synchronized client thread    * - onMessage -> not synchronized session daemon.   * It is the only thread that can run into the session   * (session mode = LISTENER) except for the method close that   * can be called concurrently. But close() first stops the session   * daemon and then calls deny().   *   * The hashtable deliveries is also accessed from:   * - acknowledge -> synchronized client thread   * - commit -> synchronized client thread   * - receive -> synchronized client thread.   * - onMessage -> not synchronized session daemon (see above).   */  private void deny() throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.deny()");    Enumeration targets = deliveries.keys();    while (targets.hasMoreElements()) {      String target = (String) targets.nextElement();      MessageAcks acks = (MessageAcks) deliveries.remove(target);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(          BasicLevel.DEBUG,           " -> acks = " + acks + ')');      SessDenyRequest deny = new SessDenyRequest(        target,         acks.getIds(),         acks.getQueueMode());      if (acks.getQueueMode()) {        requestor.request(deny);      } else {        mtpx.sendRequest(deny);      }    }  }  /**   * Called by MessageConsumer   * Not synchronized because ot it can be   * concurrently called by close()   * and Connection.stop().   */  javax.jms.Message receive(    long requestTimeToLive,    long waitTimeOut,    MessageConsumer mc,    String targetName,    String selector,    boolean queueMode)     throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.receive(" +         requestTimeToLive + ',' +         waitTimeOut + ',' +         targetName + ',' +         selector + ',' +         queueMode + ')');    preReceive(mc);    try {      ConsumerMessages reply = null;      ConsumerReceiveRequest request =        new ConsumerReceiveRequest(          targetName,           selector,           requestTimeToLive,          queueMode);      if (receiveAck) request.setReceiveAck(true);      reply =        (ConsumerMessages)receiveRequestor.request(          request,          waitTimeOut);      if (logger.isLoggable(BasicLevel.DEBUG))        logger.log(          BasicLevel.DEBUG,           " -> reply = " + reply);              synchronized (this) {        // The session may have been         // closed in between.        if (status == Status.CLOSE) {          if (reply != null) {            mtpx.deny(reply);          }          return null;        }                if (reply != null) {          Vector msgs = reply.getMessages();          if (msgs != null && ! msgs.isEmpty()) {            org.objectweb.joram.shared.messages.Message msg =              (org.objectweb.joram.shared.messages.Message) msgs.get(0);            String msgId = msg.getIdentifier();                        // Auto ack: acknowledging the message:            if (autoAck && ! receiveAck) {              ConsumerAckRequest req =                 new ConsumerAckRequest(                  targetName,                  queueMode);              req.addId(msgId);              mtpx.sendRequest(req);            } else {              prepareAck(targetName,                         msgId,                         queueMode);            }            return Message.wrapMomMessage(this, msg);          } else {            return null;          }        } else {            return null;        }      }    } finally {      postReceive();    }  }  /**   * First stage before calling the proxy and waiting   * for the reply. It is synchronized because it   * locks the session in order to prevent any other   * thread to make another operation.   */  private synchronized void preReceive(    MessageConsumer mc) throws JMSException {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.preReceive(" + mc + ')');    // The message consumer may have been closed    // after the first check (in MessageConsumer.receive())    // and before preReceive.    mc.checkClosed();    checkClosed();    checkThreadOfControl();        // Don't call checkSessionMode because    // we also check that the session mode is not     // already set to RECEIVE.    switch (sessionMode) {    case SessionMode.NONE:      setSessionMode(SessionMode.RECEIVE);      break;    default:      throw new IllegalStateException("Illegal session mode");    }    if (requestStatus != RequestStatus.NONE)       throw new IllegalStateException("Illegal request status");    singleThreadOfControl = Thread.currentThread();    pendingMessageConsumer = mc;        setRequestStatus(RequestStatus.RUN);  }    /**   * Final stage after calling the reply has been returned   * by the roxy. It releases the session and enables another   * thread to call it.   */  private synchronized void postReceive() {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.postReceive()");    singleThreadOfControl = null;    pendingMessageConsumer = null;    setRequestStatus(RequestStatus.NONE);    setSessionMode(SessionMode.NONE);    notifyAll();  }    /**   * Called here and by sub-classes.   */  protected synchronized void addConsumer(    MessageConsumer mc) {    consumers.addElement(mc);  }  /**   * Called by MessageConsumer.   */  synchronized void closeConsumer(MessageConsumer mc) {    if (logger.isLoggable(BasicLevel.DEBUG))      logger.log(        BasicLevel.DEBUG,         "Session.closeConsumer(" + mc + ')');    consumers.removeElement(mc);    if (pendingMessageConsumer == mc) {      if (requestStatus == RequestStatus.RUN) {        // Close the requestor. A call to abortRequest()         // is not enough because the receiving thread         // may call request() just after this thread         // calls abort().        receiveRequestor.close();        // Wait for the end of the request        try {          while (requestStatus != RequestStatus.NONE) {            wait();          }        } catch (InterruptedException exc) {}        // Create a new requestor.        receiveRequestor = new Requestor(mtpx);      }    }  }    /**   * Called by Connection (i.e. temporary destinations deletion)   */  synchronized void checkConsumers(String agentId) 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -