📄 threadpool.java
字号:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.threads;
import java.util.*;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
/**
* A thread pool that is trying to copy the apache process management.
*
* Should we remove this in favor of Doug Lea's thread package?
*
* @author Gal Shachor
* @author Yoav Shapira <yoavs@apache.org>
*/
public class ThreadPool {
private static Log log = LogFactory.getLog(ThreadPool.class);
private static StringManager sm =
StringManager.getManager("org.apache.tomcat.util.threads.res");
private static boolean logfull=true;
/*
* Default values ...
*/
public static final int MAX_THREADS = 200;
public static final int MAX_THREADS_MIN = 10;
public static final int MAX_SPARE_THREADS = 50;
public static final int MIN_SPARE_THREADS = 4;
public static final int WORK_WAIT_TIMEOUT = 60*1000;
/*
* Where the threads are held.
*/
protected ControlRunnable[] pool = null;
/*
* A monitor thread that monitors the pool for idel threads.
*/
protected MonitorRunnable monitor;
/*
* Max number of threads that you can open in the pool.
*/
protected int maxThreads;
/*
* Min number of idel threads that you can leave in the pool.
*/
protected int minSpareThreads;
/*
* Max number of idel threads that you can leave in the pool.
*/
protected int maxSpareThreads;
/*
* Number of threads in the pool.
*/
protected int currentThreadCount;
/*
* Number of busy threads in the pool.
*/
protected int currentThreadsBusy;
/*
* Flag that the pool should terminate all the threads and stop.
*/
protected boolean stopThePool;
/* Flag to control if the main thread is 'daemon' */
protected boolean isDaemon=true;
/** The threads that are part of the pool.
* Key is Thread, value is the ControlRunnable
*/
protected Hashtable threads=new Hashtable();
protected Vector listeners=new Vector();
/** Name of the threadpool
*/
protected String name = "TP";
/**
* Sequence.
*/
protected int sequence = 1;
/**
* Thread priority.
*/
protected int threadPriority = Thread.NORM_PRIORITY;
/**
* Constructor.
*/
public ThreadPool() {
maxThreads = MAX_THREADS;
maxSpareThreads = MAX_SPARE_THREADS;
minSpareThreads = MIN_SPARE_THREADS;
currentThreadCount = 0;
currentThreadsBusy = 0;
stopThePool = false;
}
/** Create a ThreadPool instance.
*
* @param jmx UNUSED
* @return ThreadPool instance. If JMX support is requested, you need to
* call register() in order to set a name.
*/
public static ThreadPool createThreadPool(boolean jmx) {
return new ThreadPool();
}
public synchronized void start() {
stopThePool=false;
currentThreadCount = 0;
currentThreadsBusy = 0;
adjustLimits();
pool = new ControlRunnable[maxThreads];
openThreads(minSpareThreads);
if (maxSpareThreads < maxThreads) {
monitor = new MonitorRunnable(this);
}
}
public MonitorRunnable getMonitor() {
return monitor;
}
/**
* Sets the thread priority for current
* and future threads in this pool.
*
* @param threadPriority The new priority
* @throws IllegalArgumentException If the specified
* priority is less than Thread.MIN_PRIORITY or
* more than Thread.MAX_PRIORITY
*/
public synchronized void setThreadPriority(int threadPriority) {
if(log.isDebugEnabled())
log.debug(getClass().getName() +
": setPriority(" + threadPriority + "): here.");
if (threadPriority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException("new priority < MIN_PRIORITY");
} else if (threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException("new priority > MAX_PRIORITY");
}
// Set for future threads
this.threadPriority = threadPriority;
Enumeration currentThreads = getThreads();
Thread t = null;
while(currentThreads.hasMoreElements()) {
t = (Thread) currentThreads.nextElement();
t.setPriority(threadPriority);
}
}
/**
* Returns the priority level of current and
* future threads in this pool.
*
* @return The priority
*/
public int getThreadPriority() {
return threadPriority;
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
public int getMaxThreads() {
return maxThreads;
}
public void setMinSpareThreads(int minSpareThreads) {
this.minSpareThreads = minSpareThreads;
}
public int getMinSpareThreads() {
return minSpareThreads;
}
public void setMaxSpareThreads(int maxSpareThreads) {
this.maxSpareThreads = maxSpareThreads;
}
public int getMaxSpareThreads() {
return maxSpareThreads;
}
public int getCurrentThreadCount() {
return currentThreadCount;
}
public int getCurrentThreadsBusy() {
return currentThreadsBusy;
}
public boolean isDaemon() {
return isDaemon;
}
public static int getDebug() {
return 0;
}
/** The default is true - the created threads will be
* in daemon mode. If set to false, the control thread
* will not be daemon - and will keep the process alive.
*/
public void setDaemon( boolean b ) {
isDaemon=b;
}
public boolean getDaemon() {
return isDaemon;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public int getSequence() {
return sequence++;
}
public void addThread( Thread t, ControlRunnable cr ) {
threads.put( t, cr );
for( int i=0; i<listeners.size(); i++ ) {
ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
tpl.threadStart(this, t);
}
}
public void removeThread( Thread t ) {
threads.remove(t);
for( int i=0; i<listeners.size(); i++ ) {
ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
tpl.threadEnd(this, t);
}
}
public void addThreadPoolListener( ThreadPoolListener tpl ) {
listeners.addElement( tpl );
}
public Enumeration getThreads(){
return threads.keys();
}
public void run(Runnable r) {
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
//
// You may wonder what you see here ... basically I am trying
// to maintain a stack of threads. This way locality in time
// is kept and there is a better chance to find residues of the
// thread in memory next time it runs.
//
/**
* Executes a given Runnable on a thread in the pool, block if needed.
*/
public void runIt(ThreadPoolRunnable r) {
if(null == r) {
throw new NullPointerException();
}
ControlRunnable c = findControlRunnable();
c.runIt(r);
}
private ControlRunnable findControlRunnable() {
ControlRunnable c=null;
if ( stopThePool ) {
throw new IllegalStateException();
}
// Obtain a free thread from the pool.
synchronized(this) {
while (currentThreadsBusy == currentThreadCount) {
// All threads are busy
if (currentThreadCount < maxThreads) {
// Not all threads were open,
// Open new threads up to the max number of idel threads
int toOpen = currentThreadCount + minSpareThreads;
openThreads(toOpen);
} else {
logFull(log, currentThreadCount, maxThreads);
// Wait for a thread to become idel.
try {
this.wait();
}
// was just catch Throwable -- but no other
// exceptions can be thrown by wait, right?
// So we catch and ignore this one, since
// it'll never actually happen, since nowhere
// do we say pool.interrupt().
catch(InterruptedException e) {
log.error("Unexpected exception", e);
}
if( log.isDebugEnabled() ) {
log.debug("Finished waiting: CTC="+currentThreadCount +
", CTB=" + currentThreadsBusy);
}
// Pool was stopped. Get away of the pool.
if( stopThePool) {
break;
}
}
}
// Pool was stopped. Get away of the pool.
if(0 == currentThreadCount || stopThePool) {
throw new IllegalStateException();
}
// If we are here it means that there is a free thread. Take it.
int pos = currentThreadCount - currentThreadsBusy - 1;
c = pool[pos];
pool[pos] = null;
currentThreadsBusy++;
}
return c;
}
private static void logFull(Log loghelper, int currentThreadCount,
int maxThreads) {
if( logfull ) {
log.error(sm.getString("threadpool.busy",
new Integer(currentThreadCount),
new Integer(maxThreads)));
logfull=false;
} else if( log.isDebugEnabled() ) {
log.debug("All threads are busy " + currentThreadCount + " " +
maxThreads );
}
}
/**
* Stop the thread pool
*/
public synchronized void shutdown() {
if(!stopThePool) {
stopThePool = true;
if (monitor != null) {
monitor.terminate();
monitor = null;
}
for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
try {
pool[i].terminate();
} catch(Throwable t) {
/*
* Do nothing... The show must go on, we are shutting
* down the pool and nothing should stop that.
*/
log.error("Ignored exception while shutting down thread pool", t);
}
}
currentThreadsBusy = currentThreadCount = 0;
pool = null;
notifyAll();
}
}
/**
* Called by the monitor thread to harvest idle threads.
*/
protected synchronized void checkSpareControllers() {
if(stopThePool) {
return;
}
if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
int toFree = currentThreadCount -
currentThreadsBusy -
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -