📄 requestmultiplexer.java
字号:
requestId + ')'); if (status == Status.CLOSE) return null; return (ReplyListener)requestsTable.remove( new Integer(requestId)); } /** * Not synchronized because it may be called by the * demultiplexer during a concurrent close. It would deadlock * as the close waits for the demultiplexer to stop. */ private void route(AbstractJmsReply reply) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, "RequestMultiplexer.route(" + reply + ')'); int requestId = reply.getCorrelationId(); Integer requestKey = new Integer(requestId); ReplyListener rl = (ReplyListener)requestsTable.get(requestKey); if (reply instanceof MomExceptionReply) { MomException momExc = ((MomExceptionReply)reply).getException(); JMSException jmsExc = null; if (momExc instanceof AccessException) { jmsExc = new JMSSecurityException(momExc.getMessage()); } else if (momExc instanceof DestinationException) { jmsExc = new InvalidDestinationException(momExc.getMessage()); } else { jmsExc = new JMSException(momExc.getMessage()); } if (rl instanceof ErrorListener) { ((ErrorListener)rl).errorReceived(requestId, jmsExc); } else { // The listener is null or doesn't implement ErrorListener onException(jmsExc); } } else { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, " -> rl = " + rl + ')'); if (rl != null) { try { if (rl.replyReceived(reply)) { requestsTable.remove(requestKey); } } catch (AbortedRequestException exc) { JoramTracing.dbgClient.log( BasicLevel.WARN, " -> Request aborted: " + requestId); abortReply(reply); } } else { if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN)) JoramTracing.dbgClient.log( BasicLevel.WARN, " -> Listener not found for the reply: " + requestId); abortReply(reply); } } } private void abortReply(AbstractJmsReply reply) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, "RequestMultiplexer.abortReply(" + reply + ')'); if (reply instanceof ConsumerMessages) { deny((ConsumerMessages)reply); } // Else nothing to do. } public void deny(ConsumerMessages messages) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, "RequestMultiplexer.deny(" + messages + ')'); Vector msgList = messages.getMessages(); Vector ids = new Vector(); for (int i = 0; i < msgList.size(); i++) { org.objectweb.joram.shared.messages.Message msg = (org.objectweb.joram.shared.messages.Message) msgList.elementAt(i); ids.addElement(msg.getIdentifier()); } SessDenyRequest deny = new SessDenyRequest( messages.comesFrom(), ids, messages.getQueueMode()); try { sendRequest(deny); } catch (JMSException exc) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, "", exc); // Connection failure // Nothing to do } } class onExceptionRunner implements Runnable { Exception exc; onExceptionRunner(Exception exc) { this.exc = exc; } public void run() { onException(exc); } } private void onException(Exception exc) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log( BasicLevel.DEBUG, "RequestMultiplexer.onException(" + exc + ')'); JMSException jmsExc; if (exc instanceof JMSException) { jmsExc = (JMSException) exc; } else { jmsExc = new IllegalStateException(exc.getMessage()); } if (exceptionListener != null) exceptionListener.onException(jmsExc); } public void schedule(TimerTask task, long period) { if (timer != null) { try { timer.schedule(task, period); } catch (Exception exc) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.ERROR)) JoramTracing.dbgClient.log(BasicLevel.ERROR, "", exc); } } } public void setDemultiplexerDaemonName(String name) { demtpx.setName(name); } public String getDemultiplexerDaemonName() { return demtpx.getName(); } private class DemultiplexerDaemon extends fr.dyade.aaa.util.Daemon { DemultiplexerDaemon() { // The real name is set later when // the proxy id and connection id are known // see setDemultiplexerDaemonName() super("Connection#?"); } public void run() { try { loop: while (running) { canStop = true; AbstractJmsReply reply; try { reply = channel.receive(); if (reply instanceof ConsumerMessages) { java.util.Vector msgs = ((ConsumerMessages)reply).getMessages(); // set momMessage read-only for (int i = 0; i < msgs.size(); i++ ) ((org.objectweb.joram.shared.messages.Message) msgs.elementAt(i)).setReadOnly(); } } catch (Exception exc) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "Exception during receive", exc); // Check if the connection is not already // closed (the exception may occur as a consequence // of a closure or at the same time as an independant // close call). if (! isClosed()) { RequestMultiplexer.this.close(); // The connection close() must be // called by another thread. Calling it with // this thread (demultiplexer daemon) could // lead to a deadlock if another thread called // close() just before. Closer closer = new Closer(exc); new Thread(closer).start(); } else { // Else it means that the connection is already closed // Runs the onException in a separate thread in order to avoid // deadlock in connector onException (synchronized). onExceptionRunner oer = new onExceptionRunner(exc); new Thread(oer).start(); } break loop; } canStop = false; route(reply); } } finally { finish(); } } /** * Enables the daemon to stop itself. */ public void stop() { if (isCurrentThread()) { finish(); } else { super.stop(); } } protected void shutdown() {} protected void close() {} } private class Closer implements Runnable { private Exception exc; Closer(Exception e) { exc = e; } public void run() { try { RequestMultiplexer.this.cnx.close(); } catch (JMSException exc2) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN)) JoramTracing.dbgClient.log(BasicLevel.WARN, "Error during close", exc2); } onException(exc); } } /** * Timer task responsible for sending a ping message * to the server if no request has been sent during * the specified timeout ('cnxPendingTimer' from the * factory parameters). */ private class HeartBeatTask extends TimerTask { private long heartBeat; HeartBeatTask(long heartBeat) { this.heartBeat = heartBeat; } public void run() { try { long date = System.currentTimeMillis(); if ((date - lastRequestDate) > heartBeat) { sendRequest(new PingRequest()); } } catch (Exception exc) { if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); } } public void start() throws Exception { timer.schedule(this, heartBeat, heartBeat); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -