📄 simplethreadpool.java
字号:
/**
 * <p>
 * Terminate any worker threads in this thread group and wait for them.
 * </p>
 *
 * <p>
 * Jobs currently in progress will complete. This is a convenience overload
 * that delegates to <code>shutdown(boolean)</code> with the wait flag set
 * to <code>true</code>.
 * </p>
 */
public void shutdown() {
    final boolean waitForJobsToComplete = true;
    shutdown(waitForJobsToComplete);
}
/**
 * <p>
 * Terminate any worker threads in this thread group.
 * </p>
 *
 * <p>
 * The pool is flagged as shut down, every known worker is signalled to stop
 * and is removed from the list of available workers. Jobs currently in
 * progress will complete.
 * </p>
 *
 * <p>
 * If the calling thread is interrupted while waiting, the wait continues
 * and the thread's interrupt status is restored before this method returns.
 * </p>
 *
 * @param waitForJobsToComplete
 *          if <code>true</code>, block until a pending hand-off in
 *          <code>runInThread(Runnable)</code> has finished and no worker
 *          is listed as busy any more; if <code>false</code>, return as
 *          soon as the workers have been signalled.
 */
public void shutdown(boolean waitForJobsToComplete) {
    synchronized (nextRunnableLock) {
        isShutdown = true;

        // signal each worker thread to shut down
        Iterator workerThreads = workers.iterator();
        while (workerThreads.hasNext()) {
            WorkerThread wt = (WorkerThread) workerThreads.next();
            wt.shutdown();
            availWorkers.remove(wt);
        }

        // Wake every thread blocked on nextRunnableLock (callers of
        // runInThread / blockForAvailableThreads) so they see isShutdown.
        // Idle workers wait on their own monitor with a 500 ms timeout and
        // notice the stop signal on their next wake-up; active workers
        // shut down after finishing their current job.
        nextRunnableLock.notifyAll();

        if (!waitForJobsToComplete) {
            return;
        }

        // Remember an interrupt instead of discarding it, so the caller
        // can still observe it once the shutdown has finished.
        boolean interrupted = false;
        try {
            // wait for hand-off in runInThread to complete...
            while (handoffPending) {
                try {
                    nextRunnableLock.wait(100);
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }

            // Wait until all worker threads are shut down
            while (busyWorkers.size() > 0) {
                WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
                getLog().debug(
                        "Waiting for thread " + wt.getName()
                                + " to shut down");
                try {
                    // note: with waiting infinite time the
                    // application may appear to 'hang'.
                    nextRunnableLock.wait(2000);
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        int activeCount = threadGroup.activeCount();
        if (activeCount > 0) {
            getLog().info(
                    "There are still " + activeCount + " worker threads active."
                    + " See javadoc runInThread(Runnable) for a possible explanation");
        }
        getLog().debug("shutdown complete");
    }
}
/**
 * <p>
 * Run the given <code>Runnable</code> object in the next available
 * <code>Thread</code>. If while waiting the thread pool is asked to
 * shut down, the Runnable is executed immediately within a new additional
 * thread.
 * </p>
 *
 * <p>
 * While this method is selecting a worker the pool's hand-off flag is set;
 * it is always cleared again before the method exits, including when the
 * hand-off fails with an exception, so that
 * <code>blockForAvailableThreads()</code> and <code>shutdown(true)</code>
 * cannot be left waiting on a hand-off that will never complete.
 * </p>
 *
 * @param runnable
 *          the <code>Runnable</code> to be added.
 * @return <code>false</code> if <code>runnable</code> is <code>null</code>
 *         (nothing is scheduled), <code>true</code> once the runnable has
 *         been handed to a worker thread.
 */
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }
    synchronized (nextRunnableLock) {
        handoffPending = true;
        try {
            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                    // Deliberately ignored: keep waiting for a worker (or
                    // for shutdown) so the runnable is never dropped.
                }
            }
            if (!isShutdown) {
                WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
        } finally {
            // Always end the hand-off, even if it failed part-way, and wake
            // everyone waiting on the lock so they re-check the pool state.
            handoffPending = false;
            nextRunnableLock.notifyAll();
        }
    }
    return true;
}
/**
 * <p>
 * Block the caller until at least one worker is available and no hand-off
 * is pending, or until the pool has been shut down.
 * </p>
 *
 * <p>
 * Interrupts received while waiting are ignored and the wait resumes.
 * </p>
 *
 * @return the number of available workers at the moment the wait ended.
 */
public int blockForAvailableThreads() {
    synchronized (nextRunnableLock) {
        for (;;) {
            boolean workerReady = availWorkers.size() >= 1 && !handoffPending;
            if (workerReady || isShutdown) {
                break;
            }
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }
        return availWorkers.size();
    }
}
/**
 * <p>
 * Take the given worker off the busy list and, unless the pool has been
 * shut down, put it on the list of available workers. All threads waiting
 * on the pool lock are woken afterwards.
 * </p>
 *
 * @param wt
 *          the worker that has finished its job.
 */
protected void makeAvailable(WorkerThread wt) {
    synchronized (nextRunnableLock) {
        busyWorkers.remove(wt);
        if (isShutdown == false) {
            availWorkers.add(wt);
        }
        nextRunnableLock.notifyAll();
    }
}
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* WorkerThread Class.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* A Worker loops, waiting to execute tasks.
* </p>
*/
class WorkerThread extends Thread {
// A flag that signals the WorkerThread to terminate.
private boolean run = true;
private SimpleThreadPool tp;
private Runnable runnable = null;
/**
* <p>
* Create a worker thread and start it. Waiting for the next Runnable,
* executing it, and waiting for the next Runnable, until the shutdown
* flag is set.
* </p>
*/
WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
int prio, boolean isDaemon) {
this(tp, threadGroup, name, prio, isDaemon, null);
}
/**
* <p>
* Create a worker thread, start it, execute the runnable and terminate
* the thread (one time execution).
* </p>
*/
WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
int prio, boolean isDaemon, Runnable runnable) {
super(threadGroup, name);
this.tp = tp;
this.runnable = runnable;
setPriority(prio);
setDaemon(isDaemon);
}
/**
* <p>
* Signal the thread that it should terminate.
* </p>
*/
void shutdown() {
run = false;
// Javadoc mentions that it interrupts blocked I/O operations as
// well. Hence the job will most likely fail. I think we should
// shut the work thread gracefully, by letting the job finish
// uninterrupted. See SimpleThreadPool.shutdown()
//interrupt();
}
public void run(Runnable newRunnable) {
synchronized(this) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
this.notifyAll();
}
}
/**
* <p>
* Loop, executing targets as they are received.
* </p>
*/
public void run() {
boolean runOnce = (runnable != null);
boolean ran = false;
while (run) {
try {
synchronized(this) {
while (runnable == null && run) {
this.wait(500);
}
}
if (runnable != null) {
ran = true;
runnable.run();
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("worker threat got 'interrupt'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Exception exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
runnable = null;
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run = false;
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shutting down");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -