📄 pooledexecutor.java
字号:
public synchronized boolean isTerminatedAfterShutdown() {
  // Terminated means: a shutdown has been requested AND no pool threads remain.
  if (!shutdown_) {
    return false;
  }
  return poolSize_ == 0;
}
/**
 * Block until a shut-down pool has no remaining threads, or until the
 * given timeout elapses. This method may only be called <em>after</em>
 * invoking shutdownNow or
 * shutdownAfterProcessingCurrentlyQueuedTasks.
 *
 * @param maxWaitTime the maximum time in milliseconds to wait; a
 * value of zero or less means "do not wait at all"
 * @return true if the pool size reached zero within the wait period,
 * false if the period expired first
 * @exception IllegalStateException if shutdown has not been requested
 * @exception InterruptedException if the current thread has been interrupted in the course of waiting
 */
public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException {
  if (!shutdown_) {
    throw new IllegalStateException();
  }
  if (poolSize_ == 0) {
    return true;
  }
  if (maxWaitTime <= 0) {
    return false;
  }

  final long startedAt = System.currentTimeMillis();
  long remaining = maxWaitTime;
  do {
    // remaining is strictly positive here, so this is always a timed wait.
    wait(remaining);
    if (poolSize_ == 0) {
      return true;
    }
    // Woken up (or timed out) while threads remain: recompute what is left.
    remaining = maxWaitTime - (System.currentTimeMillis() - startedAt);
  } while (remaining > 0);
  return false;
}
/**
 * Block, without any time limit, until a shut-down pool has no
 * remaining threads. This method may only be called <em>after</em>
 * invoking shutdownNow or
 * shutdownAfterProcessingCurrentlyQueuedTasks.
 *
 * @exception IllegalStateException if shutdown has not been requested
 * @exception InterruptedException if the current thread has been interrupted in the course of waiting
 */
public synchronized void awaitTerminationAfterShutdown() throws InterruptedException {
  if (!shutdown_) {
    throw new IllegalStateException();
  }
  for (;;) {
    if (!(poolSize_ > 0)) {
      return;
    }
    // Re-check the pool size after every wakeup.
    wait();
  }
}
/**
 * Remove all unprocessed tasks from the pool queue, and return them in
 * a java.util.List. This method should be used only when there are
 * no active clients of the pool. Otherwise you face the possibility
 * that the method will loop pulling out tasks as clients are putting
 * them in. This method can be useful after shutting down a pool (via
 * shutdownNow) to determine whether there are any pending tasks that
 * were not processed. You can then, for example, execute all
 * unprocessed commands via code along the lines of:
 *
 * <pre>
 * List tasks = pool.drain();
 * for (Iterator it = tasks.iterator(); it.hasNext();)
 * ( (Runnable)(it.next()) ).run();
 * </pre>
 *
 * If the calling thread is interrupted while draining, draining
 * continues and the interrupt is re-asserted just before returning.
 *
 * @return the tasks removed from the queue, in the order they were polled
 **/
public List drain() {
  Vector pending = new Vector();
  boolean interrupted = false;
  boolean more = true;
  while (more) {
    try {
      Object task = handOff_.poll(0);
      if (task != null) {
        pending.addElement(task);
      } else {
        // A null from the zero-timeout poll ends the drain.
        more = false;
      }
    } catch (InterruptedException ex) {
      // Remember the interrupt but keep draining; it is restored below.
      interrupted = true;
    }
  }
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
  return pending;
}
/**
 * Cleanup method called upon termination of a worker thread.
 * Unregisters the worker, decrements the pool size, wakes up threads
 * waiting in awaitTerminationAfterShutdown when the last worker of a
 * shut-down pool exits, and otherwise starts a replacement thread if
 * the pool is empty or below its minimum size and a queued task is
 * available.
 *
 * @param w the worker that is terminating
 **/
protected synchronized void workerDone(Worker w) {
  threads_.remove(w);
  --poolSize_;

  if (poolSize_ == 0 && shutdown_) {
    // Last worker of a shut-down pool: disable new threads ...
    minimumPoolSize_ = 0;
    maximumPoolSize_ = 0;
    // ... and notify awaitTerminationAfterShutdown.
    notifyAll();
  }

  // Create a replacement only if the pool is empty or under its minimum.
  boolean replacementWanted = poolSize_ == 0 || poolSize_ < minimumPoolSize_;
  if (!replacementWanted) {
    return;
  }
  try {
    Runnable next = (Runnable) (handOff_.poll(0));
    // When shut down, the polled task is just consumed and not run.
    if (next != null && !shutdown_) {
      addThread(next);
    }
  } catch (InterruptedException ie) {
    // Interrupted while polling: give up on creating a replacement.
  }
}
/**
 * Get the next task from the handoff queue for a worker to run.
 * Returns null immediately when the pool currently holds more threads
 * than its maximum, which causes the calling worker to exit. Otherwise
 * polls the queue using the keep-alive time as timeout (zero when the
 * pool is shut down), or blocks in take() when the keep-alive time is
 * negative.
 *
 * @return the next task, or null if the worker should terminate
 * @exception InterruptedException if interrupted while waiting for a task
 **/
protected Runnable getTask() throws InterruptedException {
  final long timeout;
  synchronized (this) {
    // Cause the caller to die if there are too many threads.
    if (poolSize_ > maximumPoolSize_) {
      return null;
    }
    if (shutdown_) {
      timeout = 0;
    } else {
      timeout = keepAliveTime_;
    }
  }

  // The queue is consulted outside the lock so other pool operations can proceed.
  if (timeout < 0) {
    return (Runnable) (handOff_.take());
  }
  return (Runnable) (handOff_.poll(timeout));
}
/**
 * Class defining the basic run loop for pooled threads: run the
 * optional first task, then keep running tasks obtained from getTask()
 * until it returns null or the thread is interrupted, and finally
 * report termination via workerDone.
 **/
protected class Worker implements Runnable {
  /** Task to run before consulting the queue; may be null. */
  protected Runnable firstTask_;

  protected Worker(Runnable firstTask) {
    firstTask_ = firstTask;
  }

  public void run() {
    try {
      Runnable current = firstTask_;
      firstTask_ = null; // enable GC
      if (current == null) {
        // No initial task was supplied: go straight to the queue.
        current = getTask();
      }
      while (current != null) {
        current.run();
        current = null; // drop the reference before (possibly) blocking in getTask
        current = getTask();
      }
    } catch (InterruptedException ignored) {
      // Interruption simply ends this worker; cleanup happens below.
    } finally {
      workerDone(this);
    }
  }
}
/**
 * Interface for actions to take when execute() blocks, i.e. when a
 * command can neither be handed off nor given a new thread. Uses the
 * Strategy pattern to represent different actions. You can add more
 * in subclasses, and/or create subclasses of the ones provided here.
 * If so, you will also want to add or modify the corresponding
 * methods that install the current handler via
 * setBlockedExecutionHandler.
 **/
public interface BlockedExecutionHandler {
/**
 * Handle a command that execute() could not dispatch.
 *
 * @param command the command that could not be handed off
 * @return true if the command was successfully handled, so that
 * execute should terminate; false if the execute loop should be
 * retried
 * @exception InterruptedException if interrupted while handling the command
 **/
boolean blockedAction(Runnable command) throws InterruptedException;
}
/**
 * Handler implementing the Run action: the blocked command is run
 * directly in the thread that called execute().
 **/
protected class RunWhenBlocked implements BlockedExecutionHandler {
  public boolean blockedAction(final Runnable command) {
    // Run synchronously in the caller; once it returns the command is handled.
    command.run();
    return true;
  }
}
/**
 * Set the policy for blocked execution to be that the current
 * thread executes the command itself if there are no available
 * threads in the pool.
 **/
public void runWhenBlocked() {
  RunWhenBlocked policy = new RunWhenBlocked();
  setBlockedExecutionHandler(policy);
}
/**
 * Handler implementing the Wait action: the blocked command is put
 * on the handoff queue, unless the pool has been shut down, in which
 * case the command is dropped.
 **/
protected class WaitWhenBlocked implements BlockedExecutionHandler {
  public boolean blockedAction(final Runnable command) throws InterruptedException {
    final boolean poolShutDown;
    // Read the shutdown flag under the pool's lock; the put happens outside it.
    synchronized (PooledExecutor.this) {
      poolShutDown = shutdown_;
    }
    if (!poolShutDown) {
      handOff_.put(command);
    }
    return true;
  }
}
/**
 * Set the policy for blocked execution to be to wait until a thread
 * is available, unless the pool has been shut down, in which case
 * the command is discarded.
 **/
public void waitWhenBlocked() {
  WaitWhenBlocked policy = new WaitWhenBlocked();
  setBlockedExecutionHandler(policy);
}
/**
 * Handler implementing the Discard action: the blocked command is
 * silently dropped.
 **/
protected class DiscardWhenBlocked implements BlockedExecutionHandler {
  public boolean blockedAction(final Runnable command) {
    // Nothing is done with the command; reporting "handled" ends execute().
    return true;
  }
}
/**
 * Set the policy for blocked execution to be to return without
 * executing the request.
 **/
public void discardWhenBlocked() {
  DiscardWhenBlocked policy = new DiscardWhenBlocked();
  setBlockedExecutionHandler(policy);
}
/**
 * Handler implementing the Abort action: a blocked command causes a
 * RuntimeException to be thrown to the caller of execute().
 **/
protected class AbortWhenBlocked implements BlockedExecutionHandler {
  public boolean blockedAction(final Runnable command) {
    // Never returns normally; the command is neither queued nor run.
    throw new RuntimeException("Pool is blocked");
  }
}
/**
 * Set the policy for blocked execution to be to throw a
 * RuntimeException.
 **/
public void abortWhenBlocked() {
  AbortWhenBlocked policy = new AbortWhenBlocked();
  setBlockedExecutionHandler(policy);
}
/**
 * Handler implementing the DiscardOldest action. Under this policy,
 * at most one old unhandled task is discarded. If the new task can
 * then be handed off, it is. Otherwise, the new task is run in the
 * current thread (i.e., RunWhenBlocked is used as a backup policy.)
 **/
protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
  public boolean blockedAction(final Runnable command) throws InterruptedException {
    // Pull at most one queued task off the queue and drop it.
    handOff_.poll(0);
    final boolean handedOff = handOff_.offer(command, 0);
    if (!handedOff) {
      // Still no room: fall back to running in the calling thread.
      command.run();
    }
    return true;
  }
}
/**
 * Set the policy for blocked execution to be to discard the oldest
 * unhandled request.
 **/
public void discardOldestWhenBlocked() {
  DiscardOldestWhenBlocked policy = new DiscardOldestWhenBlocked();
  setBlockedExecutionHandler(policy);
}
/**
 * Arrange for the given command to be executed by a thread in this
 * pool. The method normally returns when the command has been
 * handed off for (possibly later) execution.
 * <p>
 * While the pool is not shut down, the command is given to a new
 * thread if the pool is below its minimum size, otherwise offered to
 * the handoff queue, otherwise given to a new thread if the pool is
 * below its maximum size. If none of these apply (or the pool is shut
 * down), the current blocked execution handler is consulted, and the
 * whole sequence is retried for as long as the handler returns false.
 *
 * @param command the command to execute
 * @exception InterruptedException if interrupted while handing off the command
 **/
public void execute(Runnable command) throws InterruptedException {
  do {
    synchronized (this) {
      if (!shutdown_) {
        final int current = poolSize_;

        // Ensure minimum number of threads.
        if (current < minimumPoolSize_) {
          addThread(command);
          return;
        }

        // Try to give to an existing thread.
        final boolean handedOff = handOff_.offer(command, 0);
        if (handedOff) {
          return;
        }

        // Cannot hand off but still under maximum: create a new thread.
        if (current < maximumPoolSize_) {
          addThread(command);
          return;
        }
      }
    }
    // Cannot hand off and cannot create -- ask the handler (outside the lock);
    // a false result means "retry the whole sequence".
  } while (!getBlockedExecutionHandler().blockedAction(command));
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -