📄 asyncchannelmessenger.java
字号:
// Was already saturated. queued = false; } } if (queued && change) { // If not queued, there was no change of condition as far as // outsiders are concerned. (redundant saturatedEvent, only // defensive; to guarantee statemachine in sync). else, if the // saturation state did not change, we have no state change to // notify. notifyChange(); } performDeferredAction(action); // Before we return, make sure that this channel remains referenced if // it has messages. It could become unreferenced if it is not yet // resolved and the application lets go of it after sending messages. // This means that we may need to do something only in the resolpending // and resolsaturated cases. The way we do this test, there can be false // positives. They're dealt with as part of the action that is carried // out. if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) { resolPendingImpl(); } return queued; } /** * {@inheritDoc} */ public final boolean sendMessageN(Message msg, String rService, String rServiceParam) { try { if (sendMessageCommon(msg, rService, rServiceParam)) { // If it worked the message is queued; the outcome will be notified later. return true; } // Non-blocking and queue full: report overflow. msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW); } catch (IOException oie) { msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie)); } catch (InterruptedException interrupted) { msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, interrupted)); } return false; } /** * {@inheritDoc} */ public final void sendMessageB(Message msg, String rService, String rServiceParam) throws IOException { try { while (true) { // if sendMessageCommon says "true" it worked. if (sendMessageCommon(msg, rService, rServiceParam)) { return; } // Do a shallow check on the queue. If it seems empty (without getting into a critical section to // verify it), then yielding is good bet. It is a lot cheaper and smoother than waiting. // Note the message should be enqueued now. yielding makes sense now if the queue is empty if (queue.isEmpty()) { Thread.yield(); } // If we reached this far, it is neither closed, nor ok. So it was saturated. synchronized (stateMachine) { // Cheaper than waitState. sendMessageCommon already does the relevant state checks. stateMachine.wait(); } } } catch (InterruptedException ie) { InterruptedIOException iie = new InterruptedIOException("Message send interrupted"); iie.initCause(ie); throw iie; } } /** * {@inheritDoc} */ public final void resolve() { DeferredAction action; synchronized (stateMachine) { stateMachine.resolveEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect connect but let the state machine decide. } /** * {@inheritDoc} */ public final int getState() { return stateMachine.getState(); } /** * {@inheritDoc} */ @Override public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) { // Channels don't make channels. return null; } /** * Three exposed methods may need to inject new events in the system: * sendMessageN, close, and shutdown. * Since they can all cause actions, and since connectAction and * startAction are deferred, it seems possible that one of the actions * caused by send, close, or shutdown be called while connectAction or * startAction are in progress. * <p/>However, the state machine gives us a few guarantees: connectAction * and startAction can never nest. We will not be asked to perform one while * still performing the other. Only the synchronous actions closeInput, * closeOutput, or failAll can possibly be requested in the interval. We * could make more assumptions and simplify the code, but rather keep at * least some flexibility. * <p/> * DEAD LOCK WARNING: the implementor's method invoke some of our call backs * while synchronized. Then our call backs synchronize on the state machine * in here. This nesting order must always be respected. As a result, we can * never invoke implementors methods while synchronized. Hence the * deferredAction processing. * * @param action the action */ private void performDeferredAction(DeferredAction action) { switch (action) { case ACTION_SEND: startImpl(); break; case ACTION_CONNECT: connectImpl(); break; } } /** * A shortHand for a frequently used sequence. MUST be called while * synchronized on stateMachine. * * @param notifyAll If {@code true} then this is a life-cycle event and all * waiters on the stateMachine should be notified. If {@code false} then * only a single waiter will be notified for simple activity events. * @return the deferred action. */ private DeferredAction eventCalled(boolean notifyAll) { DeferredAction action = deferredAction; deferredAction = DeferredAction.ACTION_NONE; if (notifyAll) { stateMachine.notifyAll(); } else { stateMachine.notify(); } return action; } /* * Implement the methods that our shared messenger will use to report progress. */ /** * The implementation will invoke this method when it becomes resolved, * after connectImpl was invoked. */ protected void up() { DeferredAction action; synchronized (stateMachine) { stateMachine.upEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect start but let the state machine decide. } /** * The implementation invokes this method when it becomes broken. */ protected void down() { DeferredAction action; synchronized (stateMachine) { stateMachine.downEvent(); action = eventCalled(true); } notifyChange(); performDeferredAction(action); // we expect connect but let the state machine decide. } /** * Here, we behave like a queue to the shared messenger. When we report * being empty, though, we're automatically removed from the active queues * list. We'll go back there the next time we have something to send by * calling startImpl. * * @return pending message */ protected PendingMessage peek() { PendingMessage theMsg; DeferredAction action = DeferredAction.ACTION_NONE; synchronized (stateMachine) { // We like the msg to keep occupying space in the queue until it's // out the door. That way, idleness (that is, not currently working // on a message), is always consistent with queue emptyness. theMsg = queue.peek(); if (theMsg == null) { stateMachine.idleEvent(); action = eventCalled(false); // We do not notifyChange, here, because, if the queue is empty, // it was already notified when the last message was popped. The // call to idleEvent is only defensive programming to make extra // sure the state machine is in sync. return null; } if (outputClosed) { // We've been asked to stop sending. Which, if we were sending, // must be notified by either an idle event or a down // event. Nothing needs to happen to the shared messenger. We're // just a channel. stateMachine.downEvent(); action = eventCalled(true); theMsg = null; } } notifyChange(); performDeferredAction(action); // we expect none but let the state machine decide. return theMsg; } /** * Returns the number of elements in this collection. If this collection * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns * <tt>Integer.MAX_VALUE</tt>. * * @return the number of elements in this collection */ protected int size() { return queue.size(); } /** * One message done. Update the saturated/etc state accordingly. * * @return true if there are more messages after the one we removed. */ protected boolean poll() { boolean result; DeferredAction action; synchronized (stateMachine) { queue.poll(); if (queue.peek() == null) { stateMachine.idleEvent(); action = eventCalled(false); result = false; } else { stateMachine.msgsEvent(); action = eventCalled(false); result = true; } } notifyChange(); performDeferredAction(action); // we expect none but let the state machine decide. return result; } /** * We invoke this method to be placed on the list of channels that have * message to send. * <p/> * NOTE that it is the shared messenger responsibility to synchronize so * that we cannot be added to the active list just before we get removed * due to reporting an empty queue in parallel. So, if we report an empty * queue and have a new message to send before the shared messenger removes * us form the active list, startImpl will block until the removal is done. * Then we'll be added back. * <p/> * If it cannot be done, it means that the shared messenger is no longer * usable. It may call down() in sequence. Out of defensiveness, it should * do so without holding its lock. */ protected abstract void startImpl(); /** * We invoke this method to be placed on the list of channels that are * waiting for resolution. * <p/> * If it cannot be done, it means that the shared messenger is no longer * usable. It may call down() in sequence. Out of defensiveness, it should * do so without holding its lock. If the messenger is already resolved it * may call up() in sequence. Same wisdom applies. It is a good idea to * create channels in the resolved state if the shared messenger is already * resolved. That avoids this extra contortion. */ protected abstract void connectImpl(); /** * This is invoked to inform the implementation that this channel is now in * the resolPending or resolSaturated state. This is specific to this type * of channels. The shared messenger must make sure that this channel * remains strongly referenced, even though it is not resolved, because * there are messages in it. It is valid for an application to let go of a * channel after sending a message, even if the channel is not yet * resolved. The message will go if/when the channel resolves. This method * may be invoked redundantly and even once the channel is no longer among * the one awaiting resolution. The implementation must be careful to * ignore such calls. */ protected abstract void resolPendingImpl();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -