📄 线程池实现.txt
字号:
// ========================================================================
// Copyright (c) 1999 Mort Bay Consulting (Australia) Pty. Ltd.
// $Id: ThreadPool.java,v 1.2 2004/02/17 22:27:29 chochos Exp $
// ========================================================================
package org.mortbay.util;
import java.io.InterruptedIOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Iterator;
/* ------------------------------------------------------------ */
/** A pool of threads.
* <p>
* Avoids the expense of thread creation by pooling threads after
* their run methods exit for reuse.
* <p>
* If the maximum pool size is reached, jobs wait for a free thread.
* The default maximum pool size is 256, overridable with the
* THREADPOOL_MAX_THREADS system property. Idle threads time out
* and terminate until only the minimum number of threads is running.
* <p>
* This implementation uses the run(Object) method to place a
* job on a queue, which is read by the getJob(timeout) method.
* Derived implementations may specialize getJob(timeout) to
* obtain jobs from other sources without queuing overheads.
*
* @version $Id: ThreadPool.java,v 1.2 2004/02/17 22:27:29 chochos Exp $
* @author Juancarlo Añez <juancarlo@modelistica.com>
* @author Greg Wilkins <gregw@mortbay.com>
*/
public class ThreadPool
implements LifeCycle, Serializable
{
/* ------------------------------------------------------------ */
/** The number of times a null lock check should synchronize.
* Read once, at class initialization, from the integer system property
* THREADPOOL_NULL_LOCK_CHECKS; 2 is used when the property is not set.
*/
public static int __nullLockChecks =
Integer.getInteger("THREADPOOL_NULL_LOCK_CHECKS",2).intValue();
/* ------------------------------------------------------------ */
// Initial maximum pool size for new pools: system property
// THREADPOOL_MAX_THREADS, or 256 when the property is not set.
static int __maxThreads =
Integer.getInteger("THREADPOOL_MAX_THREADS",256).intValue();
// Initial minimum pool size for new pools: system property
// THREADPOOL_MIN_THREADS, or 2 when the property is not set.
static int __minThreads =
Integer.getInteger("THREADPOOL_MIN_THREADS",2).intValue();
// Name of the thread class the no-arg constructor tries to load: system
// property THREADPOOL_THREAD_CLASS. Null when the property is not set, in
// which case the constructor uses PoolThread.
static String __threadClass =
System.getProperty("THREADPOOL_THREAD_CLASS");
/* ------------------------------------------------------------------- */
// Maximum pool size; starts as the class-wide default and is also used by
// start() to size the thread set.
private int _maxThreads = __maxThreads;
// Minimum pool size; start() calls newThread() this many times.
private int _minThreads = __minThreads;
// Maximum thread idle time in milliseconds (10 seconds by default).
private int _maxIdleTimeMs=10000;
// Maximum stop time in milliseconds. While negative (the default), stop()
// uses the maximum idle time instead.
private int _maxStopTimeMs=-1;
// Pool name. When still null, setThreadClass() sets it to the unqualified
// class name of this pool instance.
private String _name;
// Name of the thread class. Not transient, so it is serialized and used by
// readObject() to reload the class.
private String _threadClassName;
// Class used for pool threads; transient, re-established by readObject().
private transient Class _threadClass;
// Constructor of the thread class taking a single Runnable, looked up by
// setThreadClass().
private transient Constructor _constructThread;
// Set of the pool's threads; created by start() and iterated as Thread
// instances by stop(). Null until start() has been called.
private transient HashSet _threadSet;
// NOTE(review): not referenced in the part of the file visible here;
// presumably the job queue described in the class comment - confirm.
private transient BlockingQueue _queue;
// NOTE(review): not referenced in the part of the file visible here - confirm usage.
private transient int _queueChecks;
// NOTE(review): presumably a counter used when naming new threads - confirm.
private transient int _threadId=0;
// Threads considered idle; stop() interrupts threads found in this set and
// requests a job stop for the others. Recreated by readObject().
private transient HashSet _idleSet=new HashSet();
// Set to true by start() and to false by stop().
private transient boolean _running=false;
/* ------------------------------------------------------------------- */
/**
 * Construct a pool using the configured thread class.
 * If the THREADPOOL_THREAD_CLASS system property named a class, that class
 * is loaded through Loader; otherwise PoolThread is used. Any exception
 * while selecting the class is logged as a warning and PoolThread is used
 * instead. The chosen class is then installed with setThreadClass(Class).
 */
public ThreadPool()
{
    try
    {
        _threadClass = (__threadClass==null)
            ? PoolThread.class
            : Loader.loadClass(getClass(), __threadClass);
        Code.debug("Using thread class '", _threadClass.getName(),"'");
    }
    catch (Exception failure)
    {
        // Fall back to the default thread class rather than failing construction.
        Code.warning( "Invalid thread class (ignored) ",failure );
        _threadClass = PoolThread.class;
    }
    setThreadClass(_threadClass);
}
/* ------------------------------------------------------------------- */
/**
 * Construct a named pool.
 * Performs the same initialization as the no-argument constructor and
 * then records the given name.
 * @param name Pool name
 */
public ThreadPool(String name)
{
    this();
    this._name = name;
}
/* ------------------------------------------------------------ */
/**
 * Restore a pool from its serialized form.
 * After the default field restoration, the idle set is recreated and, if
 * the thread class is missing or does not match the serialized class name,
 * the class is reloaded by name and installed with setThreadClass(Class).
 * @param in Stream to read the pool from.
 * @exception java.io.IOException If the stream cannot be read. A failure
 * to reload or install the thread class is reported as an
 * InvalidObjectException whose cause is the original exception.
 * @exception ClassNotFoundException If default deserialization cannot
 * find a required class.
 */
private void readObject(java.io.ObjectInputStream in)
throws java.io.IOException, ClassNotFoundException
{
    in.defaultReadObject();
    // Transient fields are not restored by defaultReadObject().
    _idleSet=new HashSet();
    if (_threadClass==null || !_threadClass.getName().equals(_threadClassName))
    {
        try
        {
            setThreadClass(Loader.loadClass(ThreadPool.class,_threadClassName));
        }
        catch (Exception e)
        {
            Code.warning(e);
            // Attach the original failure as the cause so its stack trace
            // is not reduced to the toString() text in the message.
            java.io.InvalidObjectException invalid =
                new java.io.InvalidObjectException(e.toString());
            invalid.initCause(e);
            throw invalid;
        }
    }
}
/* ------------------------------------------------------------ */
/**
 * Get the pool name.
 * @return The name of the ThreadPool.
 */
public String getName()
{
    return this._name;
}
/* ------------------------------------------------------------ */
/**
 * Set the pool name.
 * @param name Name of the ThreadPool to use when naming Threads.
 */
public void setName(String name)
{
    this._name = name;
}
/* ------------------------------------------------------------ */
/** Set the Thread class.
 * Sets the class used for threads in the thread pool. The class
 * must be a Thread and have a public constructor taking a Runnable.
 * A null class, a class that is not a Thread, or a class without such a
 * constructor is reported with a warning and PoolThread is used instead.
 * If the pool has no name yet, it is named after the unqualified class
 * name of this pool instance.
 * @param threadClass The class
 * @exception IllegalStateException If the pool has already
 * been started.
 */
public synchronized void setThreadClass(Class threadClass)
throws IllegalStateException
{
    Code.debug("setThreadClass("+threadClass+")");
    if (_running)
        throw new IllegalStateException("Thread Pool Running");

    // Validate before dereferencing, so that a null class takes the
    // fallback path instead of failing with a NullPointerException.
    Class effective=threadClass;
    if (effective==null || !Thread.class.isAssignableFrom(effective))
    {
        Code.warning( "Invalid thread class (ignored) "+
                      (effective==null ? "null" : effective.getName()) );
        effective = PoolThread.class;
    }

    // Record the class actually in use, keeping the stored (serialized)
    // class name consistent with the thread class.
    _threadClass=effective;
    _threadClassName=effective.getName();

    try
    {
        Class[] args ={java.lang.Runnable.class};
        _constructThread = _threadClass.getConstructor(args);
    }
    catch(Exception e)
    {
        Code.warning("Invalid thread class (ignored)",e);
        // NOTE(review): assumes PoolThread itself has a public
        // constructor taking a Runnable; otherwise this call would
        // recurse without terminating - confirm against PoolThread.
        setThreadClass(PoolThread.class);
    }

    if (_name==null)
    {
        // Default the pool name to the unqualified class name.
        _name=getClass().getName();
        _name=_name.substring(_name.lastIndexOf('.')+1);
    }
}
/* ------------------------------------------------------------ */
/**
 * Get the Thread class.
 * @return The class currently set for threads in the thread pool.
 */
public Class getThreadClass()
{
    return this._threadClass;
}
/* ------------------------------------------------------------ */
/** Handle a job.
 * This implementation only knows how to handle Runnable jobs, whose
 * run() method it calls. Any other job, including null, is reported
 * with a warning, so other job types need a specialized handle method
 * in a derived class.
 * @param job The Job to handle.
 * @exception InterruptedException Declared in the signature; nothing in
 * this implementation throws it explicitly.
 */
protected void handle(Object job)
throws InterruptedException
{
    // instanceof is false for null, so null is rejected here as well.
    if (!(job instanceof Runnable))
    {
        Code.warning("Invalid job: "+job);
        return;
    }
    Runnable task=(Runnable)job;
    task.run();
}
/* ------------------------------------------------------------ */
/** Is the pool running jobs.
 * @return True if the running flag is set and the thread set has been
 * created, which is the state start() establishes.
 */
public boolean isStarted()
{
    if (!_running)
        return false;
    return _threadSet!=null;
}
/* ------------------------------------------------------------ */
/** Get the number of threads in the pool.
 * @return Number of threads, or 0 if the thread set has not been created.
 */
public int getThreads()
{
    return (_threadSet==null) ? 0 : _threadSet.size();
}
/* ------------------------------------------------------------ */
/** Get the number of idle threads in the pool.
 * @return Size of the idle set, or 0 if there is no idle set.
 */
public int getIdleThreads()
{
    return (_idleSet==null) ? 0 : _idleSet.size();
}
/* ------------------------------------------------------------ */
/** Get the minimum number of threads.
 * @return Minimum number of threads.
 */
public int getMinThreads()
{
    return this._minThreads;
}
/* ------------------------------------------------------------ */
/** Set the minimum number of threads.
 * The value is stored as given; no range check is applied.
 * @param minThreads Minimum number of threads.
 */
public void setMinThreads(int minThreads)
{
    this._minThreads = minThreads;
}
/* ------------------------------------------------------------ */
/** Get the maximum number of threads.
 * @return Maximum number of threads.
 */
public int getMaxThreads()
{
    return this._maxThreads;
}
/* ------------------------------------------------------------ */
/** Set the maximum number of threads.
 * The value is stored as given; no range check is applied.
 * @param maxThreads Maximum number of threads.
 */
public void setMaxThreads(int maxThreads)
{
    this._maxThreads = maxThreads;
}
/* ------------------------------------------------------------ */
/** Get the maximum thread idle time.
 * @return Max idle time in ms.
 */
public int getMaxIdleTimeMs()
{
    return this._maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/** Set the maximum thread idle time.
 * Threads that are idle for longer than this period may be
 * stopped. This value is also the stop() wait time while the
 * max stop time is negative.
 * @param maxIdleTimeMs Max idle time in ms.
 */
public void setMaxIdleTimeMs(int maxIdleTimeMs)
{
    this._maxIdleTimeMs = maxIdleTimeMs;
}
/* ------------------------------------------------------------ */
/** Get the maximum thread stop time.
 * This is the time stop() waits for threads to finish. If it is
 * negative, stop() uses the max idle time instead.
 * @return Max stop time in ms.
 */
public int getMaxStopTimeMs()
{
    return this._maxStopTimeMs;
}
/* ------------------------------------------------------------ */
/** Set the maximum thread stop time.
 * This is the time stop() waits for threads to finish. If it is
 * negative, stop() uses the max idle time instead.
 * @param maxStopTimeMs Max stop time in ms.
 */
public void setMaxStopTimeMs(int maxStopTimeMs)
{
    this._maxStopTimeMs = maxStopTimeMs;
}
/* ------------------------------------------------------------ */
/**
 * Start the ThreadPool.
 * Does nothing if the pool is already running. Otherwise the pool is
 * marked as running, a fresh thread set is allocated and newThread() is
 * called once for each of the minimum number of threads.
 * @exception Exception Declared in the signature; presumably raised by
 * newThread(), which is not visible here - confirm.
 */
public synchronized void start()
throws Exception
{
    if (!_running)
    {
        Code.debug("Start Pool ",_name);
        // Mark the pool as running before any thread is created.
        _running=true;
        int capacity=_maxThreads+_maxThreads/2+13;
        _threadSet=new HashSet(capacity);
        for (int created=0;created<_minThreads;created++)
            newThread();
    }
}
/* ------------------------------------------------------------ */
/** Stop the ThreadPool.
* New jobs are no longer accepted,idle threads are interrupted
* and stopJob is called on active threads.
* The method then waits
* min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
* stop, at which time killJob is called.
*/
public void stop()
throws InterruptedException
{
Code.debug("Stop ThreadPool ",_name);
_running=false;
// setup timing for stop
long now = System.currentTimeMillis();
long stopped_at = now;
int wait_time = getMaxStopTimeMs();
if (wait_time<0)
wait_time = getMaxIdleTimeMs();
int sleep_time=wait_time/16;
if (sleep_time<=0)
sleep_time=100;
// If we have threads, interrupt or stop them.
if (_threadSet!=null && !_threadSet.isEmpty())
{
synchronized(this)
{
// for all threads
Iterator iter = _threadSet.iterator();
while(iter.hasNext())
{
Thread thread=(Thread)iter.next();
if (_idleSet.contains(thread))
{
// interrupt idle thread
thread.interrupt();
}
else
{
// request the job is stopped;
if (thread instanceof PoolThread)
stopJob(thread,((PoolThread)thread).getJob());
else
stopJob(thread,null);
}
Thread.yield();
}
}
}
// While we still have some threads and have not exceeded our
// wait time.
while (_threadSet!=null && !_threadSet.isEmpty() && now-stopped_at<=wait_time)
{
// wait for jobs to end, with backing off timer
if (sleep_time>2000)
Log.event("Stop waiting "+(sleep_time+999)/1000+"s ");
Thread.sleep(sleep_time);
now=System.currentTimeMillis();
sleep_time*=2;
if (now-stopped_at<sleep_time)
sleep_time=(int)(now-stopped_at);
}
// If we STILL have threads, interrupt or kill them.
if (_threadSet!=null && !_threadSet.isEmpty())
{
synchronized(this)
{
// for all threads
Iterator iter = _threadSet.iterator();
while(iter.hasNext())
{
Thread thread=(Thread)iter.next();
if (_idleSet.contains(thread))
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -