
📄 asyncchannelmessenger.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 Page 1 of 2
                // Was already saturated.
                queued = false;
            }
        }

        if (queued && change) {
            // If not queued, there was no change of condition as far as
            // outsiders are concerned. (The redundant saturatedEvent is only
            // defensive, to guarantee the state machine stays in sync.)
            // Otherwise, if the saturation state did not change, we have no
            // state change to notify.
            notifyChange();
        }

        performDeferredAction(action);

        // Before we return, make sure that this channel remains referenced if
        // it has messages. It could become unreferenced if it is not yet
        // resolved and the application lets go of it after sending messages.
        // This means that we may need to do something only in the RESOLPENDING
        // and RESOLSATURATED cases. The way we do this test, there can be false
        // positives. They're dealt with as part of the action that is carried
        // out.
        if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) {
            resolPendingImpl();
        }
        return queued;
    }

    /**
     * {@inheritDoc}
     */
    public final boolean sendMessageN(Message msg, String rService, String rServiceParam) {

        try {
            if (sendMessageCommon(msg, rService, rServiceParam)) {
                // If it worked the message is queued; the outcome will be notified later.
                return true;
            }
            // Non-blocking and queue full: report overflow.
            msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW);
        } catch (IOException oie) {
            msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie));
        } catch (InterruptedException interrupted) {
            msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, interrupted));
        }
        return false;
    }

    /**
     * {@inheritDoc}
     */
    public final void sendMessageB(Message msg, String rService, String rServiceParam) throws IOException {

        try {
            while (true) {
                // If sendMessageCommon says "true" it worked.
                if (sendMessageCommon(msg, rService, rServiceParam)) {
                    return;
                }

                // Do a shallow check on the queue. If it seems empty (without
                // getting into a critical section to verify it), then yielding
                // is a good bet. It is a lot cheaper and smoother than waiting.
                // Note the message should be enqueued now. Yielding makes sense
                // now if the queue is empty.
                if (queue.isEmpty()) {
                    Thread.yield();
                }

                // If we reached this far, it is neither closed, nor ok. So it was saturated.
                synchronized (stateMachine) {
                    // Cheaper than waitState. sendMessageCommon already does the relevant state checks.
                    stateMachine.wait();
                }
            }
        } catch (InterruptedException ie) {
            InterruptedIOException iie = new InterruptedIOException("Message send interrupted");
            iie.initCause(ie);
            throw iie;
        }
    }

    /**
     * {@inheritDoc}
     */
    public final void resolve() {
        DeferredAction action;

        synchronized (stateMachine) {
            stateMachine.resolveEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect connect but let the state machine decide.
    }

    /**
     * {@inheritDoc}
     */
    public final int getState() {
        return stateMachine.getState();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {
        // Channels don't make channels.
        return null;
    }

    /**
     * Three exposed methods may need to inject new events in the system:
     * sendMessageN, close, and shutdown.
     * Since they can all cause actions, and since connectAction and
     * startAction are deferred, it seems possible that one of the actions
     * caused by send, close, or shutdown be called while connectAction or
     * startAction are in progress.
     * <p/>However, the state machine gives us a few guarantees: connectAction
     * and startAction can never nest. We will not be asked to perform one while
     * still performing the other. Only the synchronous actions closeInput,
     * closeOutput, or failAll can possibly be requested in the interval. We
     * could make more assumptions and simplify the code, but would rather keep
     * at least some flexibility.
     * <p/>
     * DEADLOCK WARNING: the implementor's methods invoke some of our callbacks
     * while synchronized. Then our callbacks synchronize on the state machine
     * in here. This nesting order must always be respected. As a result, we can
     * never invoke the implementor's methods while synchronized. Hence the
     * deferredAction processing.
     *
     * @param action the action
     */
    private void performDeferredAction(DeferredAction action) {
        switch (action) {
        case ACTION_SEND:
            startImpl();
            break;
        case ACTION_CONNECT:
            connectImpl();
            break;
        }
    }

    /**
     * A shorthand for a frequently used sequence. MUST be called while
     * synchronized on stateMachine.
     *
     * @param notifyAll If {@code true} then this is a life-cycle event and all
     *                  waiters on the stateMachine should be notified. If {@code false} then
     *                  only a single waiter will be notified for simple activity events.
     * @return the deferred action.
     */
    private DeferredAction eventCalled(boolean notifyAll) {
        DeferredAction action = deferredAction;

        deferredAction = DeferredAction.ACTION_NONE;
        if (notifyAll) {
            stateMachine.notifyAll();
        } else {
            stateMachine.notify();
        }
        return action;
    }

    /*
     * Implement the methods that our shared messenger will use to report progress.
     */

    /**
     * The implementation will invoke this method when it becomes resolved,
     * after connectImpl was invoked.
     */
    protected void up() {
        DeferredAction action;

        synchronized (stateMachine) {
            stateMachine.upEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect start but let the state machine decide.
    }

    /**
     * The implementation invokes this method when it becomes broken.
     */
    protected void down() {
        DeferredAction action;

        synchronized (stateMachine) {
            stateMachine.downEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect connect but let the state machine decide.
    }

    /**
     * Here, we behave like a queue to the shared messenger. When we report
     * being empty, though, we're automatically removed from the active queues
     * list. We'll go back there the next time we have something to send by
     * calling startImpl.
     *
     * @return pending message
     */
    protected PendingMessage peek() {
        PendingMessage theMsg;
        DeferredAction action = DeferredAction.ACTION_NONE;

        synchronized (stateMachine) {
            // We like the msg to keep occupying space in the queue until it's
            // out the door. That way, idleness (that is, not currently working
            // on a message), is always consistent with queue emptiness.
            theMsg = queue.peek();
            if (theMsg == null) {
                stateMachine.idleEvent();
                action = eventCalled(false);

                // We do not notifyChange here because, if the queue is empty,
                // it was already notified when the last message was popped. The
                // call to idleEvent is only defensive programming to make extra
                // sure the state machine is in sync.
                return null;
            }

            if (outputClosed) {
                // We've been asked to stop sending. Which, if we were sending,
                // must be notified by either an idle event or a down
                // event. Nothing needs to happen to the shared messenger. We're
                // just a channel.
                stateMachine.downEvent();
                action = eventCalled(true);
                theMsg = null;
            }
        }
        notifyChange();
        performDeferredAction(action); // we expect none but let the state machine decide.
        return theMsg;
    }

    /**
     * Returns the number of elements in this collection.  If this collection
     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * @return the number of elements in this collection
     */
    protected int size() {
        return queue.size();
    }

    /**
     * One message done. Update the saturated/etc state accordingly.
     *
     * @return true if there are more messages after the one we removed.
     */
    protected boolean poll() {
        boolean result;
        DeferredAction action;

        synchronized (stateMachine) {
            queue.poll();
            if (queue.peek() == null) {
                stateMachine.idleEvent();
                action = eventCalled(false);
                result = false;
            } else {
                stateMachine.msgsEvent();
                action = eventCalled(false);
                result = true;
            }
        }
        notifyChange();
        performDeferredAction(action); // we expect none but let the state machine decide.
        return result;
    }

    /**
     * We invoke this method to be placed on the list of channels that have
     * messages to send.
     * <p/>
     * NOTE that it is the shared messenger's responsibility to synchronize so
     * that we cannot be added to the active list just before we get removed
     * due to reporting an empty queue in parallel. So, if we report an empty
     * queue and have a new message to send before the shared messenger removes
     * us from the active list, startImpl will block until the removal is done.
     * Then we'll be added back.
     * <p/>
     * If it cannot be done, it means that the shared messenger is no longer
     * usable. It may call down() in sequence. Out of defensiveness, it should
     * do so without holding its lock.
     */
    protected abstract void startImpl();

    /**
     * We invoke this method to be placed on the list of channels that are
     * waiting for resolution.
     * <p/>
     * If it cannot be done, it means that the shared messenger is no longer
     * usable. It may call down() in sequence. Out of defensiveness, it should
     * do so without holding its lock. If the messenger is already resolved it
     * may call up() in sequence. Same wisdom applies. It is a good idea to
     * create channels in the resolved state if the shared messenger is already
     * resolved. That avoids this extra contortion.
     */
    protected abstract void connectImpl();

    /**
     * This is invoked to inform the implementation that this channel is now in
     * the resolPending or resolSaturated state. This is specific to this type
     * of channel. The shared messenger must make sure that this channel
     * remains strongly referenced, even though it is not resolved, because
     * there are messages in it. It is valid for an application to let go of a
     * channel after sending a message, even if the channel is not yet
     * resolved. The message will go if/when the channel resolves. This method
     * may be invoked redundantly and even once the channel is no longer among
     * the ones awaiting resolution. The implementation must be careful to
     * ignore such calls.
     */
    protected abstract void resolPendingImpl();
}
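
The deadlock warning in the Javadoc for performDeferredAction describes a general pattern: decide what to do while holding the lock, then call out to foreign code only after the lock is released. The sketch below isolates that idea in a self-contained form. It is not JXTA code: the class DeferredActionSketch, its Action enum, the log list, and the callback helper are hypothetical names invented for illustration, and the state machine is reduced to a single pending-action field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the deferred-action pattern; not JXTA code.
class DeferredActionSketch {

    enum Action { NONE, SEND, CONNECT }

    private final Object lock = new Object();   // plays the role of stateMachine
    private Action deferred = Action.NONE;      // written only while holding lock

    // Records each callback and whether the lock was held when it ran.
    final List<String> log = new ArrayList<>();

    // Stand-in for an implementor method such as connectImpl() or startImpl().
    private void callback(String name) {
        log.add(name + ":lockHeld=" + Thread.holdsLock(lock));
    }

    // Must be called while holding lock: take the pending action and reset it.
    private Action takeDeferred() {
        Action action = deferred;
        deferred = Action.NONE;
        lock.notifyAll();                       // wake any waiters on the lock
        return action;
    }

    void resolve() {
        Action action;
        synchronized (lock) {
            deferred = Action.CONNECT;          // decide, but do not call out yet
            action = takeDeferred();
        }
        perform(action);                        // lock released before the callback
    }

    void send() {
        Action action;
        synchronized (lock) {
            deferred = Action.SEND;
            action = takeDeferred();
        }
        perform(action);
    }

    private void perform(Action action) {
        switch (action) {
            case SEND:
                callback("start");
                break;
            case CONNECT:
                callback("connect");
                break;
            default:
                break;                          // NONE: nothing to do
        }
    }

    public static void main(String[] args) {
        DeferredActionSketch s = new DeferredActionSketch();
        s.resolve();
        s.send();
        System.out.println(s.log);
    }
}
```

Because every callback runs outside the synchronized block, a callback that itself takes another lock and then calls back into this object cannot create a lock-order inversion on `lock`. In this sketch each logged entry reports `lockHeld=false`.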
