⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pooledexecutor.java

📁 采用JAVA开发
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
  public synchronized boolean isTerminatedAfterShutdown() {
    return shutdown_ && poolSize_ == 0;
  }

  /**
   * Wait for a shutdown pool to fully terminate, or until the timeout
   * has expired. This method may only be called <em>after</em>
   * invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @param maxWaitTime  the maximum time in milliseconds to wait
   * @return true if the pool has terminated within the max wait period
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    if (poolSize_ == 0)
      return true;
    long waitTime = maxWaitTime;
    if (waitTime <= 0)
      return false;
    long start = System.currentTimeMillis();
    for (;;) {
      wait(waitTime);
      if (poolSize_ == 0)
        return true;
      waitTime = maxWaitTime - (System.currentTimeMillis() - start);
      if (waitTime <= 0) 
        return false;
    }
  }

  /**
   * Wait for a shutdown pool to fully terminate.  This method may
   * only be called <em>after</em> invoking shutdownNow or
   * shutdownAfterProcessingCurrentlyQueuedTasks.
   *
   * @exception IllegalStateException if shutdown has not been requested
   * @exception InterruptedException if the current thread has been interrupted in the course of waiting
   */
  public synchronized void awaitTerminationAfterShutdown() throws InterruptedException {
    if (!shutdown_)
      throw new IllegalStateException();
    while (poolSize_ > 0)
      wait();
  }

  /**
   * Remove all unprocessed tasks from pool queue, and return them in
   * a java.util.List. Thsi method should be used only when there are
   * not any active clients of the pool. Otherwise you face the
   * possibility that the method will loop pulling out tasks as
   * clients are putting them in.  This method can be useful after
   * shutting down a pool (via shutdownNow) to determine whether there
   * are any pending tasks that were not processed.  You can then, for
   * example execute all unprocessed commands via code along the lines
   * of:
   *
   * <pre>
   *   List tasks = pool.drain();
   *   for (Iterator it = tasks.iterator(); it.hasNext();) 
   *     ( (Runnable)(it.next()) ).run();
   * </pre>
   **/
  public List drain() {
    boolean wasInterrupted = false;
    Vector tasks = new Vector();
    for (;;) {
      try {
        Object x = handOff_.poll(0);
        if (x == null) 
          break;
        else
          tasks.addElement(x);
      }
      catch (InterruptedException ex) {
        wasInterrupted = true; // postpone re-interrupt until drained
      }
    }
    if (wasInterrupted) Thread.currentThread().interrupt();
    return tasks;
  }
  
  /** 
   * Cleanup method called upon termination of worker thread.
   **/
  protected synchronized void workerDone(Worker w) {
    threads_.remove(w);
    if (--poolSize_ == 0 && shutdown_) { 
      maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
      notifyAll(); // notify awaitTerminationAfterShutdown
    }

    // Create a replacement if needed
    if (poolSize_ == 0 || poolSize_ < minimumPoolSize_) {
      try {
         Runnable r = (Runnable)(handOff_.poll(0));
         if (r != null && !shutdown_) // just consume task if shut down
           addThread(r);
      } catch(InterruptedException ie) {
        return;
      }
    }
  }

  /** 
   * Get a task from the handoff queue, or null if shutting down.
   **/
  protected Runnable getTask() throws InterruptedException {
    long waitTime;
    synchronized(this) {
      if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
        return null;
      waitTime = (shutdown_)? 0 : keepAliveTime_;
    }
    if (waitTime >= 0) 
      return (Runnable)(handOff_.poll(waitTime));
    else 
      return (Runnable)(handOff_.take());
  }
  

  /**
   * Class defining the basic run loop for pooled threads.
   **/
  protected class Worker implements Runnable {
    protected Runnable firstTask_;

    protected Worker(Runnable firstTask) { firstTask_ = firstTask; }

    public void run() {
      try {
        Runnable task = firstTask_;
        firstTask_ = null; // enable GC

        if (task != null) {
          task.run();
          task = null;
        }
        
        while ( (task = getTask()) != null) {
          task.run();
          task = null;
        }
      }
      catch (InterruptedException ex) { } // fall through
      finally {
        workerDone(this);
      }
    }
  }

  /**
   * Class for actions to take when execute() blocks. Uses Strategy
   * pattern to represent different actions. You can add more in
   * subclasses, and/or create subclasses of these. If so, you will
   * also want to add or modify the corresponding methods that set the
   * current blockedExectionHandler_.
   **/
  public interface BlockedExecutionHandler {
    /** 
     * Return true if successfully handled so, execute should
     * terminate; else return false if execute loop should be retried.
     **/
    boolean blockedAction(Runnable command) throws InterruptedException;
  }

  /** Class defining Run action. **/
  protected class RunWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      command.run();
      return true;
    }
  }

  /** 
   * Set the policy for blocked execution to be that the current
   * thread executes the command if there are no available threads in
   * the pool.
   **/
  public void runWhenBlocked() {
    setBlockedExecutionHandler(new RunWhenBlocked());
  }

  /** Class defining Wait action. **/
  protected class WaitWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException{
      synchronized(PooledExecutor.this) {
        if (shutdown_)
          return true;
      }
      handOff_.put(command);
      return true;
    }
  }

  /** 
   * Set the policy for blocked execution to be to wait until a thread
   * is available, unless the pool has been shut down, in which case
   * the action is discarded.
   **/
  public void waitWhenBlocked() {
    setBlockedExecutionHandler(new WaitWhenBlocked());
  }

  /** Class defining Discard action. **/
  protected class DiscardWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      return true;
    }
  }

  /** 
   * Set the policy for blocked execution to be to return without
   * executing the request.
   **/
  public void discardWhenBlocked() {
    setBlockedExecutionHandler(new DiscardWhenBlocked());
  }


  /** Class defining Abort action. **/
  protected class AbortWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      throw new RuntimeException("Pool is blocked");
    }
  }

  /** 
   * Set the policy for blocked execution to be to
   * throw a RuntimeException.
   **/
  public void abortWhenBlocked() {
    setBlockedExecutionHandler(new AbortWhenBlocked());
  }


  /**
   * Class defining DiscardOldest action.  Under this policy, at most
   * one old unhandled task is discarded.  If the new task can then be
   * handed off, it is.  Otherwise, the new task is run in the current
   * thread (i.e., RunWhenBlocked is used as a backup policy.)
   **/
  protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) throws InterruptedException{
      handOff_.poll(0);
      if (!handOff_.offer(command, 0))
        command.run();
      return true;
    }
  }

  /** 
   * Set the policy for blocked execution to be to discard the oldest
   * unhandled request
   **/
  public void discardOldestWhenBlocked() {
    setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
  }

  /**
   * Arrange for the given command to be executed by a thread in this
   * pool.  The method normally returns when the command has been
   * handed off for (possibly later) execution.
   **/
  public void execute(Runnable command) throws InterruptedException {
    for (;;) {
      synchronized(this) { 
        if (!shutdown_) {
          int size = poolSize_;

          // Ensure minimum number of threads
          if (size < minimumPoolSize_) {
            addThread(command);
            return;
          }
          
          // Try to give to existing thread
          if (handOff_.offer(command, 0)) { 
            return;
          }
          
          // If cannot handoff and still under maximum, create new thread
          if (size < maximumPoolSize_) {
            addThread(command);
            return;
          }
        }
      }

      // Cannot hand off and cannot create -- ask for help
      if (getBlockedExecutionHandler().blockedAction(command)) {
        return;
      }
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -