📄 pooledexecutor.java
字号:
* pool = new PooledExecutor(new BoundedBuffer(10), 100); * pool.setMinimumPoolSize(4); * pool.setKeepAliveTime(1000 * 60 * 5); * pool.waitWhenBlocked(); * pool.createThreads(9); * </pre> * <li> An unbounded queue serviced by exactly 5 threads: * <pre> * pool = new PooledExecutor(new LinkedQueue()); * pool.setKeepAliveTime(-1); // live forever * pool.createThreads(5); * </pre> * </ol> * * <p> * <b>Usage notes.</b> * <p> * * Pools do not mesh well with using thread-specific storage via * java.lang.ThreadLocal. ThreadLocal relies on the identity of a * thread executing a particular task. Pools use the same thread to * perform different tasks. <p> * * If you need a policy not handled by the parameters in this class * consider writing a subclass. <p> * * Version note: Previous versions of this class relied on * ThreadGroups for aggregate control. This has been removed, and the * method interruptAll added, to avoid differences in behavior across * JVMs. * * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] **/public class PooledExecutor extends ThreadFactoryUser implements Executor { /** * The maximum pool size; used if not otherwise specified. Default * value is essentially infinite (Integer.MAX_VALUE) **/ public static final int DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE; /** * The minimum pool size; used if not otherwise specified. Default * value is 1. **/ public static final int DEFAULT_MINIMUMPOOLSIZE = 1; /** * The maximum time to keep worker threads alive waiting for new * tasks; used if not otherwise specified. Default value is one * minute (60000 milliseconds). **/ public static final long DEFAULT_KEEPALIVETIME = 60 * 1000; /** The maximum number of threads allowed in pool. **/ protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE; /** The minumum number of threads to maintain in pool. **/ protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE; /** Current pool size. **/ protected int poolSize_ = 0; /** The maximum time for an idle thread to wait for new task. **/ protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME; /** * Shutdown flag - latches true when a shutdown method is called * in order to disable queuing/handoffs of new tasks. **/ protected boolean shutdown_ = false; /** * The channel used to hand off the command to a thread in the pool. **/ protected final Channel handOff_; /** * The set of active threads, declared as a map from workers to * their threads. This is needed by the interruptAll method. It * may also be useful in subclasses that need to perform other * thread management chores. **/ protected final Map threads_; /** The current handler for unserviceable requests. **/ protected BlockedExecutionHandler blockedExecutionHandler_; /** * Create a new pool with all default settings **/ public PooledExecutor() { this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE); } /** * Create a new pool with all default settings except * for maximum pool size. **/ public PooledExecutor(int maxPoolSize) { this(new SynchronousChannel(), maxPoolSize); } /** * Create a new pool that uses the supplied Channel for queuing, and * with all default parameter settings. **/ public PooledExecutor(Channel channel) { this(channel, DEFAULT_MAXIMUMPOOLSIZE); } /** * Create a new pool that uses the supplied Channel for queuing, and * with all default parameter settings except for maximum pool size. **/ public PooledExecutor(Channel channel, int maxPoolSize) { maximumPoolSize_ = maxPoolSize; handOff_ = channel; runWhenBlocked(); threads_ = new HashMap(); } /** * Return the maximum number of threads to simultaneously execute * New unqueued requests will be handled according to the current * blocking policy once this limit is exceeded. **/ public synchronized int getMaximumPoolSize() { return maximumPoolSize_; } /** * Set the maximum number of threads to use. Decreasing the pool * size will not immediately kill existing threads, but they may * later die when idle. * @exception IllegalArgumentException if less or equal to zero. * (It is * not considered an error to set the maximum to be less than than * the minimum. However, in this case there are no guarantees * about behavior.) **/ public synchronized void setMaximumPoolSize(int newMaximum) { if (newMaximum <= 0) throw new IllegalArgumentException(); maximumPoolSize_ = newMaximum; } /** * Return the minimum number of threads to simultaneously execute. * (Default value is 1). If fewer than the mininum number are * running upon reception of a new request, a new thread is started * to handle this request. **/ public synchronized int getMinimumPoolSize() { return minimumPoolSize_; } /** * Set the minimum number of threads to use. * @exception IllegalArgumentException if less than zero. (It is not * considered an error to set the minimum to be greater than the * maximum. However, in this case there are no guarantees about * behavior.) **/ public synchronized void setMinimumPoolSize(int newMinimum) { if (newMinimum < 0) throw new IllegalArgumentException(); minimumPoolSize_ = newMinimum; } /** * Return the current number of active threads in the pool. This * number is just a snaphot, and may change immediately upon * returning **/ public synchronized int getPoolSize() { return poolSize_; } /** * Return the number of milliseconds to keep threads alive waiting * for new commands. A negative value means to wait forever. A zero * value means not to wait at all. **/ public synchronized long getKeepAliveTime() { return keepAliveTime_; } /** * Set the number of milliseconds to keep threads alive waiting for * new commands. A negative value means to wait forever. A zero * value means not to wait at all. **/ public synchronized void setKeepAliveTime(long msecs) { keepAliveTime_ = msecs; } /** Get the handler for blocked execution **/ public synchronized BlockedExecutionHandler getBlockedExecutionHandler() { return blockedExecutionHandler_; } /** Set the handler for blocked execution **/ public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) { blockedExecutionHandler_ = h; } /** * Create and start a thread to handle a new command. Call only * when holding lock. **/ protected void addThread(Runnable command) { Worker worker = new Worker(command); Thread thread = getThreadFactory().newThread(worker); threads_.put(worker, thread); ++poolSize_; thread.start(); } /** * Create and start up to numberOfThreads threads in the pool. * Return the number created. This may be less than the number * requested if creating more would exceed maximum pool size bound. **/ public int createThreads(int numberOfThreads) { int ncreated = 0; for (int i = 0; i < numberOfThreads; ++i) { synchronized(this) { if (poolSize_ < maximumPoolSize_) { addThread(null); ++ncreated; } else break; } } return ncreated; } /** * Interrupt all threads in the pool, causing them all to * terminate. Assuming that executed tasks do not disable (clear) * interruptions, each thread will terminate after processing its * current task. Threads will terminate sooner if the executed tasks * themselves respond to interrupts. **/ public synchronized void interruptAll() { for (Iterator it = threads_.values().iterator(); it.hasNext(); ) { Thread t = (Thread)(it.next()); t.interrupt(); } } /** * Interrupt all threads and disable construction of new * threads. Any tasks entered after this point will be discarded. A * shut down pool cannot be restarted. */ public void shutdownNow() { shutdownNow(new DiscardWhenBlocked()); } /** * Interrupt all threads and disable construction of new * threads. Any tasks entered after this point will be handled by * the given BlockedExecutionHandler. A shut down pool cannot be * restarted. */ public synchronized void shutdownNow(BlockedExecutionHandler handler) { setBlockedExecutionHandler(handler); shutdown_ = true; // don't allow new tasks minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads interruptAll(); // interrupt all existing threads } /** * Terminate threads after processing all elements currently in * queue. Any tasks entered after this point will be discarded. A * shut down pool cannot be restarted. **/ public void shutdownAfterProcessingCurrentlyQueuedTasks() { shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked()); } /** * Terminate threads after processing all elements currently in * queue. Any tasks entered after this point will be handled by the * given BlockedExecutionHandler. A shut down pool cannot be * restarted. **/ public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) { setBlockedExecutionHandler(handler);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -