📄 nonblockingoutputpipe.java
字号:
/* * $Id: NonBlockingOutputPipe.java,v 1.55 2004/05/27 03:42:58 bondolo Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.pipe;import java.util.Collections;import java.util.HashSet;import java.util.Set;import java.io.IOException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.Message;import net.jxta.endpoint.Messenger;import net.jxta.id.ID;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.PipeID;import net.jxta.protocol.PipeAdvertisement;import net.jxta.impl.util.TimeUtils;import net.jxta.impl.util.UnbiasedQueue;/** * An implementation of Ouput Pipe which sends messages on the pipe * asynchronously. The <code>send()</code> method for this implementation will * never block. **/class NonBlockingOutputPipe implements PipeResolver.Listener, OutputPipe, Runnable { /** * Log4J Logger **/ private static final Logger LOG = Logger.getLogger(NonBlockingOutputPipe.class.getName()); /** * Amount of time an idle worker thread will linger **/ private static final long IDLEWORKERLINGER = 10 * TimeUtils.ASECOND; /** * Minimum Query interval. 
Queries will not be sent more frequently than
     * this interval.
     **/
    private static final long QUERYINTERVALMIN = 15 * TimeUtils.ASECOND;

    /**
     * Lower bound on the query timeout. A wait for a query response is never
     * shorter than this interval.
     **/
    private static final long QUERYTIMEOUTMIN = TimeUtils.AMINUTE;

    /**
     * Set once the pipe has been closed; a closed pipe no longer accepts
     * messages.
     **/
    private volatile boolean closed;

    /**
     * Set when this pipe has just migrated. Guards against starting a new
     * migration while a previous one has not finished.
     **/
    private boolean migrated;

    /**
     * The peer group we operate in.
     **/
    private PeerGroup myGroup;

    /**
     * The endpoint service of our group.
     **/
    private EndpointService endpoint;

    /**
     * The pipe resolver used for migrate and verify.
     **/
    private PipeResolver myPipeResolver;

    /**
     * The advertisement this pipe was created from.
     **/
    private PipeAdvertisement pAdv;

    /**
     * The peer the pipe is currently resolved to.
     **/
    private PeerID destPeer;

    /**
     * The peers to which the pipe is allowed to be resolved.
     **/
    private Set resolvablePeers;

    /**
     * The endpoint destination address of the remote peer we are resolved to.
     **/
    private EndpointAddress destAddress;

    private Messenger destMessenger;

    /**
     * The worker thread which actually sends messages on the pipe.
     **/
    private volatile Thread serviceThread;

    /**
     * Absolute time, in milliseconds, at which the next verify request will
     * be sent.
     **/
    private long nextVerifyAt;

    /**
     * Messages waiting to be sent.
     **/
    private UnbiasedQueue queue = UnbiasedQueue.synchronizedQueue( new UnbiasedQueue( 50, false ) );

    /**
     * Type-safe constants naming the states of the worker thread. Each
     * constant's <code>toString()</code> is its own name.
     **/
    static class workerState {

        /**
         * Find a new eligible destination peer which is listening on the pipe.
         **/
        public static final workerState STARTMIGRATE = new workerState( "STARTMIGRATE" );

        /**
         * Issue resolution queries and wait for responses.
         **/
        public static final workerState PENDINGMIGRATE = new workerState( "PENDINGMIGRATE" );

        /**
         * Determine if the destination peer is still listening on the pipe.
         **/
        public static final workerState STARTVERIFY = new workerState( "STARTVERIFY" );

        /**
         * Issue verify queries and wait for responses.
         **/
        public static final workerState PENDINGVERIFY = new workerState( "PENDINGVERIFY" );

        /**
         * Acquire a messenger to the destination peer.
         **/
        public static final workerState ACQUIREMESSENGER = new workerState( "ACQUIREMESSENGER" );

        /**
         * Send messages via the messenger to the destination peer.
         **/
        public static final workerState SENDMESSAGES = new workerState( "SENDMESSAGES" );

        /**
         * Exit.
         **/
        public static final workerState CLOSED = new workerState( "CLOSED" );

        /**
         * Text returned by <code>toString()</code> for this constant.
         **/
        private final String label;

        /**
         * Private constructor. Instances exist only as the constants above.
         *
         * @param label the text <code>toString()</code> returns.
         **/
        private workerState( String label ) {
            this.label = label;
        }

        /**
         * @return the name of this state constant.
         **/
        public String toString() {
            return label;
        }
    }

    /**
     * The state the worker thread is currently in.
     **/
    private workerState workerstate;

    /**
     * The id of the query we are currently operating under.
     **/
    private int queryID = -1;

    /**
     * Creates an output pipe for the given advertisement. The pipe begins in
     * the <code>ACQUIREMESSENGER</code> state and
     * <code>startServiceThread()</code> is invoked before the constructor
     * returns.
     *
     * @param g the peer group we are working in; its endpoint service is used.
     * @param r the pipe resolver this pipe is bound to.
     * @param pAdv the advertisement of the pipe we are supporting.
     * @param destPeer the peer this pipe is currently bound to.
     * @param peers the peers this pipe may be bound to; copied into a new set.
**/ public NonBlockingOutputPipe( PeerGroup g, PipeResolver r, PipeAdvertisement pAdv, PeerID destPeer, Set peers ) { myGroup = g; myPipeResolver = r; this.pAdv = pAdv; this.destPeer = destPeer; this.resolvablePeers = new HashSet( peers ); endpoint = g.getEndpointService(); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Constructing for " + getPipeID() ); } workerstate = workerState.ACQUIREMESSENGER; startServiceThread(); } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -