⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestmultiplexer.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        requestId + ')');    if (status == Status.CLOSE) return null;    return (ReplyListener)requestsTable.remove(      new Integer(requestId));  }  /**   * Not synchronized because it may be called by the   * demultiplexer during a concurrent close. It would deadlock   * as the close waits for the demultiplexer to stop.   */  private void route(AbstractJmsReply reply) {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(        BasicLevel.DEBUG,         "RequestMultiplexer.route(" + reply + ')');    int requestId = reply.getCorrelationId();    Integer requestKey = new Integer(requestId);    ReplyListener rl = (ReplyListener)requestsTable.get(requestKey);    if (reply instanceof MomExceptionReply) {      MomException momExc = ((MomExceptionReply)reply).getException();      JMSException jmsExc = null;      if (momExc instanceof AccessException) {        jmsExc = new JMSSecurityException(momExc.getMessage());      } else if (momExc instanceof DestinationException) {        jmsExc = new InvalidDestinationException(momExc.getMessage());      } else {        jmsExc = new JMSException(momExc.getMessage());      }      if (rl instanceof ErrorListener) {        ((ErrorListener)rl).errorReceived(requestId, jmsExc);      } else {        // The listener is null or doesn't implement ErrorListener        onException(jmsExc);      }    } else {      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(          BasicLevel.DEBUG, " -> rl = " + rl + ')');      if (rl != null) {        try {          if (rl.replyReceived(reply)) {            requestsTable.remove(requestKey);          }        } catch (AbortedRequestException exc) {          JoramTracing.dbgClient.log(            BasicLevel.WARN,             " -> Request aborted: " + requestId);          abortReply(reply);        }      } else {        if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN))          JoramTracing.dbgClient.log(            BasicLevel.WARN,             " -> Listener not found for the reply: " + requestId);        abortReply(reply);      }    }  }  private void abortReply(AbstractJmsReply reply) {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(        BasicLevel.DEBUG, "RequestMultiplexer.abortReply(" +         reply + ')');    if (reply instanceof ConsumerMessages) {      deny((ConsumerMessages)reply);    }    // Else nothing to do.  }  public void deny(ConsumerMessages messages) {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(        BasicLevel.DEBUG, "RequestMultiplexer.deny(" +         messages + ')');    Vector msgList = messages.getMessages();    Vector ids = new Vector();    for (int i = 0; i < msgList.size(); i++) {      org.objectweb.joram.shared.messages.Message msg =        (org.objectweb.joram.shared.messages.Message) msgList.elementAt(i);      ids.addElement(msg.getIdentifier());    }    SessDenyRequest deny = new SessDenyRequest(      messages.comesFrom(),      ids,      messages.getQueueMode());    try {      sendRequest(deny);    } catch (JMSException exc) {      if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))        JoramTracing.dbgClient.log(          BasicLevel.DEBUG, "", exc);      // Connection failure      // Nothing to do    }  }  class onExceptionRunner implements Runnable {    Exception exc;    onExceptionRunner(Exception exc) {      this.exc = exc;    }    public void run() {      onException(exc);    }  }  private void onException(Exception exc) {    if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))      JoramTracing.dbgClient.log(        BasicLevel.DEBUG, "RequestMultiplexer.onException(" + exc + ')');    JMSException jmsExc;    if (exc instanceof JMSException) {      jmsExc = (JMSException) exc;    } else {      jmsExc = new IllegalStateException(exc.getMessage());    }    if (exceptionListener != null)      exceptionListener.onException(jmsExc);  }  public void schedule(TimerTask task,                       long period) {    if (timer != null) {      try {        timer.schedule(task, period);      } catch (Exception exc) {        if (JoramTracing.dbgClient.isLoggable(BasicLevel.ERROR))          JoramTracing.dbgClient.log(BasicLevel.ERROR, "", exc);      }    }  }    public void setDemultiplexerDaemonName(String name) {    demtpx.setName(name);  }    public String getDemultiplexerDaemonName() {    return demtpx.getName();  }  private class DemultiplexerDaemon extends fr.dyade.aaa.util.Daemon {    DemultiplexerDaemon() {      // The real name is set later when      // the proxy id and connection id are known      // see setDemultiplexerDaemonName()      super("Connection#?");    }    public void run() {      try {        loop:        while (running) {          canStop = true;          AbstractJmsReply reply;          try {            reply = channel.receive();            if (reply instanceof ConsumerMessages) {              java.util.Vector msgs = ((ConsumerMessages)reply).getMessages();              // set momMessage read-only              for (int i = 0; i < msgs.size(); i++ )                ((org.objectweb.joram.shared.messages.Message)                 msgs.elementAt(i)).setReadOnly();            }          } catch (Exception exc) {            if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))              JoramTracing.dbgClient.log(BasicLevel.DEBUG,                                         "Exception during receive", exc);            // Check if the connection is not already            // closed (the exception may occur as a consequence            // of a closure or at the same time as an independant            // close call).            if (! isClosed()) {              RequestMultiplexer.this.close();              // The connection close() must be              // called by another thread. Calling it with              // this thread (demultiplexer daemon) could              // lead to a deadlock if another thread called              // close() just before.              Closer closer = new Closer(exc);              new Thread(closer).start();            } else {              // Else it means that the connection is already closed              // Runs the onException in a separate thread in order to avoid              // deadlock in connector onException (synchronized).              onExceptionRunner oer = new onExceptionRunner(exc);              new Thread(oer).start();            }                        break loop;          }          canStop = false;           route(reply);        }      } finally {        finish();      }    }        /**     * Enables the daemon to stop itself.     */    public void stop() {      if (isCurrentThread()) {        finish();      } else {        super.stop();      }    }    protected void shutdown() {}    protected void close() {}  }    private class Closer implements Runnable {    private Exception exc;        Closer(Exception e) {      exc = e;    }        public void run() {      try {        RequestMultiplexer.this.cnx.close();      } catch (JMSException exc2) {        if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN))          JoramTracing.dbgClient.log(BasicLevel.WARN,              "Error during close", exc2);      }            onException(exc);    }  }  /**   * Timer task responsible for sending a ping message   * to the server if no request has been sent during    * the specified timeout ('cnxPendingTimer' from the   * factory parameters).   */  private class HeartBeatTask extends TimerTask {        private long heartBeat;    HeartBeatTask(long heartBeat) {      this.heartBeat = heartBeat;    }    public void run() {      try {        long date = System.currentTimeMillis();                if ((date - lastRequestDate) > heartBeat) {          sendRequest(new PingRequest());        }      } catch (Exception exc) {        if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))          JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);      }    }    public void start() throws Exception {      timer.schedule(this, heartBeat, heartBeat);    }  }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -