⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package com.huawei.comm.smap;

// $Id: Queue.java,v 1.29 2006/08/08 15:34:22 belaban Exp $

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.LinkedList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Enumeration;

/**
 * Elements are added at the tail and removed from the head. The class is thread-safe in that
 * 1 producer and 1 consumer may add/remove elements concurrently. The class is not
 * explicitly designed for multiple producers or consumers. It is implemented as a linked
 * list, so that removal of an element at the head does not cause a shift of the
 * remaining elements (as in a Vector-based implementation).
 * @author Bela Ban
 */
public class Queue
{

  /* Head and tail of the linked list, kept so that objects can easily be added and removed. */
  private Element head = null, tail = null;

  /* Flag recording whether the queue is closed. */
  private boolean closed = false;

  /* Current number of elements in the queue. */
  private int size = 0;

  /* Lock object for synchronization. It is notified when an element is added. */
  private final Object mutex = new Object();

  /* Disabled: a separate lock object for syncing on removes, notified when an object is removed. */
  // Object  remove_mutex=new Object();

  /* Number of end markers that have been added. */
  private int num_markers = 0;

  /**
   * Marker object for a "soft" close: if the queue is closed during runtime,
   * an endMarker object is added to the end of the queue to indicate that
   * the queue will close automatically once the end marker is encountered.
   * @see Queue#close
   */
  private static final Object endMarker = new Object();

  /* Logger shared by all Queue instances. */
  protected static final Log log = LogFactory.getLog(Queue.class);

  /**
   * A single node of the queue's linked list. Each node wraps one stored
   * value and holds a reference to the node that follows it; a node whose
   * {@code next} reference is null is the tail of the list.
   */
  static class Element
  {
    /*value stored at this position of the queue*/
    Object obj = null;
    /*following node in the linked list, or null if there is none*/
    Element next = null;

    /**
     * Wraps the given value in a new, unlinked node.
     * @param o - the value this node stores
     */
    Element(Object o)
    {
      this.obj = o;
    }

    /**
     * Renders the stored value.
     * @return the value's own string form, or the text "null" when no value is stored
     */
    public String toString()
    {
      if (obj == null)
      {
        return "null";
      }
      return obj.toString();
    }
  }

  /**
   * Creates an empty queue: the field initializers leave head and tail
   * null, the size at 0 and the closed flag false.
   */
  public Queue()
  {
  }

  /**
   * Looks at the element at the head of the queue without removing it.
   * @return the value held by the head element, or null if there is no head element
   */
  public Object getFirst()
  {
    synchronized (mutex)
    {
      if (head == null)
      {
        return null;
      }
      return head.obj;
    }
  }

  /**
   * Looks at the element at the tail of the queue without removing it.
   * @return the value held by the tail element, or null if there is no tail element
   */
  public Object getLast()
  {
    synchronized (mutex)
    {
      if (tail == null)
      {
        return null;
      }
      return tail.obj;
    }
  }

  /**
   * Reports the current value of the closed flag, read under the queue's lock.
   * According to the original documentation the flag is still false after a
   * soft close via close(true) for as long as the last element has not been
   * received.
   * @return true if the queue has been closed
   */
  public boolean closed()
  {
    final boolean isClosed;
    synchronized (mutex)
    {
      isClosed = closed;
    }
    return isClosed;
  }

  /**
   * Adds an object to this queue through addInternal and then notifies
   * every thread waiting on the queue's lock. A null argument is logged as
   * an error and otherwise ignored.
   * @param obj - the object to be added to the queue
   * @exception QueueClosedException if the closed flag is set, or if at least
   *            one end marker has been counted ({@code num_markers > 0})
   */
  public void add(Object obj)
      throws QueueClosedException
  {
    if (obj == null)
    {
      /*nothing to enqueue: report the misuse and leave the queue untouched*/
      if (log.isErrorEnabled())
      {
        log.error("argument must not be null");
      }
      return;
    }

    /*all state checks and the insertion happen under the queue's lock*/
    synchronized (mutex)
    {
      final boolean markerPending = this.num_markers > 0;
      if (closed)
      {
        throw new QueueClosedException();
      }
      else if (markerPending)
      {
        throw new QueueClosedException(
            "queue has been closed. You can not add more elements. " +
            "Waiting for removal of remaining elements.");
      }

      addInternal(obj);

      /*let waiting threads re-check the queue*/
      mutex.notifyAll();
    }
  }

  /**
   * Adds every non-null element of the given collection to this queue, in
   * iteration order, while holding the queue's lock once for the whole batch.
   * Waiting threads are notified after the batch. A null collection is
   * logged as an error and otherwise ignored.
   * @param c - the collection whose elements are to be added
   * @exception QueueClosedException if the closed flag is set, or if at least
   *            one end marker has been counted ({@code num_markers > 0})
   */
  public void addAll(Collection c)
      throws QueueClosedException
  {
    if (c == null)
    {
      if (log.isErrorEnabled())
      {
        log.error("argument must not be null");
      }
      return;
    }

    /*the whole batch is inserted under a single acquisition of the lock*/
    synchronized (mutex)
    {
      if (closed)
      {
        throw new QueueClosedException();
      }
      if (this.num_markers > 0)
      {
        throw new QueueClosedException(
            "queue has been closed. You can not add more elements. " +
            "Waiting for removal of remaining elements.");
      }

      Iterator it = c.iterator();
      while (it.hasNext())
      {
        Object element = it.next();
        if (element == null)
        {
          /*null entries are skipped silently*/
          continue;
        }
        addInternal(element);
      }

      /*let waiting threads re-check the queue*/
      mutex.notifyAll();
    }
  }

  /**
   * Adds every non-null element enumerated by the given list to this queue,
   * in enumeration order, while holding the queue's lock once for the whole
   * batch. Waiting threads are notified after the batch. A null list is
   * logged as an error and otherwise ignored.
   * @param l - the list whose elements are to be added
   * @exception QueueClosedException if the closed flag is set, or if at least
   *            one end marker has been counted ({@code num_markers > 0})
   */
  public void addAll(List l)
      throws QueueClosedException
  {
    if (l == null)
    {
      if (log.isErrorEnabled())
      {
        log.error("argument must not be null");
      }
      return;
    }

    /*the whole batch is inserted under a single acquisition of the lock*/
    synchronized (mutex)
    {
      if (closed)
      {
        throw new QueueClosedException();
      }
      if (this.num_markers > 0)
      {
        throw new QueueClosedException(
            "queue has been closed. You can not add more elements. " +
            "Waiting for removal of remaining elements.");
      }

      Enumeration en = l.elements();
      while (en.hasMoreElements())
      {
        Object element = en.nextElement();
        if (element == null)
        {
          /*null entries are skipped silently*/
          continue;
        }
        addInternal(element);
      }

      /*let waiting threads re-check the queue*/
      mutex.notifyAll();
    }
  }

  /**
   * Inserts a new object in front of the current head of the queue, making
   * it the new head, and then notifies every thread waiting on the queue's
   * lock. If the queue had no head, the new element also becomes the tail.
   * A null argument is logged as an error and otherwise ignored.
   * @param obj - the object to be added to the queue
   * @exception QueueClosedException if the closed flag is set, or if at least
   *            one end marker has been counted ({@code num_markers > 0})
   */
  public void addAtHead(Object obj)
      throws QueueClosedException
  {
    if (obj == null)
    {
      if (log.isErrorEnabled())
      {
        log.error("argument must not be null");
      }
      return;
    }

    /*state checks and relinking happen under the queue's lock*/
    synchronized (mutex)
    {
      if (closed)
      {
        throw new QueueClosedException();
      }
      if (this.num_markers > 0)
      {
        throw new QueueClosedException(
            "Queue.addAtHead(): queue has been closed. You can not add more elements. " +
            "Waiting for removal of remaining elements.");
      }

      final Element node = new Element(obj);
      final boolean hadNoHead = (head == null);

      /*link the new node in front of the old head (null when there was none)*/
      node.next = head;
      head = node;

      if (hadNoHead)
      {
        /*single element: it is both head and tail*/
        tail = node;
        size = 1;
      }
      else
      {
        size++;
      }

      /*let waiting threads re-check the queue*/
      mutex.notifyAll();
    }
  }

  /**
   * Removes 1 element from the head or <B>blocks</B> until an element has
   * been added or the queue has been closed.
   * <p>
   * An interrupt received while waiting does not abort the call: waiting
   * continues as before. The interrupt is no longer lost, though; the
   * calling thread's interrupt status is restored before this method
   * returns or throws, so the caller can still observe it.
   * @return the element obtained from removeInternal()
   * @exception QueueClosedException if the queue is closed, either while
   *            waiting on an empty queue or once an element is available
   */
  public Object remove()
      throws QueueClosedException
  {
    Object retval;
    /*set when wait() was interrupted, so the status can be re-asserted on exit*/
    boolean interrupted = false;

    try
    {
      synchronized (mutex)
      {
        /*wait as long as the queue is empty. return when an element is present or queue is closed*/
        while (size == 0)
        {
          if (closed)
          {
            throw new QueueClosedException();
          }
          try
          {
            mutex.wait();
          }
          catch (InterruptedException ex)
          {
            /*
             * wait() cleared the interrupt status. Re-interrupting here would make
             * the next wait() fail immediately and spin, so only remember it.
             */
            interrupted = true;
          }
        }

        if (closed)
        {
          throw new QueueClosedException();
        }

        /*remove the head from the queue, if we make it to this point, retval should not be null !*/
        retval = removeInternal();
        if (retval == null)
        {
          if (log.isErrorEnabled())
          {
            log.error("element was null, should never be the case");
          }
        }
      }
    }
    finally
    {
      /*restore the interrupt status on both the normal and the exceptional path*/
      if (interrupted)
      {
        Thread.currentThread().interrupt();
      }
    }

    /*
     * Disabled end-marker handling, kept for reference: running into an
     * end marker would mean the queue was closed before through close(true).
     */
//        if(retval == endMarker) {
//            close(false); // mark queue as closed
//            throw new QueueClosedException();
//        }
    return retval;
  }

  /**
   * Removes 1 element from the head.
   * If the queue is empty the operation will wait for timeout ms.
   * If no object is added during the timeout, a TimeoutException is thrown.
   * @param timeout - the number of milli seconds this operation will wait before it times out
   * @return the first object in the queue
   */
  public Object remove(long timeout)
      throws QueueClosedException, TimeoutException
  {
    Object retval;

    synchronized (mutex)
    {
      if (closed)
      {
        throw new QueueClosedException();
      }

      /*if the queue size is zero, we want to wait until a new object is added*/
      if (size == 0)
      {
        try
        {
          /*release the mutex lock and wait no more than timeout ms*/
          mutex.wait(timeout);
        }
        catch (InterruptedException ex)
        {
        }
      }
      /*we either timed out, or got notified by the mutex lock object*/
      if (closed)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -