📄 blockingmessenger.java
字号:
performDeferredAction(action); } /** * {@inheritDoc} */ public void sendMessageB(Message msg, String service, String serviceParam) throws IOException { DeferredAction action; synchronized (stateMachine) { try { while ((currentMessage != null) && !inputClosed) { stateMachine.wait(); } } catch (InterruptedException ie) { throw new InterruptedIOException(); } if (inputClosed) { throw new IOException("Messenger is closed. It cannot be used to send messages"); } // We store the four elements of a pending msg separately. We do not want to pour millions of tmp objects on the GC for // nothing. storeCurrent(msg, service, serviceParam); stateMachine.saturatedEvent(); action = eventCalled(); } notifyChange(); // We called an event. State may have changed. performDeferredAction(action); // We called an event. There may be an action. (start, normally). // After deferred action, the message was either sent or failed. (done by this thread). // We can tell because, if failed, the currentMessage is still our msg. Throwable failure = null; synchronized (stateMachine) { if (currentMessage == msg) { failure = currentThrowable; if (failure == null) { failure = new IOException("Unknown error"); } // Ok, let it go, now. storeCurrent(null, null, null); } // Else, don't touch currentMsg; it's not our msg. } if (failure == null) { // No failure. Report ultimate succes. msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.SUCCESS); return; } // Failure. See how we can manage to throw it. if (failure instanceof IOException) { throw (IOException) failure; } if (failure instanceof RuntimeException) { throw (RuntimeException) failure; } if (failure instanceof Error) { throw (Error) failure; } IOException failed = new IOException("Failure sending message"); failed.initCause(failure); throw failed; } /** * {@inheritDoc} */ public final boolean sendMessageN(Message msg, String service, String serviceParam) { boolean queued = false; DeferredAction action = DeferredAction.ACTION_NONE; boolean closed; synchronized (stateMachine) { closed = inputClosed; if ((!closed) && (currentMessage == null)) { // We copy the four elements of a pending msg right here. We do not want to pour millions of tmp objects on the GC. storeCurrent(msg, service, serviceParam); stateMachine.saturatedEvent(); action = eventCalled(); queued = true; } } if (queued) { notifyChange(); // We called an event. State may have changed. performDeferredAction(action); // We called an event. There may be an action. (start, normally). // After deferred action, the message was either sent or failed. (done by this thread). // We can tell because, if failed, the currentMessage is still our msg. synchronized (stateMachine) { if (currentMessage == msg) { if (currentThrowable == null) { currentThrowable = new IOException("Unknown error"); } msg.setMessageProperty(Message.class, currentThrowable); // Ok, let it go, now. storeCurrent(null, null, null); } else { msg.setMessageProperty(Message.class, OutgoingMessageEvent.SUCCESS); // Don't touch the current msg; it's not our msg. } } // Yes, we return true in either case. sendMessageN is supposed to be async. If a message fails // after it was successfuly queued, the error is not reported by the return value, but only by // the message property (and select). Just making sure the behaviour is as normal as can be // even it means suppressing some information. return true; } // Not queued. Either closed, or currently sending. If inputClosed, that's what we report. msg.setMessageProperty(Messenger.class, closed ? new OutgoingMessageEvent(msg, new IOException("This messenger is closed. " + "It cannot be used to send messages.")) : OutgoingMessageEvent.OVERFLOW); return false; } /** * {@inheritDoc} */ public final void resolve() {// We're born resolved. Don't bother calling the event. } /** * {@inheritDoc} */ public final int getState() { return stateMachine.getState(); } /** * {@inheritDoc} */ public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) { // Our transport is always in the same group. If the channel's target group is the same, no group // redirection is ever needed. return new BlockingMessengerChannel(getDestinationAddress(), homeGroupID.equals(redirection) ? null : redirection, service, serviceParam); } /** * Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both * cause actions, and since connectAction and startAction are deferred, it seems possible that one of the * actions caused by send, close, or shutdown be called while connectAction or startAction are in progress. * * However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be * asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or * failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but rather * keep at least some flexibility. */ private void performDeferredAction(DeferredAction action) { switch (action) { case ACTION_SEND: sendIt(); break; case ACTION_CONNECT: cantConnect(); break; } } /** * A shortHand for a frequently used sequence. MUST be called while synchronized on stateMachine. * * @return the deferred action. */ private DeferredAction eventCalled() { DeferredAction action = deferredAction; deferredAction = DeferredAction.ACTION_NONE; stateMachine.notifyAll(); return action; } /** * Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue. * This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because * sendMessageB does not want to set it in any other case than success, while sendMessageN does it in all cases. * The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep * the 1 msg queue locked until then (which would be in contradiction with how we interact with the state machine). * To make it really inexpensive, here's the trick: when a message fails currentMessage and currentFailure remain. * So the sendMessageRoutine can check them and known that it is its message and not another one that caused the * failure. If all is well, currentMessage and currentFailure are nulled and if another message is send immediately * sendMessage is able to see that its own message was processed fully. (this is a small cheat regarding the * state of saturation after failall, but that's not actually detectable from the outside: input is closed * before failall anyway. See failall for that part. */ private void sendIt() { if (currentMessage == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Internal error. Asked to send with no message."); } return; } DeferredAction action; try { sendMessageBImpl(currentMessage, currentService, currentParam); } catch (Throwable any) { // Did not work. We report the link down and let the state machine tell us when to fail the msg. It is assumed that // when this happens, the cnx is already down. FIXME - jice@jxta.org 20040413: check with the various kind of funky // exception. Some may not mean the link is down synchronized (stateMachine) { currentThrowable = any; stateMachine.downEvent(); action = eventCalled(); } notifyChange(); performDeferredAction(action); // we expect connect but let the state machine decide. return; } // Worked. synchronized (stateMachine) { storeCurrent(null, null, null); stateMachine.idleEvent(); action = eventCalled(); } // We did go from non-idle to idle. Report it. notifyChange(); performDeferredAction(action); // should be none but let the state machine decide. } /** * Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect. */ private void cantConnect() { DeferredAction action; synchronized (stateMachine) { stateMachine.downEvent(); action = eventCalled(); } notifyChange(); performDeferredAction(action); // should be none but let the state machine decide. } /* * Abstract methods to be provided by implementer (a transport for example). * To adapt legacy transport, keep extending BlockingMessenger and just rename your close, isIdle, sendMessage and * getLogicalDestinationAddress methods to closeImpl, isIdleImpl, sendMessageBImpl, and getLogicalDestinationImpl, respectively. */ /** * Close connection. May fail current send. */ protected abstract void closeImpl(); /** * Send a message blocking as needed until the message is sent. * * @param message The message to send. * @param service The destination service. * @param param The destination serivce param. * @throws IOException Thrown for errors encountered while sending the message. */ protected abstract void sendMessageBImpl(Message message, String service, String param) throws IOException; /** * return true if this messenger has not been used for a long time. The definition of long time is: "sufficient such that closing it * is worth the cost of having to re-open". A messenger should self close if it thinks it meets the definition of * idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. <b>Important:</b> if * self destruction is used, this method must work; not just return false. See the constructor. In general, if closeImpl does * not need to do anything, then self destruction is not needed. * * @return {@code true} if theis messenger is, by it's own definition, idle. */ protected abstract boolean isIdleImpl(); /** * Obtain the logical destination address from the implementer (a transport for example). */ protected abstract EndpointAddress getLogicalDestinationImpl();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -