📄 线程池实现.txt
字号:
// interrupt idle thread
thread.interrupt();
}
else
{
// request the job is killed;
if (thread instanceof PoolThread)
killJob(thread,((PoolThread)thread).getJob());
else
killJob(thread,null);
}
}
}
Thread.yield();
}
Thread.yield();
if (_threadSet!=null && !_threadSet.isEmpty())
{
_threadSet.clear();
_threadSet=null;
Code.warning("All threads could not be stopped or killed");
}
}
/* ------------------------------------------------------------ */
/** Ask a job to stop.
 * Called by stop() to encourage an active job to finish.
 * Implementations are under no obligation to interrupt active work.
 * The default implementation interrupts a PoolThread that reports
 * itself as inactive; for every other thread it only logs a
 * "Wait for" event and returns without interrupting or blocking.
 * @param thread The Thread running the job
 * @param job The job, or null if it cannot be determined
 */
protected void stopJob(Thread thread, Object job)
{
    // Only a PoolThread can report whether it is active.
    final boolean inactivePoolThread =
        thread instanceof PoolThread && !((PoolThread)thread).isActive();

    if (inactivePoolThread)
    {
        Log.event("Interrupt inactive "+thread);
        thread.interrupt();
    }
    else
    {
        // Active PoolThread or a foreign Thread: leave it alone.
        Log.event("Wait for "+thread);
    }
}
/* ------------------------------------------------------------ */
/** Forcefully discard a job.
 * Called by stop() for a job that has not stopped.
 * Implementations should make all reasonable attempts to interrupt
 * the job and free any resources it holds.
 * The default implementation logs the event and interrupts the
 * thread, whatever its state; the job argument is not used.
 * @param thread The Thread running the job
 * @param job The job, or null if it cannot be determined
 */
protected void killJob(Thread thread,Object job)
{
    final String event="Interrupt "+thread;
    Log.event(event);
    thread.interrupt();
}
/* ------------------------------------------------------------ */
/* Start a new Thread.
 * Creates a JobRunner, wraps it in a Thread built reflectively via
 * _constructThread (presumably a Constructor taking a single Runnable
 * - confirm against its initialisation), names the thread
 * "<pool name>-<id>", registers it in _threadSet and starts it.
 * If the thread cannot be started it is removed from _threadSet again,
 * so that join() never waits on a thread that will never run.
 */
private synchronized void newThread()
    throws InvocationTargetException,IllegalAccessException,InstantiationException
{
    Runnable runner = new JobRunner();
    Object[] args = {runner};
    Thread thread=
        (Thread)_constructThread.newInstance(args);
    thread.setName(_name+"-"+(_threadId++));

    // Register before starting so the thread is already in the set
    // when its run loop looks at _threadSet.
    _threadSet.add(thread);
    boolean started=false;
    try
    {
        thread.start();
        started=true;
    }
    finally
    {
        // start() can fail (e.g. no native thread available). A thread
        // that never ran never reaches the clean-up in JobRunner.run(),
        // so deregister it here.
        if (!started)
            _threadSet.remove(thread);
    }
}
/* ------------------------------------------------------------ */
/** Join the ThreadPool.
 * Wait for all threads to complete. Repeatedly picks one thread from
 * the pool's thread set and joins it, until the set is empty or has
 * been discarded (set to null) by stop().
 * @exception java.lang.InterruptedException if the calling thread is
 * interrupted while waiting for a pool thread.
 */
final public void join()
    throws java.lang.InterruptedException
{
    while (true)
    {
        Thread thread;
        synchronized(this)
        {
            // Check and dereference under the same lock: stop() may set
            // _threadSet to null, so a check made outside the lock can
            // be stale by the time the iterator is requested.
            if (_threadSet==null || _threadSet.isEmpty())
                return;
            thread=(Thread)_threadSet.iterator().next();
        }
        // Join outside the lock: an exiting pool thread synchronizes on
        // the pool to remove itself from _threadSet.
        thread.join();
    }
}
/* ------------------------------------------------------------ */
/** Get a job.
 * Called by the pool's threads to obtain their next job; the call
 * blocks until a job is available or the timeout passes.
 * The default implementation takes jobs from the BlockingQueue that
 * run(Object) fills, creating that queue on demand with a capacity of
 * _maxThreads. Derived implementations of ThreadPool may specialize
 * this method to obtain jobs from other sources.
 * @param idleTimeoutMs The timeout to wait for a job.
 * @return Job or null if no job available after timeout.
 * @exception InterruptedException
 * @exception InterruptedIOException
 */
protected Object getJob(int idleTimeoutMs)
    throws InterruptedException, InterruptedIOException
{
    // Take the lock while the queue is missing, and also for the first
    // __nullLockChecks calls that pass through here.
    final boolean lockedCheck = _queue==null || _queueChecks<__nullLockChecks;
    if (lockedCheck)
    {
        synchronized(this)
        {
            // Re-test under the lock so only one queue is ever created.
            if (_queue==null)
            {
                _queue=new BlockingQueue(_maxThreads);
            }
            _queueChecks++;
        }
    }

    return _queue.get(idleTimeoutMs);
}
/* ------------------------------------------------------------ */
/** Run job.
 * Give a job to the pool. The job is passed via a BlockingQueue
 * created with a capacity of _maxThreads. A null job is logged as a
 * warning and ignored.
 * @param job If the job is derived from Runnable, the run method
 * is called, otherwise it is passed as the argument to the handle
 * method.
 * @exception InterruptedException declared for the queue put
 * (presumably thrown when interrupted while waiting for space -
 * confirm against BlockingQueue).
 * @exception IllegalStateException if the pool is not running.
 */
public void run(Object job)
    throws InterruptedException
{
    if (!_running)
        throw new IllegalStateException("Not started");
    if (job==null)
    {
        Code.warning("Null Job");
        return;
    }
    // Use the same threshold as getJob(int) for the locked check
    // rather than a separate hard-coded count, so both entry points
    // to the lazily created queue follow one rule.
    if (_queue==null || _queueChecks<__nullLockChecks)
    {
        synchronized(this)
        {
            // Re-test under the lock so only one queue is ever created.
            if (_queue==null)
                _queue=new BlockingQueue(_maxThreads);
            _queueChecks++;
        }
    }
    _queue.put(job);
}
/* ------------------------------------------------------------ */
/** Pool Thread class.
 * This class or derivations of it are recommended for use with
 * the ThreadPool. The PoolThread allows the thread's job to be
 * retrieved and its active status to be indicated.
 */
public static class PoolThread extends Thread
{
    /** The runner executed by this thread; the source of getJob() and toString(). */
    JobRunner _jobRunner;
    /** Active flag. Volatile because it is set through setActive() and
     * read through isActive() by ThreadPool.stopJob, which is called by
     * stop() on the threads it is stopping.
     */
    volatile boolean _active=true;

    /* ------------------------------------------------------------ */
    /** Constructor.
     * @param r The runnable for this thread; it must be the pool's
     * JobRunner, otherwise the cast throws ClassCastException.
     */
    public PoolThread(Runnable r)
    {
        super(r);
        _jobRunner=(JobRunner)r;
    }

    /* ------------------------------------------------------------ */
    /** @return The string form of the underlying JobRunner.
     */
    public String toString()
    {
        return _jobRunner.toString();
    }

    /* ------------------------------------------------------------ */
    /** Get the job held by this thread's runner.
     * @return The job, or null if the runner holds none.
     */
    public Object getJob()
    {
        return _jobRunner.getJob();
    }

    /* ------------------------------------------------------------ */
    /** Set active state.
     * @param active True if the thread should be treated as active.
     */
    public void setActive(boolean active)
    {
        _active=active;
    }

    /* ------------------------------------------------------------ */
    /** Is the PoolThread active.
     * A PoolThread handling a job may set its own active state.
     * Implementations of the ThreadPool.stopJob method should
     * attempt to wait for active threads to complete.
     * @return True if thread is active.
     */
    public boolean isActive()
    {
        return _active;
    }
}
/* ------------------------------------------------------------ */
/** Job loop executed by a thread of the pool.
* Each iteration registers the thread as idle, obtains a job through
* ThreadPool.getJob, decides under the pool lock whether another thread
* should be started or this one should die, and then passes the job to
* handle().
*/
private class JobRunner
implements Runnable
{
// Job most recently obtained from getJob; reset to null after handling.
Object _job;
// Counter appended to the thread name; only advanced when Code.debug() is on.
int _runs;
// Thread executing run(), captured when run() starts.
Thread _thread;
// Name of that thread as it was when run() started.
String _threadName;
/* ------------------------------------------------------------ */
/** Get the job currently held by this runner.
* @return the job, or null if none is held.
*/
Object getJob()
{
return _job;
}
/* -------------------------------------------------------- */
/** ThreadPool run.
* Loop getting jobs and handling them until idle or stopped.
* On exit, for whatever reason, the thread is removed from _threadSet.
*/
public void run()
{
_thread=Thread.currentThread();
_threadName=_thread.getName();
_runs=0;
if (Code.verbose(9))
Code.debug( "Start thread in ", _name );
try{
jobloop:
while(_running)
{
// clear any pending interrupt status before waiting for the next job
Thread.interrupted();
_job=null;
try
{
// register this thread as idle, i.e. waiting to accept a job
synchronized(ThreadPool.this){_idleSet.add(_thread);}
// wait for a job (may return without one; see the null test below)
_job=ThreadPool.this.getJob(_maxIdleTimeMs);
}
catch(InterruptedException e)
{
// interruption while waiting is not an error; _job stays null
Code.ignore(e);
}
catch(InterruptedIOException e)
{
Code.ignore(e);
}
catch(Exception e)
{
Code.warning(e);
}
finally
{
// Bookkeeping is done under the pool lock so that the idle and
// thread set sizes are read consistently.
synchronized(ThreadPool.this)
{
_idleSet.remove(_thread);
// If we are still running
if (_running)
{
// If we have a job
if (_job!=null)
{
// If no more threads are idle and the pool is below
// _maxThreads, start another thread
if (_idleSet.size()==0 &&
_threadSet.size()<_maxThreads)
{
try{newThread();}
catch(Exception e){Code.warning(e);}
}
}
else
{
// No Job, are we still needed? Die only if the pool is above
// _minThreads and at least one other thread is still idle.
if (_threadSet.size()>_minThreads &&
_idleSet.size()>0)
{
if (Code.verbose(99))
Code.debug("Idle death: "+_thread);
// NOTE: a break out of a finally block discards any exception
// that was propagating from the try block above.
break jobloop; // Break from the running loop
}
}
}
}
}
// handle the job
if (_running && _job!=null)
{
try
{
// Tag thread if debugging; the suffix is appended to the name
// captured at start, so it does not accumulate across jobs
if (Code.debug())
{
_thread.setName(_threadName+"/"+_runs++);
if (Code.verbose(99))
Code.debug("Handling ",_job);
}
// handle the job
handle(_job);
}
catch (Exception e)
{
// an Exception from a job is logged and the loop continues
Code.warning(e);
}
finally
{
_job=null;
}
}
}
}
finally
{
// Deregister this thread; _threadSet may already be null because
// of the null test below.
synchronized(ThreadPool.this)
{
if (_threadSet!=null)
_threadSet.remove(_thread);
}
if (Code.verbose(9))
Code.debug("Stopped thread in ", _name);
}
}
/** Describe this runner as "threadName|runs|job", with "NoJob" when
* no job is held.
*/
public String toString()
{
// copy the field once so the null test and toString() see the same value
Object j=_job;
return
_threadName+"|"+_runs+"|"+((j==null)?"NoJob":j.toString());
}
}
/* ------------------------------------------------------------ */
/** Get the number of threads in the pool.
 * @return Number of threads, or 0 if the pool has no thread set.
 * @deprecated use getThreads
 */
public int getSize()
{
    // Read the field once: stop() sets _threadSet to null, so testing
    // the field and then dereferencing it again could fail in between.
    final java.util.Collection threads=_threadSet;
    return (threads==null) ? 0 : threads.size();
}
/* ------------------------------------------------------------ */
/** Get the minimum number of threads.
 * @return the configured minimum number of threads.
 * @deprecated use getMinThreads
 */
public int getMinSize()
{
    final int minimum=this._minThreads;
    return minimum;
}
/* ------------------------------------------------------------ */
/** Set the minimum number of threads.
 * The value is stored as given; no validation is performed here.
 * @param minThreads minimum number of threads
 * @deprecated use setMinThreads
 */
public void setMinSize(int minThreads)
{
    this._minThreads=minThreads;
}
/* ------------------------------------------------------------ */
/** Get the maximum number of threads.
 * @return the configured maximum number of threads.
 * @deprecated use getMaxThreads
 */
public int getMaxSize()
{
    final int maximum=this._maxThreads;
    return maximum;
}
/* ------------------------------------------------------------ */
/** Set the maximum number of threads.
* @param maxThreads maximum number of threads.
* @deprecated use setMaxThreads
*/
public void setMaxSize(int maxThreads)
{
_maxThreads=maxThreads;
s}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -