⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.java

📁 Java线程池完整代码
💻 JAVA
字号:
package com.usstinfo.skyeyecore.threadpool;

import java.util.Collection;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;

/** *//**
 * 线程池
 * @author Leo Chang QQ:278475286
 * 2008-8-8
 */
public class ThreadPool extends Timer{
   
	public static final double LIMIT_IDLE = 0.75;
	public static final double LIMIT_BUSY = 1.25;
	public static final int POOL_STATE_HEALTHY 	= 0;
	public static final int POOL_STATE_IDLE 		= 1;
	public static final int POOL_STATE_BUSY 		= 2;
	
	private static ThreadPool _instance = null;
	protected int 			maxPoolSize		= 10;
    protected int 			initPoolSize	= 2;
	protected int 			tatolTaskNum 	= 0;
	protected Vector 		tasksQueue 		= new Vector();
    protected Vector 		allThreads		= new Vector();
    protected Vector 		idleThreads		= new Vector();
    protected boolean 	initialized 	= false;
    protected boolean 	isRunnable 		= true;
    protected int	 		poolState		= 0;
 
    public static ThreadPool GetIntance(int maxPoolSize, int initPoolSize) {
    	
    	if(_instance==null) {
    		_instance = new ThreadPool(maxPoolSize,initPoolSize);
    	}
    	
    	return _instance;
    }
    
    public static ThreadPool GetIntance() {
    	if(_instance==null) {
    		_instance = new ThreadPool();
    	}
    	
    	return _instance;
    }
   
    // 默认无参数构造函数。
    protected ThreadPool() {
    	for(int i=0; i<initPoolSize; i++) {
            PooledThread thread = new PooledThread(this);
            thread.start();
        }
        
        initialized = true;
        
        this.scheduleAtFixedRate(new TimerTask(){
			@Override
			public void run() {
				// TODO 自动生成方法存根
				TimerEvent();
			}
			
		}, 1000, 1000);
    }
    protected ThreadPool(int maxPoolSize, int initPoolSize) {
    	if(maxPoolSize>0)
    		this.maxPoolSize = maxPoolSize;
    	if(initPoolSize>0)
    		this.initPoolSize = initPoolSize;
    	if(this.initPoolSize>this.maxPoolSize)
    		this.initPoolSize = this.maxPoolSize;
    	
    	 for(int i=0; i<initPoolSize; i++) {
             PooledThread thread = new PooledThread(this);
             thread.start();
         }
         
         initialized = true;
         
         this.scheduleAtFixedRate(new TimerTask(){
 			@Override
 			public void run() {
 				// TODO 自动生成方法存根
 				TimerEvent();
 			}
 			
 		}, 1000, 1000);
    }
 
	private void TimerEvent() {
    	
		// 评定线程池健康状况
		
		if(tatolTaskNum/allThreads.size()<LIMIT_IDLE)
			poolState = POOL_STATE_IDLE;
		else if(tatolTaskNum/allThreads.size()>LIMIT_BUSY){
			poolState = POOL_STATE_BUSY;
		}else
			poolState = POOL_STATE_HEALTHY;


		
		switch(poolState){
		case POOL_STATE_IDLE:
		{
			// 杀掉线程池中过余的线程。
			while(allThreads.size()>initPoolSize&&tatolTaskNum/allThreads.size()<LIMIT_IDLE) {
				PooledThread th = getIdleThread(false);
				if(th!=null){
					th.kill();
				}else{
					break;
				}
			}
		}
			break;
		case POOL_STATE_BUSY:
		{
			// 此处不用生成新线程,在加入新任务时会新增线程。
			/*if(allThreads.size() < maxPoolSize) {
                PooledThread thread = new PooledThread(this);
                thread.start();
			}*/
		}
			break;
		case POOL_STATE_HEALTHY:
			break;
		default:
			break;
		}
	}
    
    public void setMaxPoolSize(int maxPoolSize) {
        //System.out.println("重设最大线程数,最大线程数=" + maxPoolSize);
        this.maxPoolSize = maxPoolSize;
        if(maxPoolSize < getPoolSize())
            setPoolSize(maxPoolSize);
    }
   
    /** *//**
     * 重设当前线程数
     * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成
     * 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     * @param size
     */
    public void setPoolSize(int size) {
        if(!initialized) {
            initPoolSize = size;
            return;
        }else if(size > tatolTaskNum) {
            for(int i=tatolTaskNum; i<size && i<maxPoolSize; i++) {
                PooledThread thread = new PooledThread(this);
                thread.start();
            }
        }else if(size < tatolTaskNum) {
            while(getPoolSize() > size) {
                PooledThread th = (PooledThread)allThreads.get(0);
                th.kill();
            }
        }
       
        //System.out.println("重设线程数,线程数=" + threads.size());
    }
   
    public int getPoolSize() {
    		return allThreads.size();
    }
    
    protected void threadCreated(PooledThread th) {
    	synchronized(allThreads) {
    		allThreads.add(th);
    	}
    }
    
    protected void threadDead(PooledThread th) {
    	synchronized(allThreads) {
    		threadIdleStateChange(th,false);	//	如果空闲现成队列中还有引用,也清除掉。
    		allThreads.remove(th);
    	}
    }
    protected void threadIdleStateChange(PooledThread th,boolean isIdle) {
    	synchronized(idleThreads) {
    		if(isIdle)
        		idleThreads.add(th);
        	else
        		idleThreads.remove(th);
    	}
    }
 
    public PooledThread getIdleThread(boolean creatable) {
    	synchronized(idleThreads) {
    		if(!idleThreads.isEmpty())
    			return (PooledThread)idleThreads.remove(0);
           
            // 新建一个线程。
            if(creatable && getPoolSize() < maxPoolSize) {
                PooledThread thread = new PooledThread(this);
                thread.start();
                return thread;
            }

            return null;
		}
    }
    
    public ThreadTask getTaskToRun() {
    	synchronized(tasksQueue) {
    		if(tasksQueue.size()>0)
                return (ThreadTask)tasksQueue.remove(0);
            else
                return null;
    	}
    }
    
    public void processTask(ThreadTask task) {
    	synchronized(tasksQueue) {
        	tasksQueue.add(task);
    	}
    	tatolTaskNum++;
    	// 默认的线程启动后都是等待状态,激活一个等待的闲置线程,让他自己去拿任务执行。
    	PooledThread th = getIdleThread(true);
    	if(th!=null)
    		th.activeThread();
    	
    }

    public boolean processTasksInSingleThread(ThreadTask[] tasks) {
    	boolean isNewOrEmptyThreadToProcess = false;
        PooledThread th = getIdleThread(true);
       
        synchronized(tasksQueue) {
        	if(th!=null) {
            	if(th.isTaskQueueEmpty())			// 有空线程可以立即执行任务。
            	{
            		isNewOrEmptyThreadToProcess = true;
            		th.putTasks(tasks);
            		// 如果是交给单个线程处理,线程池任务总是只加1,这些任务当成打包处理。
            		tatolTaskNum++;
            	}else{
            		tasksQueue.add(tasks);
            		// 如果没有交给单个线程,线程池任务数加上实际数量。
            		tatolTaskNum+=tasks.length;
            	}
            	
                th.activeThread();
            }else{
            	tasksQueue.add(tasks);  	// 暂时没有空线程来执行,先加入线程池任务队列中等待。
            	// 如果没有交给单个线程,线程池任务数加上实际数量。
        		tatolTaskNum+=tasks.length;
            }
    	}
        
        return isNewOrEmptyThreadToProcess;
    }

    public void taskCompleted(){
    	tatolTaskNum--;
    }
    
    public void taskReturnedFromThread(Vector tasks) {
    	tatolTaskNum--;
    	for(Iterator itr=tasks.iterator(); itr.hasNext();) {
            ThreadTask task = (ThreadTask)itr.next();
            processTask(task);
        }
    	
    }

	public int getMaxPoolSize() {
		return maxPoolSize;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -