📄 threadpool.java
字号:
* down the pool and nothing should stop that. */ log.error("Ignored exception while shutting down thread pool", t); } } currentThreadsBusy = currentThreadCount = 0; pool = null; notifyAll(); } } /** * Called by the monitor thread to harvest idle threads. */ protected synchronized void checkSpareControllers() { if(stopThePool) { return; } if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads; for(int i = 0 ; i < toFree ; i++) { ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1]; c.terminate(); pool[currentThreadCount - currentThreadsBusy - 1] = null; currentThreadCount --; } } } /** * Returns the thread to the pool. * Called by threads as they are becoming idel. */ protected synchronized void returnController(ControlRunnable c) { if(0 == currentThreadCount || stopThePool) { c.terminate(); return; } // atomic currentThreadsBusy--; pool[currentThreadCount - currentThreadsBusy - 1] = c; notify(); } /** * Inform the pool that the specific thread finish. * * Called by the ControlRunnable.run() when the runnable * throws an exception. */ protected synchronized void notifyThreadEnd(ControlRunnable c) { currentThreadsBusy--; currentThreadCount --; notify(); } /* * Checks for problematic configuration and fix it. * The fix provides reasonable settings for a single CPU * with medium load. */ protected void adjustLimits() { if(maxThreads <= 0) { maxThreads = MAX_THREADS; } if(maxSpareThreads >= maxThreads) { maxSpareThreads = maxThreads; } if(maxSpareThreads <= 0) { if(1 == maxThreads) { maxSpareThreads = 1; } else { maxSpareThreads = maxThreads/2; } } if(minSpareThreads > maxSpareThreads) { minSpareThreads = maxSpareThreads; } if(minSpareThreads <= 0) { if(1 == maxSpareThreads) { minSpareThreads = 1; } else { minSpareThreads = maxSpareThreads/2; } } } /** Create missing threads. * * @param toOpen Total number of threads we'll have open */ protected void openThreads(int toOpen) { if(toOpen > maxThreads) { toOpen = maxThreads; } for(int i = currentThreadCount ; i < toOpen ; i++) { pool[i - currentThreadsBusy] = new ControlRunnable(this); } currentThreadCount = toOpen; } /** @deprecated */ void log( String s ) { log.info(s); //loghelper.flush(); } /** * Periodically execute an action - cleanup in this case */ public static class MonitorRunnable implements Runnable { ThreadPool p; Thread t; int interval=WORK_WAIT_TIMEOUT; boolean shouldTerminate; MonitorRunnable(ThreadPool p) { this.p=p; this.start(); } public void start() { shouldTerminate = false; t = new Thread(this); t.setDaemon(p.getDaemon() ); t.setName( "MonitorRunnable" ); t.start(); } public void setInterval(int i ) { this.interval=i; } public void run() { while(true) { try { // Sleep for a while. synchronized(this) { this.wait(WORK_WAIT_TIMEOUT); } // Check if should terminate. // termination happens when the pool is shutting down. if(shouldTerminate) { break; } // Harvest idle threads. p.checkSpareControllers(); } catch(Throwable t) { ThreadPool.log.error("Unexpected exception", t); } } } public void stop() { this.terminate(); } /** Stop the monitor */ public synchronized void terminate() { shouldTerminate = true; this.notify(); } } /** * A Thread object that executes various actions ( ThreadPoolRunnable ) * under control of ThreadPool */ public static class ControlRunnable implements Runnable { /** * ThreadPool where this thread will be returned */ ThreadPool p; /** * The thread that executes the actions */ Thread t; /** * The method that is executed in this thread */ ThreadPoolRunnable toRun; /** * Stop this thread */ boolean shouldTerminate; /** * Activate the execution of the action */ boolean shouldRun; /** * Per thread data - can be used only if all actions are * of the same type. * A better mechanism is possible ( that would allow association of * thread data with action type ), but right now it's enough. */ boolean noThData; Object thData[]=null; /** * Start a new thread, with no method in it */ ControlRunnable(ThreadPool p) { toRun = null; shouldTerminate = false; shouldRun = false; this.p = p; t = new ThreadWithAttributes(p, this); t.setDaemon(true); t.start(); p.addThread( t, this ); noThData=true; thData=null; } public void run() { try { while(true) { try { /* Wait for work. */ synchronized(this) { if(!shouldRun && !shouldTerminate) { this.wait(); } } if(toRun == null ) { if( p.log.isDebugEnabled()) p.log.debug( "No toRun ???"); } if( shouldTerminate ) { if( p.log.isDebugEnabled()) p.log.debug( "Terminate"); break; } /* Check if should execute a runnable. */ try { if(noThData) { if(p.log.isDebugEnabled()) p.log.debug( "Getting new thread data"); thData=toRun.getInitData(); noThData = false; } if(shouldRun) { toRun.runIt(thData); } } catch(Throwable t) { p.log.error("Caught exception executing " + toRun.toString() + ", terminating thread", t); /* * The runnable throw an exception (can be even a ThreadDeath), * signalling that the thread die. * * The meaning is that we should release the thread from * the pool. */ shouldTerminate = true; shouldRun = false; p.notifyThreadEnd(this); } finally { if(shouldRun) { shouldRun = false; /* * Notify the pool that the thread is now idle. */ p.returnController(this); } } /* * Check if should terminate. * termination happens when the pool is shutting down. */ if(shouldTerminate) { break; } } catch(InterruptedException ie) { /* for the wait operation */ // can never happen, since we don't call interrupt p.log.error("Unexpected exception", ie); } } } finally { p.removeThread(Thread.currentThread()); } } /** Run a task * * @param toRun */ public synchronized void runIt(ThreadPoolRunnable toRun) { this.toRun = toRun; // Do not re-init, the whole idea is to run init only once per // thread - the pool is supposed to run a single task, that is // initialized once. // noThData = true; shouldRun = true; this.notify(); } public void stop() { this.terminate(); } public void kill() { t.stop(); } public synchronized void terminate() { shouldTerminate = true; this.notify(); } } /** Interface to allow applications to be notified when * a threads are created and stopped. */ public static interface ThreadPoolListener { public void threadStart( ThreadPool tp, Thread t); public void threadEnd( ThreadPool tp, Thread t); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -