📄 threadedmessenger.java
字号:
/* * * $Id: ThreadedMessenger.java,v 1.10 2006/08/02 19:06:16 bondolo Exp $ * * Copyright (c) 2004 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.endpoint;import java.util.WeakHashMap;import java.io.IOException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.peergroup.PeerGroupID;import net.jxta.impl.util.UnbiasedQueue;import net.jxta.impl.util.TimeUtils;/** * This is a messenger meant to be shared by multiple channels and automatically * distribute the available bandwith among the channels. This one is implemented * with a dedicated background thread. */public abstract class ThreadedMessenger extends AbstractMessenger implements Runnable { /** * Log4J Category */ private final static transient Logger LOG = Logger.getLogger(ThreadedMessenger.class.getName()); /** * Our thread group. */ private final static transient ThreadGroup myThreadGroup = new ThreadGroup( "Threaded Messengers" ); /** * The logical destination address of the other party (if we know it). */ private volatile EndpointAddress logicalDestination = null; /** * true if we have deliberately closed our input queue. */ private volatile boolean inputClosed = false; /** * Need to know which group the transports we use live in, so that we can suppress channel redirection when in the same group. * This is currently the norm. */ private PeerGroupID homeGroupID = null; /** * How long the BG thread can remain unemployed before bailing. */ private static final long THREAD_IDLE_DEAD = 4 * TimeUtils.ASECOND; /* * Actions that we defer to after returning from event methods. In other words, * they cannot be done with the lock held, or they require calling more event methods. * Because this messenger can take only one message at a time, actions do not cascade * much. It may happen that the invoking thread is required to perform closure after * performing send. That's about it. * In addition, there's always only one deferred action per event. The only actions that cluster are * closeInput and closeOutput. We do not defer those. */ /** * No action deferred. */ private static final int ACTION_NONE = 0; /** * Must send the current message. */ private static final int ACTION_SEND = 1; /** * Must report failure to connect. */ private static final int ACTION_CONNECT = 2; /** * The current deferred action. */ private int deferredAction = ACTION_NONE; /** * The current background thread. */ private Thread bgThread = null; /** * The size that channel queues should have. */ private final int channelQueueSize; /** * The active channels queue. */ private final UnbiasedQueue activeChannels; /** * The resolving channels set. This is unordered. We use a weak hash map because abandoned channels could otherwise * accumulate in-there until the resolution attempt completes. A buggy application could easily do much damage. * * Note: a channel with at least one message in it is not considered abandoned. To prevent it from disappearing we set a * strong reference as the value in the map. A buggy application can do much damage, still, by queuing a single message * and then abandoning the channel. This is has to be dealt with at another level; limiting the number of channels * per application, or having a global limit on messages...TBD. */ private WeakHashMap resolvingChannels = null; /** * A default channel were we put messages that are send directly through this messenger rather than via one of * its channels. */ private ThreadedMessengerChannel defaultChannel = null; /** * State lock and engine. */ private final ThreadedMessengerState stateMachine = new ThreadedMessengerState(); /** * The implementation of channel messenger that getChannelMessenger returns: */ private class ThreadedMessengerChannel extends AsyncChannelMessenger { public ThreadedMessengerChannel( EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam, int queueSize, boolean connected ) { super(baseAddress, redirection, origService, origServiceParam, queueSize, connected); } /** * {@inheritDoc} * * <p/>We're supposed to return the complete destination, including * service and param specific to that channel. It is not clear, whether * this should include the cross-group mangling, though. Historically, * it does not. */ public EndpointAddress getLogicalDestinationAddress() { if (logicalDestination == null) { return null; } return (EndpointAddress) logicalDestination.clone(); } /** * {@inheritDoc} */ protected void startImpl() { if (! addToActiveChannels(this)) { // We do not need to hold our lock to call this, and it is just as well since it could re-enter. down(); } } /** * {@inheritDoc} */ protected void connectImpl() { // If it cannot be done, it is because we known that we will never be able to generate the resulting event. That means // that either the shared messenger is already resolved, or that it is already dead. In that case, we invoke down/up // in sequence accordingly. // // NOTE: the shared messenger may become dead 1 ns from now...that or 1 hour makes no difference, the channel will // notice when it first tries to send, in that case. The otherway around is more obvious: If the shared messenger is // not USABLE, it cannot come back. // // addToResolvingChannels() garantees us that if it returns true, either of the channel's down or up methods will be // invoked at some point. if (! addToResolvingChannels(this)) { if ((ThreadedMessenger.this.getState() & USABLE) != 0) { up(); } else { down(); } } } /** * {@inheritDoc} */ protected void resolPendingImpl() { // If this channel is still among the ones pending resolution, make sure // it becomes strongly referenced. strongRefResolvingChannel(this); } } /** * Our statemachine implementation; just connects the standard AbstractMessengerState action methods to * this object. */ private class ThreadedMessengerState extends MessengerState { protected ThreadedMessengerState() { super(false); } /* * The required action methods. */ /** * {@inheritDoc} */ protected void connectAction() { deferAction(ACTION_CONNECT); } /** * {@inheritDoc} */ protected void startAction() { deferAction(ACTION_SEND); } /** * {@inheritDoc} * * <p/>This is a synchronous action. The state machine assumes that it * is done when we return. There is No need (nor means) to signal * completion. No need for synchronization either: we're already * synchronized. */ protected void closeInputAction() { inputClosed = true; ThreadedMessengerChannel[] channels = (ThreadedMessengerChannel[]) resolvingChannels.keySet().toArray(new ThreadedMessengerChannel[0]); resolvingChannels.clear(); int i = channels.length; while (i-->0) { channels[i].down(); } channels = null; } /** * {@inheritDoc} */ protected void closeOutputAction() { // This will break the cnx; thereby causing a down event if we have a send in progress. // If the cnx does not break before the current message is sent, then the message will be sent successfully, // resulting in an idle event. Either of these events is enough to complete the shutdown process. closeImpl(); } /** * {@inheritDoc} * * <p/>The input is now closed, so we can rest assured that the last channel is really the last one. * This is a synchronous action. The state machine assumes that it is done when we return. There is * no need to signal completion with an idleEvent. * No need for synchronization either: we're already synchronized. */ protected void failAllAction() { while (true) { ThreadedMessengerChannel theChannel = null; theChannel = (ThreadedMessengerChannel) activeChannels.pop(); if (theChannel == null) { break; } theChannel.down(); } } } /** * Create a new ThreadedMessenger. * * @param homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport * that created this messenger. * @param destination where messages should be addressed to * @param logicalDestination the expected logical address of the destination. Pass null if unknown/irrelevant * @param channelQueueSize the queue size that channels should have. */ public ThreadedMessenger( PeerGroupID homeGroupID, EndpointAddress destination, EndpointAddress logicalDestination, int channelQueueSize ) { super(destination); this.homeGroupID = homeGroupID; // We tell our super class that we synchronize our state on the stateMachine object. Logic would dictate // that we pass it to super(), but it is not itself constructed until super() returns. No way around it. setStateLock(stateMachine); this.logicalDestination = logicalDestination; this.channelQueueSize = channelQueueSize; activeChannels = new UnbiasedQueue(Integer.MAX_VALUE, false); resolvingChannels = new WeakHashMap(4); } /** * Runs the state machine until there's nothing left to do. * * <p/>Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both * cause actions, and since connectAction and startAction are deferred, it seems possible that one of the * actions caused by send, close, or shutdown be called while connectAction or startAction are in progress. * * <p/>However, the state machine gives us a few guarantees: All the actions except closeInput and closeOutput have an *end* * event. No state transition that results in an action other than closeInput or closeOutput, may occur until the end event * for an on-going action has been called. * * <p/>We perform closeInput and closeOutput on the fly, so none of the exposed methods are capable of producing deferred actions * while an action is already deferred. So, there is at most one deferred action after returning from an event method, * regardless the number of concurrent threads invoking the exposed methods, and it can only happen once per deferred action * performed. */ public void run() { try { while (true) { switch(nextAction()) { case ACTION_NONE: return; case ACTION_SEND: send(); break; case ACTION_CONNECT: connect(); break; default: // huh ? }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -