📄 threadpool.java
字号:
package com.lanx.app.mail.pool;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
/**
* 线程池 创建线程池,销毁线程池,添加新任务
*
* @author obullxl
*/
public final class ThreadPool {
private static Logger logger = Logger.getLogger(ThreadPool.class);
private static Logger taskLogger = Logger.getLogger("TaskLogger");
private static boolean debug = taskLogger.isDebugEnabled();
//private static boolean debug = taskLogger.isInfoEnabled();
/* 单例 */
private static ThreadPool instance = ThreadPool.getInstance();
// public static final int SYSTEM_BUSY_TASK_COUNT = 150;
/* 默认池中线程数 */
private int worker_num = 5;
/* 已经处理的任务数 */
private int taskCounter = 0;
// public static boolean systemIsBusy = false;
private List<Task> taskQueue = Collections.synchronizedList(new LinkedList<Task>());
/* 池中的所有线程 */
public PoolWorker[] workers;
private ThreadPool() {
init();
}
private ThreadPool(int pool_worker_num) {
worker_num = pool_worker_num;
init();
}
private void init() {
workers = new PoolWorker[worker_num];
for (int i = 0; i < worker_num; i++) {
workers[i] = new PoolWorker(i);
}
}
public static synchronized ThreadPool getInstance() {
return getInstance(5);
}
public static synchronized ThreadPool getInstance(int pool_worker_num) {
if (instance == null)
return new ThreadPool(pool_worker_num);
return instance;
}
/**
* 增加新的任务 每增加一个新任务,都要唤醒任务队列
*
* @param newTask
*/
public void addTask(Task newTask) {
synchronized (taskQueue) {
newTask.setTaskId(++taskCounter);
newTask.setSubmitTime(new Date());
taskQueue.add(newTask);
/* 唤醒队列, 开始执行 */
taskQueue.notifyAll();
}
logger.info("Submit Task<" + newTask.getTaskId() + ">: " + newTask.info());
}
/**
* 批量增加新任务
*
* @param taskes
*/
public void batchAddTask(Task[] taskes) {
if (taskes == null || taskes.length == 0) {
return;
}
synchronized (taskQueue) {
for (int i = 0; i < taskes.length; i++) {
if (taskes[i] == null) {
continue;
}
taskes[i].setTaskId(++taskCounter);
taskes[i].setSubmitTime(new Date());
taskQueue.add(taskes[i]);
}
/* 唤醒队列, 开始执行 */
taskQueue.notifyAll();
}
for (int i = 0; i < taskes.length; i++) {
if (taskes[i] == null) {
continue;
}
logger.info("Submit Task<" + taskes[i].getTaskId() + ">: " + taskes[i].info());
}
}
/**
* 线程池信息
*
* @return
*/
public String getInfo() {
StringBuffer sb = new StringBuffer();
sb.append("\nTask Queue Size:").append(taskQueue.size());
for (int i = 0; i < workers.length; i++) {
sb.append("\nWorker ").append(i).append(" is ")
.append((workers[i].isWaiting()) ? "Waiting." : "Running.");
}
return sb.toString();
}
/**
* 销毁线程池
*/
public synchronized void destroy() {
for (int i = 0; i < worker_num; i++) {
workers[i].stopWorker();
workers[i] = null;
}
taskQueue.clear();
System.exit(1);
}
/**
* 池中工作线程
*
* @author obullxl
*/
class PoolWorker extends Thread {
private int index = -1;
/* 该工作线程是否有效 */
private boolean isRunning = true;
/* 该工作线程是否可以执行新任务 */
private boolean isWaiting = true;
public PoolWorker(int index) {
this.index = index;
start();
}
public void stopWorker() {
this.isRunning = false;
}
public boolean isWaiting() {
return this.isWaiting;
}
/**
* 循环执行任务 这也许是线程池的关键所在
*/
public void run() {
while (isRunning) {
Task task = null;
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
/* 任务队列为空,则等待有新任务加入从而被唤醒 */
taskQueue.wait(20);
} catch (InterruptedException ie) {
logger.error(ie);
}
}
/* 取出任务执行 */
task = (Task) taskQueue.remove(0);
}
taskLogger.info("debug = " + debug);
taskLogger.info("task = " + task);
if (task != null) {
isWaiting = false;
if (debug) {
task.setBeginExceuteTime(new Date());
taskLogger.debug("Worker<" + index + "> start execute Task<" + task.getTaskId() + ">");
if (task.getBeginExceuteTime().getTime() - task.getSubmitTime().getTime() > 1000)
taskLogger.debug("longer waiting time. " + task.info() + ",<" + index + ">,time:" + (task.getFinishTime().getTime() - task.getBeginExceuteTime().getTime()));
}
/* 该任务是否需要立即执行 */
if (task.needExecuteImmediate()) {
Thread thread = new Thread(task);
//thread.setPriority(task.getPriority());
thread.start();
} else {
task.run();
}
if (debug) {
task.setFinishTime(new Date());
taskLogger.debug("Worker<" + index + "> finish task<" + task.getTaskId() + ">");
if (task.getFinishTime().getTime() - task.getBeginExceuteTime().getTime() > 1000)
taskLogger.debug("longer execution time. " + task.info() + ",<" + index + ">,time:" + (task.getFinishTime().getTime() - task.getBeginExceuteTime().getTime()));
}
isWaiting = true;
task = null;
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -