📄 blockingmessenger.java
字号:
/* * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint;import java.util.Timer;import java.util.TimerTask;import java.io.IOException;import java.io.InterruptedIOException;import java.util.logging.Level;import net.jxta.logging.Logging;import java.util.logging.Logger;import net.jxta.endpoint.AbstractMessenger;import net.jxta.endpoint.ChannelMessenger;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.MessengerState;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.peergroup.PeerGroupID;import net.jxta.util.SimpleSelectable;import net.jxta.impl.util.TimeUtils;/** * This class is a near-drop-in replacement for the previous BlockingMessenger class. 
* To subclassers (that is, currently, transports) the only difference is that some
 * overloaded methods have a different name (class hierarchy reasons made it impossible
 * to preserve the names without forcing an API change for applications).
 *
 * The other difference which is not API visible, is that it implements the
 * standard MessengerState behaviour and semantics required by the changes in the endpoint framework.
 *
 * This the only base messenger class meant to be extended by outside code that is in the impl tree. The
 * reason being that what it replaces was there already and that new code should not become dependant upon it.
 */public abstract class BlockingMessenger extends AbstractMessenger {

    /**
     * Logger for this class; it is looked up under the name of the BlockingMessenger class.
     */
    private final static transient Logger LOG = Logger.getLogger(BlockingMessenger.class.getName());

    /**
     * The self destruct timer. It is static, hence shared by every BlockingMessenger, and it is
     * created with the daemon flag set (second constructor argument).
     * <p/>
     * When a messenger has become idle, it is closed. As a side effect, it
     * makes the owning canonical messenger, if any, subject to removal if it is
     * otherwise unreferenced.
     */
    private final static transient Timer timer = new Timer("BlockingMessenger self destruct timer", true);

    /*
     * Actions that we defer to after returning from event methods. In other
     * words, they cannot be done with the lock held, or they require calling
     * more event methods. Because this messenger can take only one message at
     * a time (saturated while sending), actions do not cascade much. Start can
     * lead to connect if the sending fails, but, because we always fail to
     * connect, connect will not lead to start. As a result we can get away with
     * performing deferred actions recursively. That simplifies the code.
     */
    private enum DeferredAction {

        /**
         * No deferred action.
         */
        ACTION_NONE,

        /**
         * Must send message.
         */
        ACTION_SEND,

        /**
         * Must report failure to connect.
         */
        ACTION_CONNECT
    }

    /**
     * The outstanding message. At most one message is held at a time; null when there is none.
     */
    private Message currentMessage = null;

    /**
     * The serviceName override for that message.
     */
    private String currentService = null;

    /**
     * The serviceParam override for that message.
     */
    private String currentParam = null;

    /**
     * The exception that caused that message to not be sent. The state machine's failAllAction()
     * fills it in with a generic IOException when a message is outstanding and no cause is recorded yet.
     */
    private Throwable currentThrowable = null;

    /**
     * true if we have deliberately closed our one message input queue.
     * Set by the state machine's closeInputAction().
     */
    private boolean inputClosed = false;

    /**
     * Need to know which group this transport lives in, so that we can suppress
     * channel redirection when in the same group. This is currently the norm.
     */
    private final PeerGroupID homeGroupID;

    /**
     * The current deferred action. The state machine's connectAction() and startAction()
     * set it to ACTION_CONNECT and ACTION_SEND respectively.
     */
    private DeferredAction deferredAction = DeferredAction.ACTION_NONE;

    /**
     * Reference to owning object. This is there so that the owning object is not subject to garbage collection
     * unless this object here becomes itself unreferenced. That happens when the self destruct timer closes it.
     */
    private Object owner = null;

    /**
     * The timer task watching over our self destruction requirement.
     * closeOutputAction() cancels it (after a null check) once the output has been closed.
     */
    private final TimerTask selfDestructTask;

    /**
     * State lock and engine.
     */
    private final BlockingMessengerState stateMachine = new BlockingMessengerState();

    /**
     * Legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it.
     * So we lie to them just while we run their closeImpl method so that they do not see that the messenger is
     * officially closed. The flag is raised and lowered around the closeImpl() call in closeOutputAction().
     */
    private boolean lieToOldTransports = false;

    /**
     * Our state machine implementation; just connects the standard MessengerState action methods to
     * this object. Being a non-static inner class, it reads and writes the enclosing messenger's fields directly.
     */
    private class BlockingMessengerState extends MessengerState {

        protected BlockingMessengerState() {
            // NOTE(review): the meaning of the boolean handed to MessengerState is not visible here - confirm.
            super(true);
        }

        /*
         * The required action methods.
*/

        /**
         * {@inheritDoc}
         * <p/>
         * Nothing is done on the spot: the connect step is only recorded as the pending deferred action.
         */
        @Override
        protected void connectAction() {
            deferredAction = DeferredAction.ACTION_CONNECT;
        }

        /**
         * {@inheritDoc}
         * <p/>
         * Nothing is done on the spot: the send step is only recorded as the pending deferred action.
         */
        @Override
        protected void startAction() {
            deferredAction = DeferredAction.ACTION_SEND;
        }

        /**
         * {@inheritDoc}
         * <p/>
         * Marks the messenger's single-message input as closed.
         */
        @Override
        protected void closeInputAction() {
            // we're synchronized here. (invoked from stateMachine).
            inputClosed = true;
        }

        /**
         * {@inheritDoc}
         * <p/>
         * Runs the transport's closeImpl() while the "lie to old transports" flag is raised, then cancels the
         * self destruct task if there is one. The flag is lowered again even when closeImpl() throws; the
         * exception itself still propagates to the caller.
         */
        @Override
        protected void closeOutputAction() {
            // This will break the cnx; thereby causing a down event if we have a send in progress.
            // If the cnx does not break before the current message is sent, then the message will be sent successfully,
            // resulting in an idle event. Either of these events is enough to complete the shutdown process.
            lieToOldTransports = true;
            try {
                closeImpl();
            } finally {
                // The lie must last only for the duration of closeImpl(), whatever its outcome.
                lieToOldTransports = false;
            }

            // Disconnect from the timer.
            if (selfDestructTask != null) {
                selfDestructTask.cancel();
            }
        }

        // This is a synchronous action. No synchronization needed: we're already synchronized, here.
        // There's a subtlety here: we do not clear the current message. We let sendMessageB or sendMessageN
        // deal with it, so that they can handle the status reporting each in their own way. So basically, all we
        // do is to set a reason for that message to fail in case we are shutdown from the outside and that message
        // is not sent yet. As long as there is a current message, it is guaranteed that there is a thread
        // in charge of reporting its status. It is also guaranteed that when failAll is called, the input is
        // already closed, and so, we have no obligation of making room for future messages immediately.
        // All this aggravation is so that we do not have to create one context wrapper for each message just so
        // that we can associate it with its result. Instead we use our single msg and single status model
        // throughout.
/**
 * {@inheritDoc}
 * <p/>
 * When a message is outstanding and no failure cause has been recorded for it, records a generic
 * IOException as the cause. The outstanding message itself is not cleared here.
 */
        @Override
        protected void failAllAction() {
            final boolean messagePending = currentMessage != null;

            // Keep whatever cause was recorded first; only fill the gap.
            if (messagePending && currentThrowable == null) {
                currentThrowable = new IOException("Messenger unexpectedly closed");
            }
        }
    }

    /**
     * The implementation of channel messenger that getChannelMessenger returns:
     * All it does is address rewriting. Even close() is forwarded to the shared messenger.
     * The reason is that BlockingMessengers are not really shared; they're transitional
     * entities used directly by CanonicalMessenger. GetChannel is used only to provide address
     * rewriting when we pass a blocking messenger directly to incoming messenger listeners...this
     * practice is to be removed in the future, in favor of making incoming messengers full-featured
     * async messengers that can be shared.
     */
    private final class BlockingMessengerChannel extends ChannelMessenger {

        public BlockingMessengerChannel(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam) {
            super(baseAddress, redirection, origService, origServiceParam);

            // We tell our super class that we synchronize on the stateMachine object. Although it is not obvious, our getState()
            // method calls the shared messenger getState() method, which synchronizes on the shared messenger's state machine
            // object. So, that's what we must specify. Logic would dictate that we pass it to super(), but it is not itself
            // constructed until super() returns. No way around it.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -