
📄 asyncchannelmessenger.java

📁 jxta_src_2.41b: latest JXTA 2.41b source code, from www.jxta.org
💻 JAVA
                stateMachine.saturatedEvent();
                action = eventCalled(false);
                change = true;
            } else {
                // Was already saturated.
                queued = false;
            }
        }

        if (queued && change) {
            // If not queued, there was no change of condition as far as
            // outsiders are concerned. (The redundant saturatedEvent is only
            // defensive, to guarantee that the state machine stays in sync.)
            // Otherwise, if the saturation state did not change, we have no
            // state change to notify.
            notifyChange();
        }

        performDeferredAction(action);

        // Before we return, make sure that this channel remains referenced if
        // it has messages. It could become unreferenced if it is not yet
        // resolved and the application lets go of it after sending messages.
        // This means that we may need to do something only in the RESOLPENDING
        // and RESOLSATURATED cases. The way we do this test, there can be false
        // positives. They're dealt with as part of the action that is carried
        // out.
        if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) {
            resolPendingImpl();
        }

        return queued;
    }

    /**
     * {@inheritDoc}
     */
    public final boolean sendMessageN( Message msg, String rService, String rServiceParam ) {
        try {
            if (sendMessageCommon(msg, rService, rServiceParam)) {
                // If it worked the message is queued; the outcome will be notified later.
                return true;
            }
            // Non-blocking and queue full: report overflow.
            msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW);
            return false;
        } catch (IOException oie) {
            msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie));
            return false;
        }
    }

    /**
     * {@inheritDoc}
     */
    public final void sendMessageB( Message msg, String rService, String rServiceParam ) throws IOException {
        try {
            while (true) {
                // Do a shallow check on the queue. If it seems full (without
                // getting into a critical section to verify it), then yielding
                // is a good bet. It is a lot cheaper and smoother than waiting.
                if (queue.getCurrentInQueue() == queue.getMaxQueueSize()) {
                    Thread.yield();
                }

                // If sendMessageCommon says "true", it worked.
                if (sendMessageCommon(msg, rService, rServiceParam)) {
                    return;
                }

                // If we reached this far, it is neither closed nor ok, so it was saturated.
                synchronized(stateMachine) {
                    // Cheaper than waitState. sendMessageCommon already does
                    // the relevant state checks.
                    stateMachine.wait();
                }
            }
        } catch(InterruptedException ie) {
            InterruptedIOException iie = new InterruptedIOException("Message send interrupted");
            iie.initCause( ie );
            throw iie;
        }
    }

    /**
     * {@inheritDoc}
     */
    public final void resolve() {
        int action = ACTION_NONE;
        synchronized(stateMachine) {
            stateMachine.resolveEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect connect but let the state machine decide.
    }

    /**
     * {@inheritDoc}
     */
    public final int getState() {
        return stateMachine.getState();
    }

    /**
     * {@inheritDoc}
     */
    public final Messenger getChannelMessenger( PeerGroupID redirection, String service, String serviceParam ) {
        // Channels don't make channels.
        return null;
    }

    /**
     * Three exposed methods may need to inject new events in the system:
     * sendMessageN, close, and shutdown.
     * Since they can all cause actions, and since connectAction and
     * startAction are deferred, it seems possible that one of the actions
     * caused by send, close, or shutdown could be called while connectAction
     * or startAction is in progress.
     *
     * <p/>However, the state machine gives us a few guarantees: connectAction
     * and startAction can never nest. We will not be asked to perform one while
     * still performing the other. Only the synchronous actions closeInput,
     * closeOutput, or failAll can possibly be requested in the interval. We
     * could make more assumptions and simplify the code, but we would rather
     * keep at least some flexibility.
     *
     * DEADLOCK WARNING: the implementor's methods invoke some of our callbacks
     * while synchronized. Then our callbacks synchronize on the state machine
     * in here. This nesting order must always be respected. As a result, we can
     * never invoke the implementor's methods while synchronized. Hence the
     * deferredAction processing.
     */
    private void performDeferredAction(int action) {
        switch(action) {
        case ACTION_SEND:
            startImpl();
            break;
        case ACTION_CONNECT:
            connectImpl();
            break;
        }
    }

    /**
     * A shorthand for a frequently used sequence. MUST be called while
     * synchronized on stateMachine.
     *
     * @param notifyAll If {@code true} then this is a life-cycle event and all
     * waiters on the stateMachine should be notified.
     * If {@code false} then only a single waiter will be notified for simple
     * activity events.
     * @return the deferred action.
     */
    private int eventCalled(boolean notifyAll) {
        int action = deferredAction;
        deferredAction = ACTION_NONE;
        if (notifyAll) {
            stateMachine.notifyAll();
        } else {
            stateMachine.notify();
        }
        return action;
    }

    /*
     * Implement the methods that our shared messenger will use to report progress.
     */

    /**
     * The implementation will invoke this method when it becomes resolved,
     * after connectImpl was invoked.
     */
    protected void up() {
        int action = ACTION_NONE;
        synchronized(stateMachine) {
            stateMachine.upEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect start but let the state machine decide.
    }

    /**
     * The implementation invokes this method when it becomes broken.
     */
    protected void down() {
        int action = ACTION_NONE;
        synchronized(stateMachine) {
            stateMachine.downEvent();
            action = eventCalled(true);
        }
        notifyChange();
        performDeferredAction(action); // we expect connect but let the state machine decide.
    }

    /**
     *  Here, we behave like a queue to the shared messenger. When we report
     *  being empty, though, we're automatically removed from the active queues
     *  list. We'll go back there the next time we have something to send by
     *  calling startImpl.
     */
    protected PendingMessage peek() {
        PendingMessage theMsg = null;
        int action = ACTION_NONE;
        synchronized(stateMachine) {
            // We like the msg to keep occupying space in the queue until it's
            // out the door. That way, idleness (that is, not currently working
            // on a message) is always consistent with queue emptiness.
            theMsg = (PendingMessage) queue.peek();
            if (theMsg == null) {
                stateMachine.idleEvent();
                action = eventCalled(false);
                // We do not notifyChange here because, if the queue is empty,
                // it was already notified when the last message was popped. The
                // call to idleEvent is only defensive programming to make extra
                // sure the state machine is in sync.
                return null;
            }

            if (outputClosed) {
                // We've been asked to stop sending. If we were sending, that
                // must be signaled by either an idle event or a down
                // event. Nothing needs to happen to the shared messenger. We're
                // just a channel.
                stateMachine.downEvent();
                action = eventCalled(true);
                theMsg = null;
            }
        }
        notifyChange();
        performDeferredAction(action); // we expect none but let the state machine decide.
        return theMsg;
    }

    /**
     * One message done. Update the saturated/etc state accordingly.
     *
     * @return true if there are more messages after the one we popped.
     */
    protected boolean pop() {
        boolean result;
        int action = ACTION_NONE;
        synchronized(stateMachine) {
            queue.pop();
            if (queue.peek() == null) {
                stateMachine.idleEvent();
                action = eventCalled(false);
                result = false;
            } else {
                stateMachine.msgsEvent();
                action = eventCalled(false);
                result = true;
            }
        }
        notifyChange();
        performDeferredAction(action); // we expect none but let the state machine decide.
        return result;
    }

    /**
     *  We invoke this method to be placed on the list of channels that have
     *  messages to send.
     *
     *  NOTE that it is the shared messenger's responsibility to synchronize so
     *  that we cannot be added to the active list just before we get removed
     *  due to reporting an empty queue in parallel. So, if we report an empty
     *  queue and have a new message to send before the shared messenger removes
     *  us from the active list, startImpl will block until the removal is done.
     *  Then we'll be added back.
     *
     *  If it cannot be done, it means that the shared messenger is no longer
     *  usable. It may call down() in sequence. Out of defensiveness, it should
     *  do so without holding its lock.
     */
    protected abstract void startImpl();

    /**
     *  We invoke this method to be placed on the list of channels that are
     *  waiting for resolution.
     *
     *  If it cannot be done, it means that the shared messenger is no longer
     *  usable. It may call down() in sequence. Out of defensiveness, it should
     *  do so without holding its lock. If the messenger is already resolved it
     *  may call up() in sequence. Same wisdom applies. It is a good idea to
     *  create channels in the resolved state if the shared messenger is already
     *  resolved. That avoids this extra contortion.
     */
    protected abstract void connectImpl();

    /**
     *  This is invoked to inform the implementation that this channel is now in
     *  the resolPending or resolSaturated state. This is specific to this type
     *  of channel. The shared messenger must make sure that this channel
     *  remains strongly referenced, even though it is not resolved, because
     *  there are messages in it. It is valid for an application to let go of a
     *  channel after sending a message, even if the channel is not yet
     *  resolved. The message will go if/when the channel resolves. This method
     *  may be invoked redundantly and even once the channel is no longer among
     *  those awaiting resolution.
     *  The implementation must be careful to ignore such calls.
     */
    protected abstract void resolPendingImpl();
}
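
The deadlock warning on performDeferredAction describes a general pattern: decide what to do while holding the state lock, but call into the implementor only after releasing it. The sketch below illustrates that pattern in isolation. It is not JXTA code: the class DeferredActionSketch, its send method, its log field, and the plain Object used as the lock are all hypothetical stand-ins, and the rule "request ACTION_SEND only when the queue was empty" is an assumed simplification of what the real state machine decides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of the deferred-action pattern; not part of JXTA.
final class DeferredActionSketch {
    static final int ACTION_NONE = 0;
    static final int ACTION_SEND = 1;

    private final Object lock = new Object();            // stands in for stateMachine
    private final Queue<String> queue = new ArrayDeque<>();
    private int deferredAction = ACTION_NONE;
    final List<String> log = new ArrayList<>();          // records callback invocations

    // Must be called while synchronized on lock: take and clear the pending action.
    private int eventCalled() {
        int action = deferredAction;
        deferredAction = ACTION_NONE;
        lock.notifyAll();
        return action;
    }

    void send(String msg) {
        int action;
        synchronized (lock) {
            boolean wasIdle = queue.isEmpty();
            queue.add(msg);
            if (wasIdle) {
                // Assumed rule: going from idle to busy requests a send action.
                deferredAction = ACTION_SEND;
            }
            action = eventCalled();
        }
        // The lock is released here, so the callback cannot nest inside it.
        performDeferredAction(action);
    }

    private void performDeferredAction(int action) {
        if (action == ACTION_SEND) {
            startImpl();
        }
    }

    // Stand-in for the implementor's callback; records whether the lock is held.
    private void startImpl() {
        log.add("startImpl lockHeld=" + Thread.holdsLock(lock));
    }

    public static void main(String[] args) {
        DeferredActionSketch s = new DeferredActionSketch();
        s.send("a"); // queue was empty: the action is deferred, then performed
        s.send("b"); // queue already busy: no action requested
        System.out.println(s.log); // prints [startImpl lockHeld=false]
    }
}
```

Because the action is only recorded under the lock and performed afterwards, the callback is free to take its own lock and call back into the channel without inverting the lock order.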
