// File: threadpool.java
// (code-viewer page residue on the original second line: "Font size:")
package com.usstinfo.skyeyecore.threadpool;
import java.util.Collection;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
/**
 * A small self-sizing thread pool that also acts as its own housekeeping {@link Timer}.
 *
 * <p>Tasks are submitted with {@link #processTask(ThreadTask)} or
 * {@link #processTasksInSingleThread(ThreadTask[])} and are stored in a shared queue that is
 * drained through {@link #getTaskToRun()}. Once per second a timer task compares the number of
 * outstanding tasks with the number of registered threads and kills surplus idle threads when
 * the pool is lightly loaded.
 *
 * <p>NOTE(review): {@code PooledThread} and {@code ThreadTask} are declared elsewhere. This
 * class assumes that workers report themselves through {@link #threadCreated},
 * {@link #threadDead} and {@link #threadIdleStateChange}, and that they call
 * {@link #taskCompleted()} when a unit of work finishes -- confirm against {@code PooledThread}.
 *
 * <p>Created 2008-8-8.
 *
 * @author Leo Chang QQ:278475286
 */
public class ThreadPool extends Timer {

    /** Tasks-per-thread ratio below which the pool is classified as idle. */
    public static final double LIMIT_IDLE = 0.75;
    /** Tasks-per-thread ratio above which the pool is classified as busy. */
    public static final double LIMIT_BUSY = 1.25;

    public static final int POOL_STATE_HEALTHY = 0;
    public static final int POOL_STATE_IDLE = 1;
    public static final int POOL_STATE_BUSY = 2;

    /** Delay before the first health check and interval between checks, in milliseconds. */
    private static final long HEALTH_CHECK_PERIOD_MS = 1000;

    private static ThreadPool _instance = null;

    /** Guards every read-modify-write of {@link #tatolTaskNum} performed by this class. */
    private final Object taskCountLock = new Object();

    protected int maxPoolSize = 10;
    protected int initPoolSize = 2;
    /** Number of outstanding task units (a batch handed to one thread counts as one). */
    protected volatile int tatolTaskNum = 0;
    protected Vector<ThreadTask> tasksQueue = new Vector<ThreadTask>();
    protected Vector<PooledThread> allThreads = new Vector<PooledThread>();
    protected Vector<PooledThread> idleThreads = new Vector<PooledThread>();
    protected boolean initialized = false;
    protected boolean isRunnable = true;
    protected int poolState = POOL_STATE_HEALTHY;

    /**
     * Returns the shared pool, creating it with the given sizes on first use.
     * The arguments are ignored when the instance already exists.
     *
     * @param maxPoolSize  upper bound on the number of threads; non-positive keeps the default
     * @param initPoolSize number of threads started immediately; non-positive keeps the default
     * @return the singleton pool
     */
    public static synchronized ThreadPool GetIntance(int maxPoolSize, int initPoolSize) {
        // synchronized: two concurrent first calls must not create two pools.
        if (_instance == null) {
            _instance = new ThreadPool(maxPoolSize, initPoolSize);
        }
        return _instance;
    }

    /**
     * Returns the shared pool, creating it with the default sizes on first use.
     *
     * @return the singleton pool
     */
    public static synchronized ThreadPool GetIntance() {
        if (_instance == null) {
            _instance = new ThreadPool();
        }
        return _instance;
    }

    /** Creates a pool with the default sizes and starts its threads and health check. */
    protected ThreadPool() {
        startPool();
    }

    /**
     * Creates a pool with the given sizes and starts its threads and health check.
     *
     * @param maxPoolSize  upper bound on the number of threads; ignored when not positive
     * @param initPoolSize number of threads started immediately; ignored when not positive and
     *                     capped at the effective maximum
     */
    protected ThreadPool(int maxPoolSize, int initPoolSize) {
        if (maxPoolSize > 0) {
            this.maxPoolSize = maxPoolSize;
        }
        if (initPoolSize > 0) {
            this.initPoolSize = initPoolSize;
        }
        if (this.initPoolSize > this.maxPoolSize) {
            this.initPoolSize = this.maxPoolSize;
        }
        // startPool() reads the validated field, not the raw constructor argument.
        startPool();
    }

    /** Starts {@code initPoolSize} threads and schedules the periodic health check. */
    private void startPool() {
        for (int i = 0; i < initPoolSize; i++) {
            PooledThread thread = new PooledThread(this);
            thread.start();
        }
        initialized = true;
        scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                TimerEvent();
            }
        }, HEALTH_CHECK_PERIOD_MS, HEALTH_CHECK_PERIOD_MS);
    }

    /**
     * Computes outstanding tasks per registered thread as a floating-point ratio.
     * With no threads the result is 0 when there is no work and positive infinity otherwise,
     * so the caller never divides by zero.
     */
    private double loadPerThread() {
        int threads = allThreads.size();
        int tasks = tatolTaskNum;
        if (threads == 0) {
            return tasks > 0 ? Double.POSITIVE_INFINITY : 0.0;
        }
        return (double) tasks / threads;
    }

    /** Periodic health check: classifies the pool load and trims idle threads if needed. */
    private void TimerEvent() {
        double load = loadPerThread();
        if (load < LIMIT_IDLE) {
            poolState = POOL_STATE_IDLE;
        } else if (load > LIMIT_BUSY) {
            poolState = POOL_STATE_BUSY;
        } else {
            poolState = POOL_STATE_HEALTHY;
        }

        if (poolState == POOL_STATE_IDLE) {
            trimIdleThreads();
        }
        // POOL_STATE_BUSY: nothing to do here; new threads are created on demand when tasks
        // are submitted (see getIdleThread(true)).
    }

    /** Kills idle threads while the pool is above its initial size and still under-loaded. */
    private void trimIdleThreads() {
        while (allThreads.size() > initPoolSize && loadPerThread() < LIMIT_IDLE) {
            PooledThread th = getIdleThread(false);
            if (th == null) {
                break;
            }
            th.kill();
            // Deregister right away so the loop condition sees the new pool size even if
            // kill() only flags the thread; a later threadDead() call from the thread is a no-op.
            threadDead(th);
        }
    }

    /**
     * Sets the maximum number of threads and shrinks the pool if it is currently larger.
     *
     * @param maxPoolSize new upper bound on the number of threads
     */
    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        if (maxPoolSize < getPoolSize()) {
            setPoolSize(maxPoolSize);
        }
    }

    /**
     * Resizes the pool to {@code size} threads (never above {@code maxPoolSize}).
     *
     * <p>When shrinking, each surplus thread is asked to die via {@code kill()} and is removed
     * from the pool's bookkeeping immediately; per the original author's note the thread itself
     * finishes its current work before terminating (not verifiable from this file).
     *
     * @param size desired number of threads; negative values are treated as 0
     */
    public void setPoolSize(int size) {
        if (!initialized) {
            initPoolSize = size;
            return;
        }
        int target = Math.max(size, 0);
        int current = getPoolSize();
        if (target > current) {
            for (int i = current; i < target && i < maxPoolSize; i++) {
                PooledThread thread = new PooledThread(this);
                thread.start();
            }
        } else {
            PooledThread th;
            while ((th = firstThreadAbove(target)) != null) {
                th.kill();
                // Guarantees progress even when kill() does not deregister synchronously.
                threadDead(th);
            }
        }
    }

    /** Returns the first registered thread if the pool holds more than {@code target}, else null. */
    private PooledThread firstThreadAbove(int target) {
        synchronized (allThreads) {
            return allThreads.size() > target ? allThreads.get(0) : null;
        }
    }

    /** @return the number of threads currently registered with the pool */
    public int getPoolSize() {
        return allThreads.size();
    }

    /**
     * Registers a newly created thread with the pool.
     *
     * @param th the thread to register; already-registered threads are not added twice
     */
    protected void threadCreated(PooledThread th) {
        synchronized (allThreads) {
            if (!allThreads.contains(th)) {
                allThreads.add(th);
            }
        }
    }

    /**
     * Removes a thread from the pool's bookkeeping (both the idle list and the thread list).
     * Calling it for a thread that is no longer registered has no effect.
     *
     * @param th the thread to deregister
     */
    protected void threadDead(PooledThread th) {
        // The idle list is updated before, not inside, the allThreads lock: getIdleThread()
        // locks idleThreads and then touches allThreads, so nesting the locks in the opposite
        // order here could deadlock.
        threadIdleStateChange(th, false);
        synchronized (allThreads) {
            allThreads.remove(th);
        }
    }

    /**
     * Records whether a thread is idle.
     *
     * @param th     the thread whose state changed
     * @param isIdle {@code true} to list the thread as idle, {@code false} to unlist it
     */
    protected void threadIdleStateChange(PooledThread th, boolean isIdle) {
        synchronized (idleThreads) {
            if (isIdle) {
                // Avoid duplicates so the same thread cannot be handed out twice.
                if (!idleThreads.contains(th)) {
                    idleThreads.add(th);
                }
            } else {
                idleThreads.remove(th);
            }
        }
    }

    /**
     * Takes an idle thread out of the idle list, optionally creating a new thread when none is
     * idle and the pool is below {@code maxPoolSize}.
     *
     * @param creatable whether a new thread may be created and started
     * @return an idle or freshly started thread, or {@code null} when none is available
     */
    public PooledThread getIdleThread(boolean creatable) {
        synchronized (idleThreads) {
            if (!idleThreads.isEmpty()) {
                return idleThreads.remove(0);
            }
            if (creatable && getPoolSize() < maxPoolSize) {
                PooledThread thread = new PooledThread(this);
                thread.start();
                return thread;
            }
            return null;
        }
    }

    /**
     * Removes and returns the oldest task from the shared queue.
     *
     * @return the next task, or {@code null} when the queue is empty
     */
    public ThreadTask getTaskToRun() {
        synchronized (tasksQueue) {
            return tasksQueue.isEmpty() ? null : tasksQueue.remove(0);
        }
    }

    /**
     * Queues one task and wakes an idle (or newly created) thread, if any, to pick it up.
     *
     * @param task the task to run
     */
    public void processTask(ThreadTask task) {
        synchronized (tasksQueue) {
            tasksQueue.add(task);
        }
        adjustTaskCount(1);
        PooledThread th = getIdleThread(true);
        if (th != null) {
            th.activeThread();
        }
    }

    /**
     * Tries to hand a batch of tasks to a single thread.
     *
     * <p>If an idle or new thread with an empty private queue is available, the whole batch is
     * given to it and counted as one task unit. Otherwise every task is appended individually
     * to the shared queue and counted individually.
     *
     * @param tasks the batch; {@code null} or empty batches are ignored
     * @return {@code true} when the batch was handed to one thread, {@code false} when it was
     *         queued on the pool (or ignored)
     */
    public boolean processTasksInSingleThread(ThreadTask[] tasks) {
        if (tasks == null || tasks.length == 0) {
            return false;
        }
        boolean handedToSingleThread = false;
        PooledThread th = getIdleThread(true);
        synchronized (tasksQueue) {
            if (th != null && th.isTaskQueueEmpty()) {
                handedToSingleThread = true;
                th.putTasks(tasks);
                adjustTaskCount(1);
            } else {
                // Queue the tasks one by one: getTaskToRun() returns single ThreadTask
                // elements, so the array itself must never be stored in the queue.
                for (int i = 0; i < tasks.length; i++) {
                    tasksQueue.add(tasks[i]);
                }
                adjustTaskCount(tasks.length);
            }
            if (th != null) {
                th.activeThread();
            }
        }
        return handedToSingleThread;
    }

    /** Records that one task unit has finished. */
    public void taskCompleted() {
        adjustTaskCount(-1);
    }

    /**
     * Takes back tasks that a thread is returning and re-submits each one to the pool.
     * The returning thread's batch unit is released first.
     *
     * @param tasks vector of {@code ThreadTask} elements; {@code null} is treated as empty
     */
    public void taskReturnedFromThread(Vector tasks) {
        adjustTaskCount(-1);
        if (tasks == null) {
            return;
        }
        // Snapshot first: iterating the live Vector could fail if its owner modifies it.
        Object[] returned = tasks.toArray();
        for (int i = 0; i < returned.length; i++) {
            processTask((ThreadTask) returned[i]);
        }
    }

    /** @return the maximum number of threads the pool may hold */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /** Atomically adds {@code delta} to the outstanding-task counter. */
    private void adjustTaskCount(int delta) {
        synchronized (taskCountLock) {
            tatolTaskNum += delta;
        }
    }
}
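// Code-viewer page residue (keyboard shortcut help), kept as a comment so the file stays valid Java:
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Switch theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -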