⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 blockingmessenger.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        performDeferredAction(action);    }    /**     * {@inheritDoc}     */    public void sendMessageB(Message msg, String service, String serviceParam) throws IOException {        DeferredAction action;        synchronized (stateMachine) {            try {                while ((currentMessage != null) && !inputClosed) {                    stateMachine.wait();                }            } catch (InterruptedException ie) {                throw new InterruptedIOException();            }            if (inputClosed) {                throw new IOException("Messenger is closed. It cannot be used to send messages");            }            // We store the four elements of a pending msg separately. We do not want to pour millions of tmp objects on the GC for            // nothing.            storeCurrent(msg, service, serviceParam);            stateMachine.saturatedEvent();            action = eventCalled();        }        notifyChange(); // We called an event. State may have changed.        performDeferredAction(action); // We called an event. There may be an action. (start, normally).        // After deferred action, the message was either sent or failed. (done by this thread).        // We can tell because, if failed, the currentMessage is still our msg.        Throwable failure = null;        synchronized (stateMachine) {            if (currentMessage == msg) {                failure = currentThrowable;                if (failure == null) {                    failure = new IOException("Unknown error");                }                // Ok, let it go, now.                storeCurrent(null, null, null);            } // Else, don't touch currentMsg; it's not our msg.        }        if (failure == null) {            // No failure. Report ultimate succes.            msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.SUCCESS);            return;        }        // Failure. See how we can manage to throw it.        if (failure instanceof IOException) {            throw (IOException) failure;        }        if (failure instanceof RuntimeException) {            throw (RuntimeException) failure;        }        if (failure instanceof Error) {            throw (Error) failure;        }        IOException failed = new IOException("Failure sending message");        failed.initCause(failure);        throw failed;    }    /**     * {@inheritDoc}     */    public final boolean sendMessageN(Message msg, String service, String serviceParam) {        boolean queued = false;        DeferredAction action = DeferredAction.ACTION_NONE;        boolean closed;        synchronized (stateMachine) {            closed = inputClosed;            if ((!closed) && (currentMessage == null)) {                // We copy the four elements of a pending msg right here. We do not want to pour millions of tmp objects on the GC.                storeCurrent(msg, service, serviceParam);                stateMachine.saturatedEvent();                action = eventCalled();                queued = true;            }        }        if (queued) {            notifyChange(); // We called an event. State may have changed.            performDeferredAction(action); // We called an event. There may be an action. (start, normally).            // After deferred action, the message was either sent or failed. (done by this thread).            // We can tell because, if failed, the currentMessage is still our msg.            synchronized (stateMachine) {                if (currentMessage == msg) {                    if (currentThrowable == null) {                        currentThrowable = new IOException("Unknown error");                    }                    msg.setMessageProperty(Message.class, currentThrowable);                    // Ok, let it go, now.                    storeCurrent(null, null, null);                } else {                    msg.setMessageProperty(Message.class, OutgoingMessageEvent.SUCCESS);                    // Don't touch the current msg; it's not our msg.                }            }            // Yes, we return true in either case. sendMessageN is supposed to be async. If a message fails            // after it was successfuly queued, the error is not reported by the return value, but only by            // the message property (and select). Just making sure the behaviour is as normal as can be            // even it means suppressing some information.            return true;        }        // Not queued. Either closed, or currently sending. If inputClosed, that's what we report.        msg.setMessageProperty(Messenger.class,                closed ?                        new OutgoingMessageEvent(msg, new IOException("This messenger is closed. " + "It cannot be used to send messages.")) :                        OutgoingMessageEvent.OVERFLOW);        return false;    }    /**     * {@inheritDoc}     */    public final void resolve() {// We're born resolved. Don't bother calling the event.    }    /**     * {@inheritDoc}     */    public final int getState() {        return stateMachine.getState();    }    /**     * {@inheritDoc}     */    public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {        // Our transport is always in the same group. If the channel's target group is the same, no group        // redirection is ever needed.        return new BlockingMessengerChannel(getDestinationAddress(),                homeGroupID.equals(redirection) ? null : redirection, service,                serviceParam);    }    /**     * Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both     * cause actions, and since connectAction and startAction are deferred, it seems possible that one of the     * actions caused by send, close, or shutdown be called while connectAction or startAction are in progress.     *     * However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be     * asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or     * failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but rather     * keep at least some flexibility.     */    private void performDeferredAction(DeferredAction action) {        switch (action) {            case ACTION_SEND:                sendIt();                break;            case ACTION_CONNECT:                cantConnect();                break;        }    }    /**     * A shortHand for a frequently used sequence. MUST be called while synchronized on stateMachine.     *     * @return the deferred action.     */    private DeferredAction eventCalled() {        DeferredAction action = deferredAction;        deferredAction = DeferredAction.ACTION_NONE;        stateMachine.notifyAll();        return action;    }    /**     * Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue.     * This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because     * sendMessageB does not want to set it in any other case than success, while sendMessageN does it in all cases.     * The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep     * the 1 msg queue locked until then (which would be in contradiction with how we interact with the state machine).     * To make it really inexpensive, here's the trick: when a message fails currentMessage and currentFailure remain.     * So the sendMessageRoutine can check them and known that it is its message and not another one that caused the     * failure. If all is well, currentMessage and currentFailure are nulled and if another message is send immediately     * sendMessage is able to see that its own message was processed fully. (this is a small cheat regarding the     * state of saturation after failall, but that's not actually detectable from the outside: input is closed     * before failall anyway. See failall for that part.     */    private void sendIt() {        if (currentMessage == null) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.severe("Internal error. Asked to send with no message.");            }            return;        }        DeferredAction action;        try {            sendMessageBImpl(currentMessage, currentService, currentParam);        } catch (Throwable any) {            // Did not work. We report the link down and let the state machine tell us when to fail the msg.  It is assumed that            // when this happens, the cnx is already down.  FIXME - jice@jxta.org 20040413: check with the various kind of funky            // exception. Some may not mean the link is down            synchronized (stateMachine) {                currentThrowable = any;                stateMachine.downEvent();                action = eventCalled();            }            notifyChange();            performDeferredAction(action); // we expect connect but let the state machine decide.            return;        }        // Worked.        synchronized (stateMachine) {            storeCurrent(null, null, null);            stateMachine.idleEvent();            action = eventCalled();        }        // We did go from non-idle to idle. Report it.        notifyChange();        performDeferredAction(action); // should be none but let the state machine decide.    }    /**     * Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.     */    private void cantConnect() {        DeferredAction action;        synchronized (stateMachine) {            stateMachine.downEvent();            action = eventCalled();        }        notifyChange();        performDeferredAction(action); // should be none but let the state machine decide.    }    /*     * Abstract methods to be provided by implementer (a transport for example).     * To adapt legacy transport, keep extending BlockingMessenger and just rename your close, isIdle, sendMessage and     * getLogicalDestinationAddress methods to closeImpl, isIdleImpl, sendMessageBImpl, and getLogicalDestinationImpl, respectively.     */    /**     * Close connection. May fail current send.     */    protected abstract void closeImpl();    /**     * Send a message blocking as needed until the message is sent.     *     * @param message The message to send.     * @param service The destination service.     * @param param   The destination serivce param.     * @throws IOException Thrown for errors encountered while sending the message.     */    protected abstract void sendMessageBImpl(Message message, String service, String param) throws IOException;    /**     * return true if this messenger has not been used for a long time. The definition of long time is: "sufficient such that closing it     * is worth the cost of having to re-open". A messenger should self close if it thinks it meets the definition of     * idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. <b>Important:</b> if     * self destruction is used, this method must work; not just return false. See the constructor. In general, if closeImpl does     * not need to do anything, then self destruction is not needed.     *     * @return {@code true} if theis messenger is, by it's own definition, idle.     */    protected abstract boolean isIdleImpl();    /**     * Obtain the logical destination address from the implementer (a transport for example).     */    protected abstract EndpointAddress getLogicalDestinationImpl();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -