📄 queuedthreadpool.java
字号:
/* ------------------------------------------------------------ */ /** * Delegated to the named or anonymous Pool. */ public void setDaemon(boolean daemon) { _daemon=daemon; } /* ------------------------------------------------------------ */ /** * @param lowThreads low resource threads threshhold */ public void setLowThreads(int lowThreads) { _lowThreads = lowThreads; } /* ------------------------------------------------------------ */ /** Set the maximum thread idle time. * Threads that are idle for longer than this period may be * stopped. * Delegated to the named or anonymous Pool. * @see #getMaxIdleTimeMs * @param maxIdleTimeMs Max idle time in ms. */ public void setMaxIdleTimeMs(int maxIdleTimeMs) { _maxIdleTimeMs=maxIdleTimeMs; } /* ------------------------------------------------------------ */ /** Set the maximum number of threads. * Delegated to the named or anonymous Pool. * @see #getMaxThreads * @param maxThreads maximum number of threads. */ public void setMaxThreads(int maxThreads) { if (isStarted() && maxThreads<_minThreads) throw new IllegalArgumentException("!minThreads<maxThreads"); _maxThreads=maxThreads; } /* ------------------------------------------------------------ */ /** Set the minimum number of threads. * Delegated to the named or anonymous Pool. * @see #getMinThreads * @param minThreads minimum number of threads */ public void setMinThreads(int minThreads) { if (isStarted() && (minThreads<=0 || minThreads>_maxThreads)) throw new IllegalArgumentException("!0<=minThreads<maxThreads"); _minThreads=minThreads; synchronized (_threadsLock) { while (isStarted() && _threads.size()<_minThreads) { newThread(); } } } /* ------------------------------------------------------------ */ /** * @param name Name of the BoundedThreadPool to use when naming Threads. */ public void setName(String name) { _name= name; } /* ------------------------------------------------------------ */ /** Set the priority of the pool threads. * @param priority the new thread priority. */ public void setThreadsPriority(int priority) { _priority=priority; } /* ------------------------------------------------------------ */ /* Start the BoundedThreadPool. * Construct the minimum number of threads. */ protected void doStart() throws Exception { if (_maxThreads<_minThreads || _minThreads<=0) throw new IllegalArgumentException("!0<minThreads<maxThreads"); _threads=new HashSet(); _idle=new ArrayList(); _jobs=new Runnable[_maxThreads]; for (int i=0;i<_minThreads;i++) { newThread(); } } /* ------------------------------------------------------------ */ /** Stop the BoundedThreadPool. * New jobs are no longer accepted,idle threads are interrupted * and stopJob is called on active threads. * The method then waits * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to * stop, at which time killJob is called. */ protected void doStop() throws Exception { super.doStop(); long start=System.currentTimeMillis(); for (int i=0;i<100;i++) { synchronized (_threadsLock) { Iterator iter = _threads.iterator(); while (iter.hasNext()) ((Thread)iter.next()).interrupt(); } Thread.yield(); if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start))) break; try { Thread.sleep(i*100); } catch(InterruptedException e){} } // TODO perhaps force stops if (_threads.size()>0) Log.warn(_threads.size()+" threads could not be stopped"); synchronized (_joinLock) { _joinLock.notifyAll(); } } /* ------------------------------------------------------------ */ protected void newThread() { synchronized (_threadsLock) { if (_threads.size()<_maxThreads) { PoolThread thread =new PoolThread(); _threads.add(thread); thread.setName(thread.hashCode()+"@"+_name+"-"+_id++); thread.start(); } else if (!_warned) { _warned=true; Log.debug("Max threads for {}",this); } } } /* ------------------------------------------------------------ */ /** Stop a Job. * This method is called by the Pool if a job needs to be stopped. * The default implementation does nothing and should be extended by a * derived thread pool class if special action is required. * @param thread The thread allocated to the job, or null if no thread allocated. * @param job The job object passed to run. */ protected void stopJob(Thread thread, Object job) { thread.interrupt(); } /* ------------------------------------------------------------ */ /** Pool Thread class. * The PoolThread allows the threads job to be * retrieved and active status to be indicated. */ public class PoolThread extends Thread { Runnable _job=null; /* ------------------------------------------------------------ */ PoolThread() { setDaemon(_daemon); setPriority(_priority); } /* ------------------------------------------------------------ */ /** BoundedThreadPool run. * Loop getting jobs and handling them until idle or stopped. */ public void run() { boolean idle=false; Runnable job=null; try { while (isRunning()) { // Run any job that we have. if (job!=null) { final Runnable todo=job; job=null; idle=false; todo.run(); } synchronized(_lock) { // is there a queued job? if (_queued>0) { _queued--; job=_jobs[_nextJob++]; if (_nextJob==_jobs.length) _nextJob=0; continue; } // Should we shrink? final int threads=_threads.size(); if (threads>_minThreads && (threads>_maxThreads || _idle.size()>_spawnOrShrinkAt)) { long now = System.currentTimeMillis(); if ((now-_lastShrink)>getMaxIdleTimeMs()) { _lastShrink=now; _idle.remove(this); return; } } if (!idle) { // Add ourselves to the idle set. _idle.add(this); idle=true; } } // We are idle // wait for a dispatched job synchronized (this) { if (_job==null) this.wait(getMaxIdleTimeMs()); job=_job; _job=null; } } } catch (InterruptedException e) { Log.ignore(e); } finally { synchronized (_lock) { _idle.remove(this); } synchronized (_threadsLock) { _threads.remove(this); } synchronized (this) { job=_job; } // we died with a job! reschedule it if (job!=null) { QueuedThreadPool.this.dispatch(job); } } } /* ------------------------------------------------------------ */ void dispatch(Runnable job) { synchronized (this) { _job=job; this.notify(); } } } private class Lock{}}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -