⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      {
        throw new QueueClosedException();
      }

      /*get the next value*/
      retval = removeInternal();
      /*null result means we timed out*/
      if (retval == null)
      {
        throw new TimeoutException("timeout=" + timeout + "ms");
      }

      /*if we reached an end marker we are going to close the queue*/
//            if(retval == endMarker) {
//                close(false);
//                throw new QueueClosedException();
//            }
      /*at this point we actually did receive a value from the queue, return it*/
      return retval;
    }
  }

  /**
   * Removes the first queue element whose stored object matches the argument.
   * Elements are examined from the head towards the tail and compared by
   * calling <code>equals(obj)</code> on the stored object. At most one element
   * is removed; if nothing matches, the queue is left untouched.
   * @param   obj the object to be removed from the queue. If null, an error is
   *          logged (when error logging is enabled) and the call returns
   * @throws  QueueClosedException if the queue is closed
   */
  public void removeElement(Object obj)
      throws QueueClosedException
  {
    if (obj == null)
    {
      if (log.isErrorEnabled())
      {
        log.error("argument must not be null");
      }
      return;
    }

    synchronized (mutex)
    {
      if (closed)
      {
        throw new QueueClosedException();
      }

      Element previous = null; // stays null while we are looking at the head
      Element current = head;

      while (current != null)
      {
        if (!current.obj.equals(obj))
        {
          /*no match, advance both pointers*/
          previous = current;
          current = current.next;
          continue;
        }

        Element successor = current.next;
        if (previous == null)
        {
          /*the head matched: the successor (possibly null) becomes the head*/
          head = successor;
          if (size == 1)
          {
            tail = head; // queue becomes empty, head and tail are both null
          }
        }
        else
        {
          /*an inner or last element matched: bypass it*/
          if (current == tail)
          {
            tail = previous; // the last element goes away, tail moves one to the left
          }
          previous.next = successor;
        }

        /*detach the removed element completely*/
        current.next = null;
        current.obj = null;
        decrementSize();
        return;
      }
    }
  }

  /**
   * Returns the object at the head of the queue and leaves it in place.
   * While the queue is empty the caller waits on the queue's mutex. An
   * interrupt received while waiting is ignored and the wait is resumed.
   * If the head object is the end marker, the queue is closed and a
   * QueueClosedException is thrown instead of returning the marker.
   * @return the first object on the queue
   * @throws QueueClosedException if the queue is closed, or the end marker
   *         is found at the head
   */
  public Object peek()
      throws QueueClosedException
  {
    Object first;

    synchronized (mutex)
    {
      for (;;)
      {
        /*a closed queue is never peeked, whether it is empty or not*/
        if (closed)
        {
          throw new QueueClosedException();
        }
        if (size != 0)
        {
          break;
        }
        try
        {
          mutex.wait();
        }
        catch (InterruptedException ex)
        {
          /*ignored: go around and wait again*/
        }
      }

      if (head == null)
      {
        first = null;
      }
      else
      {
        first = head.obj;
      }
    }

    if (first != endMarker)
    {
      return first;
    }

    close(false); // mark queue as closed
    throw new QueueClosedException();
  }

  /**
   * Returns the first object on the queue, without removing it.
   * If the queue is empty the caller waits until an object has been added,
   * the queue is closed, or the timeout has elapsed. The emptiness check is
   * repeated after every wakeup, so a spurious wakeup or a notification that
   * did not add an element does not cut the wait short.
   * If the head object is the end marker, the queue is closed and a
   * QueueClosedException is thrown.
   * @param timeout how long in milli seconds this operation waits for an object to be added to the queue
   *        before it times out. 0 waits without a time limit; a negative value is passed
   *        to <code>Object.wait</code> unchanged, which rejects it with an IllegalArgumentException
   *        when the queue is empty
   * @return the first object on the queue
   * @throws QueueClosedException if the queue is closed, or the end marker is found at the head
   * @throws TimeoutException if no object is available once the wait ends
   */
  public Object peek(long timeout)
      throws QueueClosedException, TimeoutException
  {
    Object retval;

    synchronized (mutex)
    {
      final long start = System.currentTimeMillis();
      long remaining = timeout;

      while (size == 0)
      {
        if (closed)
        {
          throw new QueueClosedException();
        }
        if (timeout > 0 && remaining <= 0)
        {
          break; // the full timeout has elapsed
        }
        try
        {
          /*for timeout <= 0 the caller's value is handed to wait() as is:
           *0 blocks until notified, a negative value is rejected by wait()
           */
          mutex.wait(timeout > 0 ? remaining : timeout);
        }
        catch (InterruptedException ex)
        {
          /*an interrupt ends the wait early; the checks below decide the outcome*/
          break;
        }
        if (timeout > 0)
        {
          remaining = timeout - (System.currentTimeMillis() - start);
        }
      }

      if (closed)
      {
        throw new QueueClosedException();
      }

      retval = head != null ? head.obj : null;

      /*still nothing to return: report it as a timeout*/
      if (retval == null)
      {
        throw new TimeoutException("timeout=" + timeout + "ms");
      }

      if (retval == endMarker)
      {
        close(false); // mark queue as closed
        throw new QueueClosedException();
      }
      return retval;
    }
  }

  /**
   * Drops every element from the queue and resets the element and marker counters.
   * The closed flag is neither checked nor changed, so this also works on a closed queue.
   * All threads waiting on the queue's mutex are notified.
   */
  public void clear()
  {
    synchronized (mutex)
    {
      /*unlink the whole list*/
      head = null;
      tail = null;

      /*nothing is stored anymore, end markers included*/
      num_markers = 0;
      size = 0;

      mutex.notifyAll();
    }
  }

  /**
   Closes the queue, either immediately or after the pending entries.
   <p>
   When <code>flush_entries</code> is true and the queue is not empty, the queue is not
   marked as closed here. Instead an end-of-entries marker is appended with
   <code>add</code> and the marker count is increased. A QueueClosedException
   thrown by <code>add</code> is ignored.
   In every other case the queue is marked as closed and all threads waiting on
   the queue's mutex are notified.
   @param flush_entries When true, pending entries are kept and an end-of-entries marker
   is added behind them; when false, the queue is closed right away.
   */
  public void close(boolean flush_entries)
  {
    synchronized (mutex)
    {
      boolean closeNow = !flush_entries || size <= 0;
      if (closeNow)
      {
        closed = true;
        mutex.notifyAll();
        return;
      }

      try
      {
        add(endMarker); // add an end-of-entries marker to the end of the queue
        num_markers++;
      }
      catch (QueueClosedException closed_ex)
      {
        /*ignored: the marker could not be added*/
      }
    }
  }

  /**
   * Waits until the queue has been closed or the timeout has elapsed.
   * Returns immediately if the queue is already closed. The closed flag is
   * checked again after every wakeup, so notifications that do not close the
   * queue (and spurious wakeups) do not end the wait. An interrupt ends the
   * wait early.
   * @param timeout Number of milliseconds to wait. A value &lt;= 0 means to wait forever
   */
  public void waitUntilClosed(long timeout)
  {
    synchronized (mutex)
    {
      /*Object.wait rejects negative values, so "forever" is mapped to wait(0)*/
      final boolean forever = timeout <= 0;
      final long start = System.currentTimeMillis();
      long remaining = timeout;

      while (!closed)
      {
        if (!forever && remaining <= 0)
        {
          return; // timed out while the queue is still open
        }
        try
        {
          mutex.wait(forever ? 0 : remaining);
        }
        catch (InterruptedException e)
        {
          /*an interrupt ends the wait early*/
          return;
        }
        if (!forever)
        {
          remaining = timeout - (System.currentTimeMillis() - start);
        }
      }
    }
  }

  /**
   * Resets the queue to an empty, open state.
   * A queue that is still open is closed first through <code>close(false)</code>.
   * Afterwards all elements are dropped, the counters are zeroed, the closed
   * flag is cleared, and all threads waiting on the queue's mutex are notified.
   */
  public void reset()
  {
    synchronized (mutex)
    {
      num_markers = 0;

      /*close an open queue first*/
      boolean stillOpen = !closed;
      if (stillOpen)
      {
        close(false);
      }

      /*drop all elements*/
      head = tail = null;
      size = 0;

      /*reopen the queue*/
      closed = false;
      mutex.notifyAll();
    }
  }

  /**
   * Copies the stored objects into a new list, in head-to-tail order.
   * The copy is made while holding the queue's mutex. Every linked element is
   * copied, whatever object it holds.
   * @return A new list holding the queue's objects
   */
  public LinkedList values()
  {
    LinkedList copy = new LinkedList();
    synchronized (mutex)
    {
      for (Element cursor = head; cursor != null; cursor = cursor.next)
      {
        copy.add(cursor.obj);
      }
    }
    return copy;
  }

  /**
   * Returns the number of objects currently in the queue.
   * The marker count is subtracted from the internal element count.
   * @return the internal element count minus the marker count
   */
  public int size()
  {
    synchronized (mutex)
    {
      int withoutMarkers = size - num_markers;
      return withoutMarkers;
    }
  }

  /**
   * Returns a short description of the queue containing the value of size().
   * @return a string of the form <code>Queue (n) elements</code>
   */
  public String toString()
  {
    StringBuffer text = new StringBuffer("Queue (");
    text.append(size());
    text.append(") elements");
    return text.toString();
  }

  /* ------------------------------------- Private Methods ----------------------------------- */

  /*
   * Wraps obj in a new list element and links it in as the last element.
   * No locking is done in here.
   */
  private final void addInternal(Object obj)
  {
    Element node = new Element(obj);

    if (head != null)
    {
      /*non-empty queue: append behind the current tail*/
      tail.next = node;
      tail = node;
      size++;
      return;
    }

    /*empty queue: the new element is both head and tail*/
    head = node;
    tail = node;
    size = 1;
  }

  /**
   * Unlinks the head element and returns the object it held, or null when the
   * queue has no elements. If the element that becomes the new head holds the
   * end marker, the queue is marked as closed and waiting threads are notified.
   * Callers are expected to hold the mutex already; it is not locked in here.
   */
  private Object removeInternal()
  {
    Element first = head;

    /*nothing to remove from an empty queue*/
    if (first == null)
    {
      return null;
    }

    Element successor = first.next;
    head = successor;

    if (successor == null)
    {
      /*the last element is gone, so the tail goes with it*/
      tail = null;
      decrementSize();
    }
    else
    {
      decrementSize();
      if (successor.obj == endMarker)
      {
        /*only the end marker is in front now: close the queue*/
        closed = true;
        mutex.notifyAll();
      }
    }

    /*detach the removed element before handing out its object*/
    Object payload = first.obj;
    first.next = null;
    first.obj = null;
    return payload;
  }

  /** Lowers the element count by one and never lets it drop below zero. Does no locking of its own */
  final private void decrementSize()
  {
    if (--size < 0)
    {
      size = 0;
    }
  }

  /* ---------------------------------- End of Private Methods -------------------------------- */

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -