📄 asyncchannelmessenger.java
字号:
stateMachine.saturatedEvent(); action = eventCalled(false); change = true; } else { // Was already saturated. queued = false; } } if (queued && change) { // If not queued, there was no change of condition as far as // outsiders are concerned. (redundant saturatedEvent, only // defensive; to guarantee statemachine in sync). else, if the // saturation state did not change, we have no state change to // notify. notifyChange(); } performDeferredAction(action); // Before we return, make sure that this channel remains referenced if // it has messages. It could become unreferenced if it is not yet // resolved and the application lets go of it after sending messages. // This means that we may need to do something only in the resolpending // and resolsaturated cases. The way we do this test, there can be false // positives. They're dealt with as part of the action that is carried // out. if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) { resolPendingImpl(); } return queued; } /** * {@inheritDoc} */ public final boolean sendMessageN( Message msg, String rService, String rServiceParam ) { try { if (sendMessageCommon(msg, rService, rServiceParam)) { // If it worked the message is queued; the outcome will be notified later. return true; } // Non-blocking and queue full: report overflow. msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW); return false; } catch (IOException oie) { msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie)); return false; } } /** * {@inheritDoc} */ public final void sendMessageB( Message msg, String rService, String rServiceParam ) throws IOException { try { while (true) { // Do a shallow check on the queue. If it seems empty (without // getting into a critical section to verify it), then yielding // is good bet. It is a lot cheaper and smoother than waiting. if (queue.getCurrentInQueue() == queue.getMaxQueueSize()) { Thread.yield(); } // if sendMessageCommon says "true" it worked. if (sendMessageCommon(msg, rService, rServiceParam)) { return; } // If we reached this far, it is neither closed, nor ok. So it was saturated. synchronized(stateMachine) { // Cheaper than waitState. sendMessageCommon already does // the relevant state checks. stateMachine.wait(); } } } catch(InterruptedException ie) { InterruptedIOException iie = new InterruptedIOException("Message send interrupted"); iie.initCause( ie ); throw iie; } } /** * {@inheritDoc} */ public final void resolve() { int action = ACTION_NONE; synchronized(stateMachine) { stateMachine.resolveEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect connect but let the state machine decide. } /** * {@inheritDoc} */ public final int getState() { return stateMachine.getState(); } /** * {@inheritDoc} */ public final Messenger getChannelMessenger( PeerGroupID redirection, String service, String serviceParam ) { // Channels don't make channels. return null; } /** * Three exposed methods may need to inject new events in the system: * sendMessageN, close, and shutdown. * Since they can all cause actions, and since connectAction and * startAction are deferred, it seems possible that one of the actions * caused by send, close, or shutdown be called while connectAction or * startAction are in progress. * * <p/>However, the state machine gives us a few guarantees: connectAction * and startAction can never nest. We will not be asked to perform one while * still performing the other. Only the synchronous actions closeInput, * closeOutput, or failAll can possibly be requested in the interval. We * could make more assumptions and simplify the code, but rather keep at * least some flexibility. * * DEAD LOCK WARNING: the implementor's method invoke some of our call backs * while synchronized. Then our call backs synchronize on the state machine * in here. This nesting order must always be respected. As a result, we can * never invoke implementors methods while synchronized. Hence the * deferredAction processing. */ private void performDeferredAction(int action) { switch(action) { case ACTION_SEND: startImpl(); break; case ACTION_CONNECT: connectImpl(); break; } } /** * A shortHand for a frequently used sequence. MUST be called while * synchronized on stateMachine. * * @param notifyAll If {@code true} then this is a life-cycle event and all * waiters on the stateMachine should be notified. If {@code false} then * only a single waiter will be notified for simple activity events. * @return the deferred action. */ private int eventCalled(boolean notifyAll) { int action = deferredAction; deferredAction = ACTION_NONE; if (notifyAll) { stateMachine.notifyAll(); } else { stateMachine.notify(); } return action; } /* * Implement the methods that our shared messenger will use to report progress. */ /** * The implementation will invoke this method when it becomes resolved, * after connectImpl was invoked. */ protected void up() { int action = ACTION_NONE; synchronized(stateMachine) { stateMachine.upEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect start but let the state machine decide. } /** * The implementation invokes this method when it becomes broken. */ protected void down() { int action = ACTION_NONE; synchronized(stateMachine) { stateMachine.downEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect connect but let the state machine decide. } /** * Here, we behave like a queue to the shared messenger. When we report * being empty, though, we're automatically removed from the active queues * list. We'll go back there the next time we have something to send by * calling startImpl. */ protected PendingMessage peek() { PendingMessage theMsg = null; int action = ACTION_NONE; synchronized(stateMachine) { // We like the msg to keep occupying space in the queue until it's // out the door. That way, idleness (that is, not currently working // on a message), is always consistent with queue emptyness. theMsg = (PendingMessage) queue.peek(); if (theMsg == null) { stateMachine.idleEvent(); action = eventCalled(false); // We do not notifyChange, here, because, if the queue is empty, // it was already notified when the last message was popped. The // call to idleEvent is only defensive programming to make extra // sure the state machine is in sync. return null; } if (outputClosed) { // We've been asked to stop sending. Which, if we were sending, //must be notified by either an idle event or a down // event. Nothing needs to happen to the shared messenger. We're // just a channel. stateMachine.downEvent(); action = eventCalled(true); theMsg = null; } } notifyChange(); performDeferredAction(action); // we expect none but let the state machine decide. return theMsg; } /** * One message done. Update the saturated/etc state accordingly. * * @return true if there are more messages after the one we popped. */ protected boolean pop() { boolean result; int action = ACTION_NONE; synchronized(stateMachine) { queue.pop(); if (queue.peek() == null) { stateMachine.idleEvent(); action = eventCalled(false); result = false; } else { stateMachine.msgsEvent(); action = eventCalled(false); result = true; } } notifyChange(); performDeferredAction(action); // we expect none but let the state machine decide. return result; } /** * We invoke this method to be placed on the list of channels that have * message to send. * * NOTE that it is the shared messenger responsibility to synchronize so * that we cannot be added to the active list just before we get removed * due to reporting an empty queue in parallel. So, if we report an empty * queue and have a new message to send before the shared messenger removes * us form the active list, startImpl will block until the removal is done. * Then we'll be added back. * * If it cannot be done, it means that the shared messenger is no longer * usable. It may call down() in sequence. Out of defensiveness, it should * do so without holding its lock. */ protected abstract void startImpl(); /** * We invoke this method to be placed on the list of channels that are * waiting for resolution. * * If it cannot be done, it means that the shared messenger is no longer * usable. It may call down() in sequence. Out of defensiveness, it should * do so without holding its lock. If the messenger is already resolved it * may call up() in sequence. Same wisdom applies. It is a good idea to * create channels in the resolved state if the shared messenger is already * resolved. That avoids this extra contortion. */ protected abstract void connectImpl(); /** * This is invoked to inform the implementation that this channel is now in * the resolPending or resolSaturated state. This is specific to this type * of channels. The shared messenger must make sure that this channel * remains strongly referenced, even though it is not resolved, because * there are messages in it. It is valid for an application to let go of a * channel after sending a message, even if the channel is not yet * resolved. The message will go if/when the channel resolves. This method * may be invoked redundantly and even once the channel is no longer among * the one awaiting resolution. The implementation must be careful to * ignore such calls. */ protected abstract void resolPendingImpl();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -