📄 pooledexecutor.java
字号:
* pool.setKeepAliveTime(1000 * 60 * 5);
* pool.abortWhenBlocked();
* pool.createThreads(9);
* </pre>
* <li> An unbounded queue serviced by exactly 5 threads:
* <pre>
* pool = new PooledExecutor(new LinkedQueue());
* pool.setKeepAliveTime(-1); // live forever
* pool.createThreads(5);
* </pre>
* </ol>
*
* <p>
* <b>Usage notes.</b>
* <p>
*
* Pools do not mesh well with using thread-specific storage via
* java.lang.ThreadLocal. ThreadLocal relies on the identity of a
* thread executing a particular task. Pools use the same thread to
* perform different tasks. <p>
*
* If you need a policy not handled by the parameters in this class
* consider writing a subclass. <p>
*
* Version note: Previous versions of this class relied on
* ThreadGroups for aggregate control. This has been removed, and the
* method interruptAll added, to avoid differences in behavior across
* JVMs.
*
* <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
**/
public class PooledExecutor extends ThreadFactoryUser implements Executor {
/**
* The maximum pool size; used if not otherwise specified. Default
* value is essentially infinite (Integer.MAX_VALUE)
**/
public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;
/**
* The minimum pool size; used if not otherwise specified. Default
* value is 1.
**/
public static final int DEFAULT_MINIMUMPOOLSIZE = 1;
/**
* The maximum time to keep worker threads alive waiting for new
* tasks; used if not otherwise specified. Default value is one
* minute (60000 milliseconds).
**/
public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;
/** The maximum number of threads allowed in pool. **/
protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;
/** The minumum number of threads to maintain in pool. **/
protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;
/** Current pool size. **/
protected int poolSize_ = 0;
/** The maximum time for an idle thread to wait for new task. **/
protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;
/**
* Shutdown flag - latches true when a shutdown method is called
* in order to disable queuing/handoffs of new tasks.
**/
protected boolean shutdown_ = false;
/**
* The channel used to hand off the command to a thread in the pool.
**/
protected final Channel handOff_;
/**
* The set of active threads, declared as a map from workers to
* their threads. This is needed by the interruptAll method. It
* may also be useful in subclasses that need to perform other
* thread management chores.
**/
protected final Map threads_;
/** The current handler for unserviceable requests. **/
protected BlockedExecutionHandler blockedExecutionHandler_;
/**
* Create a new pool with all default settings
**/
public PooledExecutor() {
this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
}
/**
* Create a new pool with all default settings except
* for maximum pool size.
**/
public PooledExecutor(int maxPoolSize) {
this(new SynchronousChannel(), maxPoolSize);
}
/**
* Create a new pool that uses the supplied Channel for queuing, and
* with all default parameter settings.
**/
public PooledExecutor(Channel channel) {
this(channel, DEFAULT_MAXIMUMPOOLSIZE);
}
/**
* Create a new pool that uses the supplied Channel for queuing, and
* with all default parameter settings except for maximum pool size.
**/
public PooledExecutor(Channel channel, int maxPoolSize) {
maximumPoolSize_ = maxPoolSize;
handOff_ = channel;
runWhenBlocked();
threads_ = new HashMap();
}
/**
* Return the maximum number of threads to simultaneously execute
* New unqueued requests will be handled according to the current
* blocking policy once this limit is exceeded.
**/
public synchronized int getMaximumPoolSize() {
return maximumPoolSize_;
}
/**
* Set the maximum number of threads to use. Decreasing the pool
* size will not immediately kill existing threads, but they may
* later die when idle.
* @exception IllegalArgumentException if less or equal to zero.
* (It is
* not considered an error to set the maximum to be less than than
* the minimum. However, in this case there are no guarantees
* about behavior.)
**/
public synchronized void setMaximumPoolSize(int newMaximum) {
if (newMaximum <= 0) throw new IllegalArgumentException();
maximumPoolSize_ = newMaximum;
}
/**
* Return the minimum number of threads to simultaneously execute.
* (Default value is 1). If fewer than the mininum number are
* running upon reception of a new request, a new thread is started
* to handle this request.
**/
public synchronized int getMinimumPoolSize() {
return minimumPoolSize_;
}
/**
* Set the minimum number of threads to use.
* @exception IllegalArgumentException if less than zero. (It is not
* considered an error to set the minimum to be greater than the
* maximum. However, in this case there are no guarantees about
* behavior.)
**/
public synchronized void setMinimumPoolSize(int newMinimum) {
if (newMinimum < 0) throw new IllegalArgumentException();
minimumPoolSize_ = newMinimum;
}
/**
* Return the current number of active threads in the pool. This
* number is just a snaphot, and may change immediately upon
* returning
**/
public synchronized int getPoolSize() {
return poolSize_;
}
/**
* Return the number of milliseconds to keep threads alive waiting
* for new commands. A negative value means to wait forever. A zero
* value means not to wait at all.
**/
public synchronized long getKeepAliveTime() {
return keepAliveTime_;
}
/**
* Set the number of milliseconds to keep threads alive waiting for
* new commands. A negative value means to wait forever. A zero
* value means not to wait at all.
**/
public synchronized void setKeepAliveTime(long msecs) {
keepAliveTime_ = msecs;
}
/** Get the handler for blocked execution **/
public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
return blockedExecutionHandler_;
}
/** Set the handler for blocked execution **/
public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) {
blockedExecutionHandler_ = h;
}
/**
* Create and start a thread to handle a new command. Call only
* when holding lock.
**/
protected void addThread(Runnable command) {
Worker worker = new Worker(command);
Thread thread = getThreadFactory().newThread(worker);
threads_.put(worker, thread);
++poolSize_;
thread.start();
}
/**
* Create and start up to numberOfThreads threads in the pool.
* Return the number created. This may be less than the number
* requested if creating more would exceed maximum pool size bound.
**/
public int createThreads(int numberOfThreads) {
int ncreated = 0;
for (int i = 0; i < numberOfThreads; ++i) {
synchronized(this) {
if (poolSize_ < maximumPoolSize_) {
addThread(null);
++ncreated;
}
else
break;
}
}
return ncreated;
}
/**
* Interrupt all threads in the pool, causing them all to
* terminate. Assuming that executed tasks do not disable (clear)
* interruptions, each thread will terminate after processing its
* current task. Threads will terminate sooner if the executed tasks
* themselves respond to interrupts.
**/
public synchronized void interruptAll() {
for (Iterator it = threads_.values().iterator(); it.hasNext(); ) {
Thread t = (Thread)(it.next());
t.interrupt();
}
}
/**
* Interrupt all threads and disable construction of new
* threads. Any tasks entered after this point will be discarded. A
* shut down pool cannot be restarted.
*/
public void shutdownNow() {
shutdownNow(new DiscardWhenBlocked());
}
/**
* Interrupt all threads and disable construction of new
* threads. Any tasks entered after this point will be handled by
* the given BlockedExecutionHandler. A shut down pool cannot be
* restarted.
*/
public synchronized void shutdownNow(BlockedExecutionHandler handler) {
setBlockedExecutionHandler(handler);
shutdown_ = true; // don't allow new tasks
minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
interruptAll(); // interrupt all existing threads
}
/**
* Terminate threads after processing all elements currently in
* queue. Any tasks entered after this point will be discarded. A
* shut down pool cannot be restarted.
**/
public void shutdownAfterProcessingCurrentlyQueuedTasks() {
shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
}
/**
* Terminate threads after processing all elements currently in
* queue. Any tasks entered after this point will be handled by the
* given BlockedExecutionHandler. A shut down pool cannot be
* restarted.
**/
public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) {
setBlockedExecutionHandler(handler);
shutdown_ = true;
if (poolSize_ == 0) // disable new thread construction when idle
minimumPoolSize_ = maximumPoolSize_ = 0;
}
/**
* Return true if a shutDown method has succeeded in terminating all
* threads.
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -