⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointreceivequeue.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
字号:
/*
 * Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *       Sun Microsystems, Inc. for Project JXTA."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 *    nor may "JXTA" appear in their name, without prior written
 *    permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA.  For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 * This license is based on the BSD license adopted by the Apache Foundation.
 *
 * $Id: EndpointReceiveQueue.java,v 1.2 2002/03/04 21:42:57 echtcherbina Exp $
 */
package net.jxta.impl.endpoint;
import net.jxta.endpoint.*;

import java.io.*;
import java.util.*;

/**
 * An EndpointReceiveQueue is the queuing mechanism used to pass Messages
 * between the various components of the Juxtapose Transport layer.
 * Messages can be pushed onto it in a non-blocking way. Receiving
 * messages can be done by a non-blocking next() or by a blocking
 * poll() / waitForMessage().
 *
 * @version     $Revision: 1.2 $
 * @since       JXTA 1.0
 */

public class EndpointReceiveQueue  {
    
    /**
     *  Default upper bound on the number of messages held by a queue.
     **/
    public static final int Max_Messages = 100;
    
    /**
     *  Contains the messages we currently have queued, oldest first.
     **/
    private final Vector queue;
    
    /**
     *  Set by close(). Once true, push() discards messages and poll()
     *  stops waiting.
     **/
    private boolean closeFlag = false;
    
    /**
     *  Upper bound requested through setMaxNbOfMessages(). The effective
     *  bound is computed by getMaxNbOfMessages().
     **/
    private int maxNbOfMessages = Max_Messages;

    /**
     * Default constructor. Creates a queue whose backing store is sized for
     * Max_Messages messages.
     */
    public EndpointReceiveQueue() {
        this( Max_Messages );
    }
    
    /**
     * Creates a queue whose backing store has the given initial capacity.
     * The effective bound is the smaller of that capacity and the configured
     * maximum (see getMaxNbOfMessages()).
     *
     * @param size initial capacity of the backing store
     * @throws IllegalArgumentException if size is negative (raised by Vector)
     */
    public EndpointReceiveQueue( int size ) {
        queue = new Vector( size );
    }

    /**
     * Push a Message onto the queue. Never blocks.
     *
     * The message is silently discarded if the queue is closed or if the
     * effective bound is not positive. If the queue already holds at least
     * as many messages as the bound, the oldest message is dropped to make
     * room, so the queue never grows past its size at the time the bound
     * was reached.
     *
     * @param message Message to be pushed onto the queue
     **/
    public synchronized void push(Message message) {
        
        // This queue is closed. No additional messages allowed.
        if (closeFlag) {
            return;
        }
        
        int limit = getMaxNbOfMessages();
        
        if (limit <= 0) {
            // The queue cannot hold anything; removing "the oldest" from an
            // empty Vector would throw, so drop the incoming message instead.
            return;
        }
        
        // ">=" rather than "==": the bound may have been lowered below the
        // current size, and an equality test would then never fire again.
        if (queue.size() >= limit) {
            // Queue is full. Drop the oldest message
            queue.removeElementAt(0);
        }
        
        queue.addElement(message);
        
        notifyAll(); // inform all those waiting.
    }
    
    /**
     * Return next message in the queue if there is one. Never blocks.
     *
     * @return Message, null if the queue is empty
     **/
    public synchronized Message next() {
        
        if (queue.isEmpty()) {
            return null;
        }
        
        Message result = (Message) queue.firstElement();
        queue.removeElementAt(0);

        return result;
    }
    
    /**
     *  Gets a Message from the queue. If no Message is immediately available,
     *  then wait the specified amount of time. Per Java convention, a timeout
     *  of zero (0) means wait an infinite amount of time.
     *
     *  Messages still queued when the queue is closed can still be retrieved;
     *  null is only returned once the queue is empty and either closed or the
     *  timeout has elapsed.
     *
     *  @param timeOut in ms
     *  @return a Message, or null if the queue was closed or the timeout
     *  elapsed before a Message became available
     *  @throws IllegalArgumentException if timeOut is negative
     *  @throws InterruptedException if the thread is interrupted while waiting
     **/
    public synchronized Message poll(long timeOut) throws InterruptedException {
        /*
         *  Because there may be more than one thread waiting on this queue,
         *  when we are woken up we do not necessarily get the next message in
         *  the queue. In this case, rather than terminating because we didn't
         *  get the message we resume waiting, but we ensure that we never wait
         *  longer than the amount of time which was originally requested.
         */
        
        if( timeOut < 0 )
            throw new IllegalArgumentException( "timeOut must be >= 0" );
        
        boolean waitForever = ( 0 == timeOut );
        
        // Saturate instead of overflowing: a huge timeOut must not produce a
        // deadline in the past.
        long now = System.currentTimeMillis();
        long deadline = ( timeOut > Long.MAX_VALUE - now )
                            ? Long.MAX_VALUE
                            : now + timeOut;
        long remaining = timeOut;
        
        while( true ) {
            Message result = next();
            
            if( null != result )
                return result;          // we have a message
            
            if( isClosed() )
                return null;            // the queue has been closed down.
            
            wait( remaining );          // 0 means wait until notified
            
            if( !waitForever ) {
                // reduce the wait by the amount we have already waited.
                remaining = deadline - System.currentTimeMillis();
                
                if( remaining <= 0 ) {
                    // Out of time: take whatever arrived while we were
                    // waiting, or give up with null.
                    return next();
                }
            }
        }
    }
    
    /**
     *  Wait until there is a Message in the queue, and return it. Returns null
     *  if the queue has been closed.
     *
     *  @return a Message
     *  @throws InterruptedException if the thread is interrupted while waiting
     */
    public Message waitForMessage() throws InterruptedException {
        return poll( 0 );
    }
    
    /**
     *  Atomically return whether or not this queue has been closed.
     *
     *  @return boolean indicating whether this queue has been closed.
     **/
    public synchronized boolean isClosed() {
        return closeFlag;
    }
    
    /**
     * Close the queue: further pushes are discarded and every waiting
     * thread is woken up.
     **/
    public synchronized void close() {
        closeFlag = true;
        notifyAll();        // make sure anyone waiting knows about it.
    }
    
    /**
     * how many messages will fit in this queue
     *
     * The result is the smaller of the backing store's capacity and the
     * configured maximum.
     *
     *  @return int indicating how many messages will fit in the queue.
     **/
    public synchronized int getMaxNbOfMessages() {
        // synchronized so the value written by setMaxNbOfMessages() is seen.
        return Math.min( queue.capacity(), maxNbOfMessages );
    }
    
    /**
     *  Set how many messages this queue may store. Note that if there are more
     *  messages already in the queue than the specified amount then none of
     *  them is discarded here; subsequent pushes replace the oldest message
     *  instead of growing the queue.
     *
     *  @param maxMsgs  The number of messages which the queue must be able to
     *  store.
     **/
    public synchronized void setMaxNbOfMessages( int maxMsgs ) {
        maxNbOfMessages = maxMsgs;
        queue.ensureCapacity( maxMsgs );
    }

    /**
     *  Return how many messages are currently queued.
     *
     *  @return int number of messages waiting in the queue.
     **/
    public synchronized int getNbOfQueuedMessages() {
        return queue.size();
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -