threadpoolexecutor.java
来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 1,576 行 · 第 1/4 页
JAVA
1,576 行
void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* Create and return a new thread running firstTask as its first
* task. Call only while holding mainLock
* @param firstTask the task the new thread should run first (or
* null if none)
* @return the new thread, or null if threadFactory fails to create thread
*/
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
/**
* Create and start a new thread running firstTask as its first
* task, only if fewer than corePoolSize threads are running.
* @param firstTask the task the new thread should run first (or
* null if none)
* @return true if successful.
*/
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
/**
* Create and start a new thread only if fewer than maximumPoolSize
* threads are running. The new thread runs as its first task the
* next task in queue, or if there is none, the given task.
* @param firstTask the task the new thread should run first (or
* null if none)
* @return null on failure, else the first task to be run by new thread.
*/
private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
Runnable next = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize) {
next = (Runnable)workQueue.poll();
if (next == null)
next = firstTask;
t = addThread(next);
}
} finally {
mainLock.unlock();
}
if (t == null)
return null;
t.start();
return next;
}
/**
* Get the next task for a worker thread to run.
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
switch(runState) {
case RUNNING: {
// untimed wait if core and not allowing core timeout
if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
return (Runnable)workQueue.take();
long timeout = keepAliveTime;
if (timeout <= 0) // die immediately for 0 timeout
return null;
Runnable r = (Runnable)workQueue.poll(timeout, TimeUnit.NANOSECONDS);
if (r != null)
return r;
if (poolSize > corePoolSize || allowCoreThreadTimeOut)
return null; // timed out
// Else, after timeout, the pool shrank. Retry
break;
}
case SHUTDOWN: {
// Help drain queue
Runnable r = (Runnable)workQueue.poll();
if (r != null)
return r;
// Check if can terminate
if (workQueue.isEmpty()) {
interruptIdleWorkers();
return null;
}
// Else there could still be delayed tasks in queue.
return (Runnable)workQueue.take();
}
case STOP:
return null;
default:
Assert.assert_(false);
}
} catch(InterruptedException ie) {
// On interruption, re-check runstate
}
}
}
/**
* Wake up all threads that might be waiting for tasks.
*/
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Iterator w = workers.iterator(); w.hasNext();)
((Worker)w.next()).interruptIfIdle();
} finally {
mainLock.unlock();
}
}
/**
* Perform bookkeeping for a terminated worker thread.
* @param w the worker
*/
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize > 0)
return;
// Else, this is the last thread. Deal with potential shutdown.
int state = runState;
Assert.assert_(state != TERMINATED);
if (state != STOP) {
// If there are queued tasks but no threads, create
// replacement thread. We must create it initially
// idle to avoid orphaned tasks in case addThread
// fails. This also handles case of delayed tasks
// that will sometime later become runnable.
if (!workQueue.isEmpty()) {
Thread t = addThread(null);
if (t != null)
t.start();
return;
}
// Otherwise, we can exit without replacement
if (state == RUNNING)
return;
}
// Either state is STOP, or state is SHUTDOWN and there is
// no work to do. So we can terminate.
termination.signalAll();
runState = TERMINATED;
// fall through to call terminate() outside of lock.
} finally {
mainLock.unlock();
}
Assert.assert_(runState == TERMINATED);
terminated();
}
/**
* Worker threads
*/
private class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Initial task to run before entering run loop
*/
private Runnable firstTask;
/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupt thread if not running a task
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupt thread even if running a task.
*/
void interruptNow() {
thread.interrupt();
}
/**
* Run a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
// Abort now if immediate cancel. Otherwise, we have
// committed to run this task.
if (runState == STOP)
return;
Thread.interrupted(); // clear interrupt status on entry
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch(RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
// Else the exception occurred within
// afterExecute itself in which case we don't
// want to call it again.
throw ex;
}
} finally {
runLock.unlock();
}
}
/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null; // unnecessary but can help GC
}
} finally {
workerDone(this);
}
}
}
// Public methods
/**
* Creates a new <tt>ThreadPoolExecutor</tt> with the given
* initial parameters and default thread factory and handler. It
* may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the
* pool, even if they are idle.
* @param maximumPoolSize the maximum number of threads to allow in the
* pool.
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the keepAliveTime
* argument.
* @param workQueue the queue to use for holding tasks before they
* are executed. This queue will hold only the <tt>Runnable</tt>
* tasks submitted by the <tt>execute</tt> method.
* @throws IllegalArgumentException if corePoolSize, or
* keepAliveTime less than zero, or if maximumPoolSize less than or
* equal to zero, or if corePoolSize greater than maximumPoolSize.
* @throws NullPointerException if <tt>workQueue</tt> is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the
* pool, even if they are idle.
* @param maximumPoolSize the maximum number of threads to allow in the
* pool.
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the keepAliveTime
* argument.
* @param workQueue the queue to use for holding tasks before they
* are executed. This queue will hold only the <tt>Runnable</tt>
* tasks submitted by the <tt>execute</tt> method.
* @param threadFactory the factory to use when the executor
* creates a new thread.
* @throws IllegalArgumentException if corePoolSize, or
* keepAliveTime less than zero, or if maximumPoolSize less than or
* equal to zero, or if corePoolSize greater than maximumPoolSize.
* @throws NullPointerException if <tt>workQueue</tt>
* or <tt>threadFactory</tt> are null.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the
* pool, even if they are idle.
* @param maximumPoolSize the maximum number of threads to allow in the
* pool.
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the keepAliveTime
* argument.
* @param workQueue the queue to use for holding tasks before they
* are executed. This queue will hold only the <tt>Runnable</tt>
* tasks submitted by the <tt>execute</tt> method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached.
* @throws IllegalArgumentException if corePoolSize, or
* keepAliveTime less than zero, or if maximumPoolSize less than or
* equal to zero, or if corePoolSize greater than maximumPoolSize.
* @throws NullPointerException if <tt>workQueue</tt>
* or <tt>handler</tt> are null.
*/
public ThreadPoolExecutor(int corePoolSize,
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?