⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointreceivequeue.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
字号:
/*
 *  Copyright (c) 2001-2002 Sun Microsystems, Inc.  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *  notice, this list of conditions and the following disclaimer in
 *  the documentation and/or other materials provided with the
 *  distribution.
 *
 *  3. The end-user documentation included with the redistribution,
 *  if any, must include the following acknowledgment:
 *  "This product includes software developed by the
 *  Sun Microsystems, Inc. for Project JXTA."
 *  Alternately, this acknowledgment may appear in the software itself,
 *  if and wherever such third-party acknowledgments normally appear.
 *
 *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *  not be used to endorse or promote products derived from this
 *  software without prior written permission. For written
 *  permission, please contact Project JXTA at http://www.jxta.org.
 *
 *  5. Products derived from this software may not be called "JXTA",
 *  nor may "JXTA" appear in their name, without prior written
 *  permission of Sun.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 *  DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR
 *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 *  SUCH DAMAGE.
 *  ====================================================================
 *
 *  This software consists of voluntary contributions made by many
 *  individuals on behalf of Project JXTA.  For more
 *  information on Project JXTA, please see
 *  <http://www.jxta.org/>.
 *
 *  This license is based on the BSD license adopted by the Apache Foundation.
 *
 *  $Id: EndpointReceiveQueue.java,v 1.20 2002/05/26 03:23:11 jice Exp $
 */
package net.jxta.impl.endpoint;
import net.jxta.endpoint.*;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
import net.jxta.endpoint.Message;

import java.util.Vector;

/**
 * A EndpointMessageQueue is the queing mechanism to queue Message
 * between the various components of the Juxtapose Transport layer.
 * Messages can be pushed onto it in a non blocking way. However receiving
 * messages can be done by nonblocking poll or a blocking waitForMessage.
 *
 * @since      JXTA 1.0
 */

public class EndpointReceiveQueue  {
    
    private static final Category LOG = Category.getInstance(EndpointReceiveQueue.class.getName());
    
    public static final int Max_Messages = 100;
    
    /**
     *  Contains the messages we currently have queued.
     **/
    private Vector queue = null;
    
    /**
     *  If true the queue is being closed and is currently in the process of
     *  being flushed.
     **/
    private boolean closeFlag = false;
    
    /**
     *  The maximum number of messages we will hold in the queue at one time.
     **/
    private int maxNbOfMessages = Max_Messages;
    
    /** total number of messages which have been enqueued into this queue **/
    private int nbOfQueuedMessages = 0;
    
    /** time in millis when it will be ok to display a message about dropping
     *  messages. we throttle this so that there is a chance to do work rather
     *  than just always spewing messages 
     */
    private long nextDroppedWarn = 0;
    
    /** the number of messages we have dropped since we began working. **/
    private long nbOfDroppedMessages = 0;
    /**
     * Default constructor
     */
    public EndpointReceiveQueue() {
       this( Max_Messages );
    }
    
    /**
     * Default constructor
     */
    public EndpointReceiveQueue( int size ) {
        queue = new Vector( size );
        closeFlag = false;
    }
    
    /**
     * Push a TransportMessage onto the queue.
     *
     * @param message TransportMessage to be pushed onto the queue
     **/
    public synchronized boolean push(Message message) {
        
        // This queue is closed. No additional messages allowed.
        if (closeFlag) {
            return false;
        }
        
        if (queue.size() >= getMaxNbOfMessages() ) {
            // Queue is full. Drop the oldest message. Issue a warning if we
            // have not done so in the last second.
            nbOfDroppedMessages++;
            long now = System.currentTimeMillis();
            if ( ( now > nextDroppedWarn) && LOG.isEnabledFor(Priority.WARN) ) {
                LOG.warn( "Receive queue full, dropped one message. Now dropped " + nbOfDroppedMessages + " messsages." );
                nextDroppedWarn = now + 1000; // a second from now
	    }
            queue.removeElementAt(0);
        }
	++nbOfQueuedMessages;

        queue.addElement(message);
        
        notify(); // inform someone who is waiting. we dont have to tell everyone though.
        
        return true;
    }
    
    /**
     * Return next message in the queue if there is one.
     *
     * @return Message, null if the queue is empty
     **/
    public synchronized Message next() {
        
        if (queue.isEmpty()) {
            return null;
        }
        
        Message result = (Message) queue.firstElement();
        queue.removeElementAt(0);
        
        return  result;
    }
    
    /**
     *  Gets a Message from the queue. If no Message is immediately available,
     *  then wait the specified amount of time. Per Java convention, a timeout
     *  of zero (0) means wait an infinite amount of time.
     *
     *  @param timeOut in ms
     *  @return a Message
     *  @throws InterruptedException    if the operation is interrupted before
     *      the timeout interval is completed.
     *  
     **/
    public synchronized Message poll(long timeOut) throws InterruptedException {
        /*
         *  Because there may be more than one thread waiting on this queue,
         *  when we are woken up we do not necessarily get the next message in
         *  the queue. In this case, rather than terminating because we didn't
         *  get the message we resume waiting, but we ensure that we never wait
         *  longer than the amount of time which was originally requested.
         *  (if we fail to get the message after being woken its actually a
         *  little less than the requested time)
         */
        
        if( timeOut < 0 )
            throw new IllegalArgumentException( "timeOut must be >= 0" );
        
        long realTimeOut = System.currentTimeMillis() + timeOut;
        
        do {
            Message result = next();
            
            if( null != result )
                return result;          // we have a message
            
            if( !isClosed() ) {
                wait(timeOut);
                
                result = next();
                
                if( null != result )
                    return result;          // we have a message
                
                if ( 0 != timeOut ) {
                    // reduce the wait timeOut by the amount we have already
                    // waited.
                    timeOut = (realTimeOut - System.currentTimeMillis());
                    
                    if( timeOut <= 0 ) {
                        // we have passed the time at which we said we would stop
                        break;  // so we give up.
                    }
                }
            }
        }
        while( !isClosed() );
        
        return null;          // the queue has been closed down.
    }
    
    /**
     *  Wait until there is a Message in the queue, and return it. Returns null
     *  if the queue has been closed.
     *
     *  @return a Message
     */
    public Message waitForMessage() throws InterruptedException {
        return poll( 0 );
    }
    
    /**
     *  Atomically return whether or not this queue has been closed.
     *
     *  @return boolean indicating whether this queue has been closed.
     **/
    public synchronized boolean isClosed() {
        return closeFlag;
    }
    
    /**
     * close it
     **/
    public synchronized void close() {
        closeFlag = true;
        notifyAll();        // make sure everyone waiting knows about it.
    }
    
    /**
     * how many messages will fit in this queue
     *
     *  @return int indicating how many messages will fit in the queue.
     **/
    public int getMaxNbOfMessages() {
        return maxNbOfMessages;
    }
    
    /**
     *  Set how many messages this queue may store. Note that if there are more
     *  messages already in the queue than the specified amount then the queue
     *  will retain its current capacity.
     *
     *  @param maxMsgs  The number of messages which the queue must be able to
     *  store.
     **/
    public synchronized void setMaxNbOfMessages( int maxMsgs ) {
        maxNbOfMessages = maxMsgs;
        queue.ensureCapacity( maxMsgs );
    }
    
    /**
     *  Return the total numbe of messages which have been enqueued on to this
     *  receive queue during its existance.
     *
     *  @return how many messages have been queued.
     **/
    public synchronized int getNbOfQueuedMessages() {
        return nbOfQueuedMessages;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -