📄 endpointreceivequeue.java
字号:
/*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*
* $Id: EndpointReceiveQueue.java,v 1.2 2002/03/04 21:42:57 echtcherbina Exp $
*/
package net.jxta.impl.endpoint;
import net.jxta.endpoint.*;
import java.io.*;
import java.util.*;
/**
* An EndpointReceiveQueue is the queuing mechanism used to pass Messages
* between the various components of the Juxtapose Transport layer.
* Messages can be pushed onto it in a non-blocking way. Messages can be
* received with the non-blocking next(), with poll(long), which blocks for
* at most a given time, or with waitForMessage(), which blocks until a
* message arrives or the queue is closed.
*
* @version $Revision: 1.2 $
* @since JXTA 1.0
*/
public class EndpointReceiveQueue {

    /** Default upper bound on the number of messages held by a queue. */
    public static final int Max_Messages = 100;

    /**
     * Contains the messages we currently have queued, oldest first.
     * Every access happens while holding this object's monitor.
     */
    private final Vector queue;

    /** Set by {@link #close()}; once set, {@link #push} ignores new messages. */
    private boolean closeFlag = false;

    /**
     * Maximum number of messages retained. When the limit is reached the
     * oldest messages are discarded to make room for new ones.
     */
    private int maxNbOfMessages;

    /**
     * Creates a queue holding at most {@link #Max_Messages} messages.
     */
    public EndpointReceiveQueue() {
        this( Max_Messages );
    }

    /**
     * Creates a queue holding at most <code>size</code> messages.
     *
     * @param size maximum number of messages the queue retains. A size of
     *             zero yields a queue that discards every pushed message.
     * @throws IllegalArgumentException if <code>size</code> is negative
     *             (raised by the underlying Vector constructor).
     */
    public EndpointReceiveQueue( int size ) {
        queue = new Vector( size );
        // Record the requested size as the limit. Otherwise a size above
        // Max_Messages would silently be capped at Max_Messages.
        maxNbOfMessages = size;
    }

    /**
     * Push a Message onto the queue without blocking. If the queue is
     * closed the message is ignored. If the queue is full, the oldest
     * queued messages are dropped to make room.
     *
     * @param message Message to be pushed onto the queue
     */
    public synchronized void push(Message message) {
        // This queue is closed. No additional messages allowed.
        if (closeFlag) {
            return;
        }

        // A limit of zero means nothing can be stored. Drop the message
        // rather than trying to evict from an empty queue.
        if (maxNbOfMessages <= 0) {
            return;
        }

        // Queue is full: drop the oldest message(s). A loop with ">=" is
        // needed because the limit may have been lowered below the current
        // size since the last push.
        while (queue.size() >= maxNbOfMessages) {
            queue.removeElementAt(0);
        }

        queue.addElement(message);
        notifyAll(); // inform all those waiting.
    }

    /**
     * Return the next message in the queue if there is one. Never blocks.
     *
     * @return the oldest queued Message, or null if the queue is empty
     */
    public synchronized Message next() {
        if (queue.isEmpty()) {
            return null;
        }

        Message result = (Message) queue.firstElement();
        queue.removeElementAt(0);
        return result;
    }

    /**
     * Gets a Message from the queue. If no Message is immediately available,
     * then wait the specified amount of time. Per Java convention, a timeout
     * of zero (0) means wait an infinite amount of time.
     * <p>
     * Messages still queued when the queue is closed can still be retrieved.
     *
     * @param timeOut maximum time to wait in ms, 0 to wait indefinitely
     * @return a Message, or null if the timeout elapsed or the queue was
     *         closed while no message was available
     * @throws IllegalArgumentException if <code>timeOut</code> is negative
     * @throws InterruptedException if the calling thread is interrupted
     *         while waiting
     */
    public synchronized Message poll(long timeOut) throws InterruptedException {
        if( timeOut < 0 ) {
            throw new IllegalArgumentException( "timeOut must be >= 0" );
        }

        final boolean waitForever = ( 0 == timeOut );

        // Absolute time at which we give up. The sum overflows to a negative
        // value for very large timeouts; saturate so that such a timeout
        // behaves as "practically forever" instead of expiring at once.
        long deadline = System.currentTimeMillis() + timeOut;
        if( deadline < 0 ) {
            deadline = Long.MAX_VALUE;
        }

        long remaining = timeOut;

        /*
         * Because there may be more than one thread waiting on this queue,
         * being woken up does not guarantee that a message is still there
         * for us. In that case we resume waiting, but only for whatever is
         * left of the originally requested time.
         */
        while( true ) {
            Message result = next();
            if( null != result ) {
                return result; // we have a message
            }

            if( closeFlag ) {
                return null; // closed and drained: nothing will ever arrive.
            }

            wait( remaining ); // 0 means wait until notified

            if( !waitForever ) {
                remaining = deadline - System.currentTimeMillis();
                if( remaining <= 0 ) {
                    // Out of time: take a message if one arrived meanwhile,
                    // otherwise give up with null.
                    return next();
                }
            }
        }
    }

    /**
     * Wait until there is a Message in the queue, and return it. Returns null
     * if the queue has been closed and holds no more messages.
     *
     * @return a Message, or null if the queue has been closed
     * @throws InterruptedException if the calling thread is interrupted
     *         while waiting
     */
    public Message waitForMessage() throws InterruptedException {
        return poll( 0 );
    }

    /**
     * Atomically return whether or not this queue has been closed.
     *
     * @return boolean indicating whether this queue has been closed.
     */
    public synchronized boolean isClosed() {
        return closeFlag;
    }

    /**
     * Close the queue. Subsequent pushes are ignored and all threads blocked
     * in {@link #poll} or {@link #waitForMessage} are woken up.
     */
    public synchronized void close() {
        closeFlag = true;
        notifyAll(); // make sure anyone waiting knows about it.
    }

    /**
     * How many messages will fit in this queue.
     *
     * @return int indicating how many messages will fit in the queue.
     */
    public synchronized int getMaxNbOfMessages() {
        return maxNbOfMessages;
    }

    /**
     * Set how many messages this queue may store. Messages already queued
     * are not discarded by this call; if there are more of them than the
     * new limit, the surplus is dropped (oldest first) by the next push.
     *
     * @param maxMsgs The number of messages which the queue must be able to
     *                store.
     * @throws IllegalArgumentException if <code>maxMsgs</code> is negative
     */
    public synchronized void setMaxNbOfMessages( int maxMsgs ) {
        if( maxMsgs < 0 ) {
            throw new IllegalArgumentException( "maxMsgs must be >= 0" );
        }
        maxNbOfMessages = maxMsgs;
        queue.ensureCapacity( maxMsgs );
    }

    /**
     * Number of messages currently waiting in the queue.
     *
     * @return int count of queued messages.
     */
    public synchronized int getNbOfQueuedMessages() {
        return queue.size();
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -