📄 simplethreadpool.java
字号:
}

    /**
     * <p>
     * Terminate any worker threads in this thread group.
     * </p>
     *
     * <p>
     * Jobs currently in progress will complete. This is the same as calling
     * <code>shutdown(true)</code>.
     * </p>
     */
    public void shutdown() {
        shutdown(true);
    }

    /**
     * <p>
     * Terminate any worker threads in this thread group.
     * </p>
     *
     * <p>
     * Every worker is flagged to stop and removed from the list of available
     * workers. Jobs currently in progress are not interrupted; each busy
     * worker stops after its current job finishes.
     * </p>
     *
     * @param waitForJobsToComplete
     *          if <code>true</code>, block until any pending hand-off in
     *          {@link #runInThread(Runnable)} has finished and no worker is
     *          busy any more; if <code>false</code>, return right after the
     *          workers have been signalled.
     */
    public void shutdown(boolean waitForJobsToComplete) {
        synchronized (nextRunnableLock) {
            isShutdown = true;

            // signal each worker thread to shut down
            Iterator workerThreads = workers.iterator();
            while (workerThreads.hasNext()) {
                WorkerThread wt = (WorkerThread) workerThreads.next();
                wt.shutdown();
                availWorkers.remove(wt);
            }

            // Wake up callers blocked on nextRunnableLock (runInThread /
            // blockForAvailableThreads) so they notice the shutdown. Idle
            // workers wait on their own monitor with a 500 ms timeout and
            // see the cleared run flag on their next wake-up; active workers
            // shut down after finishing their current job.
            nextRunnableLock.notifyAll();

            if (waitForJobsToComplete) {
                // Keep waiting when interrupted, but remember the interrupt
                // and restore it afterwards instead of swallowing it.
                boolean interrupted = false;
                try {
                    // wait for hand-off in runInThread to complete...
                    while (handoffPending) {
                        try {
                            nextRunnableLock.wait(100);
                        } catch (InterruptedException ex) {
                            interrupted = true;
                        }
                    }

                    // Wait until all worker threads are shut down
                    while (busyWorkers.size() > 0) {
                        WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
                        try {
                            getLog().debug(
                                    "Waiting for thread " + wt.getName()
                                            + " to shut down");

                            // note: with waiting infinite time the
                            // application may appear to 'hang'.
                            nextRunnableLock.wait(2000);
                        } catch (InterruptedException ex) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }

                int activeCount = threadGroup.activeCount();
                if (activeCount > 0) {
                    getLog().info(
                            "There are still " + activeCount + " worker threads active."
                                    + " See javadoc runInThread(Runnable) for a possible explanation");
                }

                getLog().debug("shutdown complete");
            }
        }
    }

    /**
     * <p>
     * Run the given <code>Runnable</code> object in the next available
     * <code>Thread</code>. If while waiting the thread pool is asked to
     * shut down, the Runnable is executed immediately within a new additional
     * thread.
     * </p>
     *
     * @param runnable
     *          the <code>Runnable</code> to be added.
     * @return <code>false</code> if <code>runnable</code> is
     *         <code>null</code>, <code>true</code> once the runnable has been
     *         handed to a worker thread.
     */
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }

        synchronized (nextRunnableLock) {

            handoffPending = true;
            try {
                // Wait until a worker thread is available
                while ((availWorkers.size() < 1) && !isShutdown) {
                    try {
                        nextRunnableLock.wait(500);
                    } catch (InterruptedException ignore) {
                        // deliberately ignored: keep waiting for a worker
                    }
                }

                if (!isShutdown) {
                    WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
                    busyWorkers.add(wt);
                    wt.run(runnable);
                } else {
                    // If the thread pool is going down, execute the Runnable
                    // within a new additional worker thread (no thread from the pool).
                    WorkerThread wt = new WorkerThread(this, threadGroup,
                            "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                    busyWorkers.add(wt);
                    workers.add(wt);
                    wt.start();
                }
            } finally {
                // Always clear the flag, even if the hand-off threw; otherwise
                // blockForAvailableThreads() and shutdown(true) would wait on
                // it forever.
                handoffPending = false;
                nextRunnableLock.notifyAll();
            }
        }

        return true;
    }

    /**
     * Block until at least one worker is available and no hand-off is
     * pending, or until the pool has been shut down.
     *
     * @return the number of workers available at the time of return.
     */
    public int blockForAvailableThreads() {
        synchronized (nextRunnableLock) {

            while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                    // deliberately ignored: keep waiting for a worker
                }
            }

            return availWorkers.size();
        }
    }

    /**
     * Move a worker from the busy list back to the available list and wake
     * up everybody waiting on the pool lock. After shutdown the worker is
     * only removed from the busy list.
     *
     * @param wt
     *          the worker that has finished its job.
     */
    protected void makeAvailable(WorkerThread wt) {
        synchronized (nextRunnableLock) {
            if (!isShutdown) {
                availWorkers.add(wt);
            }
            busyWorkers.remove(wt);
            nextRunnableLock.notifyAll();
        }
    }

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     *
     * WorkerThread Class.
     *
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    /**
     * <p>
     * A Worker loops, waiting to execute tasks.
     * </p>
     */
    class WorkerThread extends Thread {

        // A flag that signals the WorkerThread to terminate. Volatile because
        // shutdown() writes it from another thread while the worker loop
        // reads it without holding a common lock.
        private volatile boolean run = true;

        private final SimpleThreadPool tp;

        private Runnable runnable = null;

        /**
         * <p>
         * Create a pooled worker thread. The constructor does not start the
         * thread. Once started, it waits for the next Runnable, executes it,
         * and waits for the next Runnable, until the shutdown flag is set.
         * </p>
         */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                     int prio, boolean isDaemon) {

            this(tp, threadGroup, name, prio, isDaemon, null);
        }

        /**
         * <p>
         * Create a worker thread. The constructor does not start the thread.
         * When <code>runnable</code> is not <code>null</code>, the started
         * thread executes it and then terminates (one time execution).
         * </p>
         */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                     int prio, boolean isDaemon, Runnable runnable) {

            super(threadGroup, name);
            this.tp = tp;
            this.runnable = runnable;
            setPriority(prio);
            setDaemon(isDaemon);
        }

        /**
         * <p>
         * Signal the thread that it should terminate.
         * </p>
         */
        void shutdown() {
            run = false;

            // The thread is deliberately NOT interrupted here: Javadoc
            // mentions that interrupt() also interrupts blocked I/O
            // operations, so the job would most likely fail. The worker is
            // shut down gracefully by letting the job finish uninterrupted.
            // See SimpleThreadPool.shutdown()
        }

        /**
         * Hand a Runnable to this worker and wake it up.
         *
         * @param newRunnable
         *          the job to execute.
         * @throws IllegalStateException
         *           if the worker already holds a Runnable.
         */
        public void run(Runnable newRunnable) {
            synchronized (this) {
                if (runnable != null) {
                    throw new IllegalStateException("Already running a Runnable!");
                }

                runnable = newRunnable;
                this.notifyAll();
            }
        }

        /**
         * <p>
         * Loop, executing targets as they are received.
         * </p>
         */
        public void run() {
            // A worker constructed with a Runnable executes it once and exits.
            boolean runOnce = (runnable != null);

            boolean ran = false;

            while (run) {
                try {
                    synchronized (this) {
                        while (runnable == null && run) {
                            this.wait(500);
                        }
                    }

                    if (runnable != null) {
                        ran = true;
                        runnable.run();
                    }
                } catch (InterruptedException unblock) {
                    // do nothing (loop will terminate if shutdown() was called)
                    try {
                        getLog().error("worker thread got 'interrupt'ed.", unblock);
                    } catch (Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } catch (Throwable exceptionInRunnable) {
                    // Throwable, not just Exception: an Error escaping from a
                    // job would otherwise kill this thread after the finally
                    // block has already returned it to the available list.
                    try {
                        getLog().error("Error while executing the Runnable: ",
                                exceptionInRunnable);
                    } catch (Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } finally {
                    runnable = null;

                    // repair the thread in case the runnable mucked it up...
                    if (getPriority() != tp.getThreadPriority()) {
                        setPriority(tp.getThreadPriority());
                    }

                    if (runOnce) {
                        run = false;
                        // A one-shot worker never goes through makeAvailable(),
                        // so it must remove itself from the busy list; otherwise
                        // shutdown(true) would wait for it forever.
                        removeFromBusyWorkers();
                    } else if (ran) {
                        ran = false;
                        makeAvailable(this);
                    }
                }
            }

            try {
                getLog().debug("WorkerThread is shutting down");
            } catch (Exception e) {
                // ignore to help with a tomcat glitch
            }
        }

        /**
         * Remove this worker from the pool's busy list and wake up threads
         * waiting on the pool lock.
         */
        private void removeFromBusyWorkers() {
            synchronized (nextRunnableLock) {
                busyWorkers.remove(this);
                nextRunnableLock.notifyAll();
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -