📄 pooledexecutor.java
字号:
shutdown_ = true; if (poolSize_ == 0) // disable new thread construction when idle minimumPoolSize_ = maximumPoolSize_ = 0; } /** * Return true if a shutDown method has succeeded in terminating all * threads. */ public synchronized boolean isTerminatedAfterShutdown() { return shutdown_ && poolSize_ == 0; } /** * Wait for a shutdown pool to fully terminate, or until the timeout * has expired. This method may only be called <em>after</em> * invoking shutdownNow or * shutdownAfterProcessingCurrentlyQueuedTasks. * * @param maxWaitTime the maximum time in milliseconds to wait * @return true if the pool has terminated within the max wait period * @exception IllegalStateException if shutdown has not been requested * @exception InterruptedException if the current thread has been interrupted in the course of waiting */ public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException { if (!shutdown_) throw new IllegalStateException(); if (poolSize_ == 0) return true; long waitTime = maxWaitTime; if (waitTime <= 0) return false; long start = System.currentTimeMillis(); for (;;) { wait(waitTime); if (poolSize_ == 0) return true; waitTime = maxWaitTime - (System.currentTimeMillis() - start); if (waitTime <= 0) return false; } } /** * Wait for a shutdown pool to fully terminate. This method may * only be called <em>after</em> invoking shutdownNow or * shutdownAfterProcessingCurrentlyQueuedTasks. * * @exception IllegalStateException if shutdown has not been requested * @exception InterruptedException if the current thread has been interrupted in the course of waiting */ public synchronized void awaitTerminationAfterShutdown() throws InterruptedException { if (!shutdown_) throw new IllegalStateException(); while (poolSize_ > 0) wait(); } /** * Remove all unprocessed tasks from pool queue, and return them in * a java.util.List. Thsi method should be used only when there are * not any active clients of the pool. 
Otherwise you face the * possibility that the method will loop pulling out tasks as * clients are putting them in. This method can be useful after * shutting down a pool (via shutdownNow) to determine whether there * are any pending tasks that were not processed. You can then, for * example execute all unprocessed commands via code along the lines * of: * * <pre> * List tasks = pool.drain(); * for (Iterator it = tasks.iterator(); it.hasNext();) * ( (Runnable)(it.next()) ).run(); * </pre> **/ public List drain() { boolean wasInterrupted = false; Vector tasks = new Vector(); for (;;) { try { Object x = handOff_.poll(0); if (x == null) break; else tasks.addElement(x); } catch (InterruptedException ex) { wasInterrupted = true; // postpone re-interrupt until drained } } if (wasInterrupted) Thread.currentThread().interrupt(); return tasks; } /** * Cleanup method called upon termination of worker thread. **/ protected synchronized void workerDone(Worker w) { threads_.remove(w); if (--poolSize_ == 0 && shutdown_) { maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads notifyAll(); // notify awaitTerminationAfterShutdown } } /** * Get a task from the handoff queue, or null if shutting down. **/ protected Runnable getTask() throws InterruptedException { long waitTime; synchronized(this) { if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads return null; waitTime = (shutdown_)? 0 : keepAliveTime_; } if (waitTime >= 0) return (Runnable)(handOff_.poll(waitTime)); else return (Runnable)(handOff_.take()); } /** * Class defining the basic run loop for pooled threads. 
**/ protected class Worker implements Runnable { protected Runnable firstTask_; protected Worker(Runnable firstTask) { firstTask_ = firstTask; } public void run() { try { Runnable task = firstTask_; firstTask_ = null; // enable GC if (task != null) { task.run(); task = null; } while ( (task = getTask()) != null) { task.run(); task = null; } } catch (InterruptedException ex) { } // fall through finally { workerDone(this); } } } /** * Class for actions to take when execute() blocks. Uses Strategy * pattern to represent different actions. You can add more in * subclasses, and/or create subclasses of these. If so, you will * also want to add or modify the corresponding methods that set the * current blockedExectionHandler_. **/ public interface BlockedExecutionHandler { /** * Return true if successfully handled so, execute should * terminate; else return false if execute loop should be retried. **/ boolean blockedAction(Runnable command) throws InterruptedException; } /** Class defining Run action. **/ protected class RunWhenBlocked implements BlockedExecutionHandler { public boolean blockedAction(Runnable command) { command.run(); return true; } } /** * Set the policy for blocked execution to be that the current * thread executes the command if there are no available threads in * the pool. **/ public void runWhenBlocked() { setBlockedExecutionHandler(new RunWhenBlocked()); } /** Class defining Wait action. **/ protected class WaitWhenBlocked implements BlockedExecutionHandler { public boolean blockedAction(Runnable command) throws InterruptedException{ handOff_.put(command); return true; } } /** * Set the policy for blocked execution to be to wait until a thread * is available. **/ public void waitWhenBlocked() { setBlockedExecutionHandler(new WaitWhenBlocked()); } /** Class defining Discard action. 
**/
  protected class DiscardWhenBlocked implements BlockedExecutionHandler {
    /** Drop the command silently and report it as handled. **/
    public boolean blockedAction(Runnable command) {
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to return without
   * executing the request.
   **/
  public void discardWhenBlocked() {
    BlockedExecutionHandler policy = new DiscardWhenBlocked();
    setBlockedExecutionHandler(policy);
  }

  /** Policy that rejects the command by throwing a RuntimeException. **/
  protected class AbortWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command) {
      throw new RuntimeException("Pool is blocked");
    }
  }

  /**
   * Set the policy for blocked execution to be to
   * throw a RuntimeException.
   **/
  public void abortWhenBlocked() {
    BlockedExecutionHandler policy = new AbortWhenBlocked();
    setBlockedExecutionHandler(policy);
  }

  /**
   * Class defining DiscardOldest action. Under this policy, at most
   * one old unhandled task is discarded. If the new task can then be
   * handed off, it is. Otherwise, the new task is run in the current
   * thread (i.e., RunWhenBlocked is used as a backup policy.)
   **/
  protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
    public boolean blockedAction(Runnable command)
        throws InterruptedException {
      // Take (and drop) at most one queued task to make room.
      handOff_.poll(0);
      boolean handedOff = handOff_.offer(command, 0);
      if (!handedOff)
        command.run(); // still no room: run in the caller's thread
      return true;
    }
  }

  /**
   * Set the policy for blocked execution to be to discard the oldest
   * unhandled request
   **/
  public void discardOldestWhenBlocked() {
    BlockedExecutionHandler policy = new DiscardOldestWhenBlocked();
    setBlockedExecutionHandler(policy);
  }

  /**
   * Arrange for the given command to be executed by a thread in this
   * pool.  The method normally returns when the command has been
   * handed off for (possibly later) execution.
**/
  public void execute(Runnable command) throws InterruptedException {
    while (true) {
      synchronized (this) {
        if (!shutdown_) {
          // Snapshot of the pool size, used for both size checks below.
          int currentSize = poolSize_;

          // Below the minimum: always start a new thread for the command.
          if (currentSize < minimumPoolSize_) {
            addThread(command);
            return;
          }

          // Otherwise try a no-wait handoff via the queue.
          boolean handedOff = handOff_.offer(command, 0);
          if (handedOff)
            return;

          // Handoff failed; grow the pool if the maximum allows it.
          if (currentSize < maximumPoolSize_) {
            addThread(command);
            return;
          }
        }
      }

      // Cannot hand off and cannot create -- ask the blocked-execution
      // policy for help, outside the lock. A false result means retry.
      boolean handled = getBlockedExecutionHandler().blockedAction(command);
      if (handled)
        return;
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -