⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pooledexecutor.java

📁 采用JAVA开发
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
 *        pool.setKeepAliveTime(1000 * 60 * 5);
 *        pool.abortWhenBlocked();
 *        pool.createThreads(9);
 *     </pre>
 *  <li> An unbounded queue serviced by exactly 5 threads:
 *     <pre>
 *        pool = new PooledExecutor(new LinkedQueue());
 *        pool.setKeepAliveTime(-1); // live forever
 *        pool.createThreads(5);
 *     </pre>
 *  </ol>
 *
 * <p>
 * <b>Usage notes.</b>
 * <p>
 *
 * Pools do not mesh well with using thread-specific storage via
 * java.lang.ThreadLocal.  ThreadLocal relies on the identity of a
 * thread executing a particular task. Pools use the same thread to
 * perform different tasks.  <p>
 *
 * If you need a policy not handled by the parameters in this class
 * consider writing a subclass.  <p>
 *
 * Version note: Previous versions of this class relied on
 * ThreadGroups for aggregate control. This has been removed, and the
 * method interruptAll added, to avoid differences in behavior across
 * JVMs.
 *
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 **/

public class PooledExecutor extends ThreadFactoryUser implements Executor {

  /** 
   * The maximum pool size; used if not otherwise specified.  Default
   * value is essentially infinite (Integer.MAX_VALUE)
   **/
  public static final int  DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;

  /** 
   * The minimum pool size; used if not otherwise specified.  Default
   * value is 1.
   **/
  public static final int  DEFAULT_MINIMUMPOOLSIZE = 1;

  /**
   * The maximum time to keep worker threads alive waiting for new
   * tasks; used if not otherwise specified. Default value is one
   * minute (60000 milliseconds).
   **/
  public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;

  /** The maximum number of threads allowed in pool. **/
  protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;

  /** The minumum number of threads to maintain in pool. **/
  protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;

  /**  Current pool size.  **/
  protected int poolSize_ = 0;

  /** The maximum time for an idle thread to wait for new task. **/
  protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;

  /** 
   * Shutdown flag - latches true when a shutdown method is called 
   * in order to disable queuing/handoffs of new tasks.
   **/
  protected boolean shutdown_ = false;

  /**
   * The channel used to hand off the command to a thread in the pool.
   **/
  protected final Channel handOff_;

  /**
   * The set of active threads, declared as a map from workers to
   * their threads.  This is needed by the interruptAll method.  It
   * may also be useful in subclasses that need to perform other
   * thread management chores.
   **/
  protected final Map threads_;

  /** The current handler for unserviceable requests. **/
  protected BlockedExecutionHandler blockedExecutionHandler_;

  /** 
   * Create a new pool with all default settings
   **/

  public PooledExecutor() {
    this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
  }

  /** 
   * Create a new pool with all default settings except
   * for maximum pool size.
   **/

  public PooledExecutor(int maxPoolSize) {
    this(new SynchronousChannel(), maxPoolSize);
  }

  /** 
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings.
   **/

  public PooledExecutor(Channel channel) {
    this(channel, DEFAULT_MAXIMUMPOOLSIZE);
  }

  /** 
   * Create a new pool that uses the supplied Channel for queuing, and
   * with all default parameter settings except for maximum pool size.
   **/

  public PooledExecutor(Channel channel, int maxPoolSize) {
    maximumPoolSize_ = maxPoolSize;
    handOff_ = channel;
    runWhenBlocked();
    threads_ = new HashMap();
  }
  
  /** 
   * Return the maximum number of threads to simultaneously execute
   * New unqueued requests will be handled according to the current
   * blocking policy once this limit is exceeded.
   **/
  public synchronized int getMaximumPoolSize() { 
    return maximumPoolSize_; 
  }

  /** 
   * Set the maximum number of threads to use. Decreasing the pool
   * size will not immediately kill existing threads, but they may
   * later die when idle.
   * @exception IllegalArgumentException if less or equal to zero.
   * (It is
   * not considered an error to set the maximum to be less than than
   * the minimum. However, in this case there are no guarantees
   * about behavior.)
   **/
  public synchronized void setMaximumPoolSize(int newMaximum) { 
    if (newMaximum <= 0) throw new IllegalArgumentException();
    maximumPoolSize_ = newMaximum; 
  }

  /** 
   * Return the minimum number of threads to simultaneously execute.
   * (Default value is 1).  If fewer than the mininum number are
   * running upon reception of a new request, a new thread is started
   * to handle this request.
   **/
  public synchronized int getMinimumPoolSize() { 
    return minimumPoolSize_; 
  }

  /** 
   * Set the minimum number of threads to use. 
   * @exception IllegalArgumentException if less than zero. (It is not
   * considered an error to set the minimum to be greater than the
   * maximum. However, in this case there are no guarantees about
   * behavior.)
   **/
  public synchronized void setMinimumPoolSize(int newMinimum) { 
    if (newMinimum < 0) throw new IllegalArgumentException();
    minimumPoolSize_ = newMinimum; 
  }
  
  /** 
   * Return the current number of active threads in the pool.  This
   * number is just a snaphot, and may change immediately upon
   * returning
   **/
  public synchronized int getPoolSize() { 
    return poolSize_; 
  }

  /** 
   * Return the number of milliseconds to keep threads alive waiting
   * for new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized long getKeepAliveTime() { 
    return keepAliveTime_; 
  }

  /** 
   * Set the number of milliseconds to keep threads alive waiting for
   * new commands. A negative value means to wait forever. A zero
   * value means not to wait at all.
   **/
  public synchronized void setKeepAliveTime(long msecs) { 
    keepAliveTime_ = msecs; 
  }

  /** Get the handler for blocked execution **/
  public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
    return blockedExecutionHandler_;
  }

  /** Set the handler for blocked execution **/
  public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) {
    blockedExecutionHandler_ = h;
  }

  /**
   * Create and start a thread to handle a new command.  Call only
   * when holding lock.
   **/
  protected void addThread(Runnable command) {
    Worker worker = new Worker(command);
    Thread thread = getThreadFactory().newThread(worker);
    threads_.put(worker, thread);
    ++poolSize_;
    thread.start();
  }

  /**
   * Create and start up to numberOfThreads threads in the pool.
   * Return the number created. This may be less than the number
   * requested if creating more would exceed maximum pool size bound.
   **/
  public int createThreads(int numberOfThreads) {
    int ncreated = 0;
    for (int i = 0; i < numberOfThreads; ++i) {
      synchronized(this) { 
        if (poolSize_ < maximumPoolSize_) {
          addThread(null);
          ++ncreated;
        }
        else 
          break;
      }
    }
    return ncreated;
  }

  /**
   * Interrupt all threads in the pool, causing them all to
   * terminate. Assuming that executed tasks do not disable (clear)
   * interruptions, each thread will terminate after processing its
   * current task. Threads will terminate sooner if the executed tasks
   * themselves respond to interrupts.
   **/
  public synchronized void interruptAll() {
    for (Iterator it = threads_.values().iterator(); it.hasNext(); ) {
      Thread t = (Thread)(it.next());
      t.interrupt();
    }
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be discarded. A
   * shut down pool cannot be restarted.
   */
  public void shutdownNow() {
    shutdownNow(new DiscardWhenBlocked());
  }

  /**
   * Interrupt all threads and disable construction of new
   * threads. Any tasks entered after this point will be handled by
   * the given BlockedExecutionHandler.  A shut down pool cannot be
   * restarted.
   */
  public synchronized void shutdownNow(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true; // don't allow new tasks
    minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
    interruptAll(); // interrupt all existing threads
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be discarded. A
   * shut down pool cannot be restarted.
   **/
  public void shutdownAfterProcessingCurrentlyQueuedTasks() {
    shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  }

  /**
   * Terminate threads after processing all elements currently in
   * queue. Any tasks entered after this point will be handled by the
   * given BlockedExecutionHandler.  A shut down pool cannot be
   * restarted.
   **/
  public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) {
    setBlockedExecutionHandler(handler);
    shutdown_ = true;
    if (poolSize_ == 0) // disable new thread construction when idle
      minimumPoolSize_ = maximumPoolSize_ = 0;
  }

  /** 
   * Return true if a shutDown method has succeeded in terminating all
   * threads.
   */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -