📄 threadpoolimpl.java

📁 A practical utility class
💻 JAVA
/*
 * Copyright (C) butor.com. All rights reserved.
 *
 * This software is published under the terms of the GNU Library General
 * Public License (GNU LGPL), a copy of which has been included with this
 * distribution in the LICENSE.txt file.
 */
package org.butor.resourcePool.threadPoolService;

import org.butor.config.Config;
import org.butor.config.lowlevel.IProperties;
import org.butor.fwService.FwServiceImpl;
import org.butor.log.Log;

import java.util.Vector;

/**
 * This class is used to implement a thread pool.
 * There are three queues in this object.
 * <pre>
 * +-+-+   +-+ The f_waitQ contains the ThreadPoolThread
 * | | |...| | that are waiting to receive a task
 * +-+-+   +-+
 *
 * +-+-+   +-+ The f_busyQ contains the ThreadPoolThread
 * | | |...| | that are currently processing a task
 * +-+-+   +-+
 *
 * +-+-+   +-+ The f_waitingTasksQ contains the tasks
 * | | |...| | that are waiting to be processed
 * +-+-+   +-+
 * </pre>
 * When an object calls the method run(aTask) the execution
 * trace goes like this:<br>
 * . It puts the task to be processed in the f_waitingTasksQ;<br>
 * . If the f_waitQ is empty and the maximum number of
 *   threads has not been reached, it spawns a new thread that is
 *   put in the f_waitQ;<br>
 * . It then calls checkAndProcessTask();<br>
 * <br>
 * When a thread is done processing a task:<br>
 * . It moves itself from f_busyQ to f_waitQ<br>
 * . It then calls checkAndProcessTask();<br>
 * <br>
 * checkAndProcessTask() does:<br>
 * . If there is a thread in the f_waitQ and there is a task in
 *   f_waitingTasksQ:<br>
 * . It assigns the first task of f_waitingTasksQ to the first thread
 *   in f_waitQ<br>
 * . The thread is moved to f_busyQ<br>
 */
public class ThreadPoolImpl extends FwServiceImpl implements IThreadPool {
	/**
	 * Maximum number of threads in the thread pool
	 */
	protected int f_maxThreads;

	/**
	 * True if we are shutting down
	 */
	protected boolean f_shutdown;

	/**
	 * True if we are shutting down fast.  Currently there is
	 * no distinction between a "fast" and "regular" shutdown
	 */
	protected boolean f_fastShutdown;

	/**
	 * Group of threads that process the tasks
	 */
	protected ThreadGroup f_threadGroup;

	/**
	 * Name of the thread pool
	 */
	protected String f_poolName;

	/**
	 * ID of the next thread in the pool
	 */
	protected int f_threadId;

	/**
	 * Parameter name that contains the thread pool name
	 */
	public String THREAD_POOL_NAME_CFG = "name";

	/**
	 * Default thread pool name
	 */
	public String THREAD_POOL_NAME_CFG_DEFAULT = "THREAD_POOL";

	/**
	 * Parameter name that contains the maximum number of threads
	 * in the pool
	 */
	public String THREAD_POOL_MAX_THREADS = "max_threads";

	/**
	 * Default maximum number of threads in the pool
	 */
	public int    THREAD_POOL_MAX_THREADS_DEFAULT = 50;

	/**
	 * Parameter name that specifies whether all the ThreadPoolThread
	 * objects must be daemon threads
	 */
	public String THREAD_POOL_IS_DAEMON = "is_daemon";

	/**
	 * Default value that specifies whether all the ThreadPoolThread
	 * objects must be daemon threads
	 */
	public boolean THREAD_POOL_IS_DAEMON_DEFAULT = true;

	/**
	 * Queue of threads currently processing tasks
	 */
	protected Vector f_busyQ;

	/**
	 * Queue of tasks waiting to be processed
	 */
	protected Vector f_waitingTasksQ;

	/*
	 * Currently, Vectors are used to implement f_waitQ, f_busyQ
	 * and f_waitingTasksQ.  This is preliminary!  It would be
	 * easy to devise more optimal data structures.
	 *
	 * For f_waitQ, we need to:
	 *   . Insert at the end of the Q
	 *   . Remove the first element of the Q
	 *   . Get the Q length
	 *
	 * For f_waitingTasksQ, we need to:
	 *   . Insert at the end of the Q
	 *   . Remove the first element of the Q
	 *   . Get the Q length
	 *
	 * For f_busyQ, we need to:
	 *   . Insert at the end of the Q
	 *   . Remove an arbitrary element of the Q
	 *   . Get the Q length
	 */

	/**
	 * Queue of threads waiting to process tasks
	 */
	protected Vector f_waitQ;

	/**
	 * This method checks whether there is an idle thread and
	 * a task to do; if so, it assigns the first tasks
	 * to the first idle threads
	 */
	protected void checkAndProcessTask() {
		Log.logStr(Log.LOG_LEVEL_MEDIUM, this, Log.LOG_TYPE_INFO, "checkAndProcessTask",
			"Task Q size: " + f_waitingTasksQ.size() + " Wait Q size: " + f_waitQ.size());

		synchronized (f_waitingTasksQ) {
			synchronized (f_waitQ) {
				/*
				 * While there are more tasks to do and more idle threads
				 * dispatch the tasks to the threads
				 */
				while ((f_waitingTasksQ.size() > 0) && (f_waitQ.size() > 0)) {
					Runnable aTask = getFirstTask();
					ThreadPoolThread aThread = reuseThread();
					if (null != aThread) {
						putThreadInBusyQ(aThread, aTask);
					}
				}
			}
		}
	}

	/**
	 * This method is called by ThreadPoolThread objects
	 * when they are done working on a task
	 *
	 * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread
	 * the thread that has finished its task
	 */
	public void doneTask(ThreadPoolThread aThread) {
		Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "doneTask",
			"Task Q size: " + f_waitingTasksQ.size() + " " + aThread + " is done");

		int i = -1;

		/*
		 * Ok, first off, we should find the thread in
		 * the f_busyQ, we must remove it from this Q
		 */
		synchronized (f_busyQ) {
			i = f_busyQ.indexOf(aThread);
			if (0 <= i) {
				f_busyQ.removeElementAt(i);
			} else {
				Log.logStr(this, Log.LOG_TYPE_ERROR, "doneTask",
					"Thread is not in busy list!!!: " + aThread);
			}
		}

		/*
		 * The thread should always be found; we check only
		 * for sanity.  If it was found, we put it in the
		 * f_waitQ Q
		 */
		if (0 <= i) {
			putThreadInWaitQ(aThread);
		}

		/*
		 * We then verify if tasks could be assigned to
		 * this newly idle thread
		 */
		checkAndProcessTask();
	}

	/**
	 * This method returns the first task to process
	 *
	 * @return java.lang.Runnable The first task to process
	 * or null if there is nothing to do
	 */
	protected Runnable getFirstTask() {
		Runnable aTask = null;

		synchronized (f_waitingTasksQ) {
			if (f_waitingTasksQ.size() > 0) {
				aTask = (Runnable)f_waitingTasksQ.elementAt(0);
				f_waitingTasksQ.removeElementAt(0);
			}
		}

		return aTask;
	}

	/**
	 * This method returns the number of threads managed by
	 * this pool
	 *
	 * @return int
	 */
	protected int getNbThreads() {
		return f_waitQ.size() + f_busyQ.size();
	}

	/**
	 * Creates an array containing all the threads managed by this
	 * pool
	 *
	 * @return ThreadPoolThread[] The list of threads managed by this
	 * pool
	 */
	protected ThreadPoolThread[] getThreads() {
		int i;
		int j;

		Thread[] threadListRaw = new Thread[f_threadGroup.activeCount()];
		ThreadPoolThread[] threadList = new ThreadPoolThread[f_threadGroup.activeCount()];

		f_threadGroup.enumerate(threadListRaw);
		for (i = 0, j = 0; i < threadListRaw.length; i++) {
			if (threadListRaw[i] instanceof ThreadPoolThread) {
				threadList[j++] = (ThreadPoolThread)threadListRaw[i];
			} else {
				Log.logStr(this, Log.LOG_TYPE_ERROR, "",
					"Thread [" + threadListRaw[i] +
					"] does not belong to group " + f_poolName);
			}
		}

		// Trim the result when some enumerated threads were skipped
		if (i != j) {
			ThreadPoolThread[] threadListTemp = new ThreadPoolThread[j];
			for (i = 0; i < j; i++) {
				threadListTemp[i] = threadList[i];
			}
			threadList = threadListTemp;
		}

		return threadList;
	}

	/**
	 * This method is called to initialize the thread pool.
	 */
	public void init() {
		Log.logStr(this, Log.LOG_TYPE_INFO, "init", "...");

		// get service config.
		IProperties serviceProps = Config.getPropertyList(
			FwServiceImpl.PROPERTY_PREFIX_SERVICE + getServiceName());
		if (serviceProps == null) {
			Log.logStr(
				this,
				Log.LOG_TYPE_ERROR,
				"init()",
				"Missing property list ["
					+ FwServiceImpl.PROPERTY_PREFIX_SERVICE + getServiceName()
					+ "]");
			f_poolName = THREAD_POOL_NAME_CFG_DEFAULT;
			f_maxThreads = THREAD_POOL_MAX_THREADS_DEFAULT;
		} else {
			f_poolName = serviceProps.getProperty(THREAD_POOL_NAME_CFG,
				THREAD_POOL_NAME_CFG_DEFAULT);

			f_maxThreads = serviceProps.getPropertyInt(THREAD_POOL_MAX_THREADS,
				THREAD_POOL_MAX_THREADS_DEFAULT);
		}

		f_threadGroup = new ThreadGroup(f_poolName);

		if ((f_maxThreads <= 0) || (f_maxThreads > 10000)) {
			throw new IllegalArgumentException(
				"Number of threads is too low or too high " + f_maxThreads);
		}
	}

	/**
	 * This method is called automatically by the service class
	 * monitor.  It is used in this context to verify the sanity
	 * of the ThreadPoolThread objects managed by this pool.
	 */
	public void isAlive() {
		/*
		 * No need to synchronize or lock the queues, it is
		 * non intrusive and if a thread is currently being moved
		 * from the f_busyQ to the f_waitQ or vice-versa, then
		 * it will be checked in the next isAlive() call
		 */
		iterateOverAllThreadInQ(f_waitQ);
		iterateOverAllThreadInQ(f_busyQ);
	}

	/**
	 * Returns true if the ThreadPoolThread objects should
	 * be daemon threads, otherwise false
	 *
	 * @return boolean
	 */
	protected boolean isDaemonThreads() {
		return Config.getPropertyBoolean(THREAD_POOL_IS_DAEMON,
			THREAD_POOL_IS_DAEMON_DEFAULT);
	}

	/**
	 * This method is called by the isAlive() method.  It iterates
	 * over a thread Q (f_busyQ or f_waitQ) and checks whether each
	 * thread is alive.  It does not guarantee that
	 * all the threads will be checked!  A thread can be passing
	 * from one queue to the other, or an insertion or deletion
	 * from the queue can prevent this.  But in this context it
	 * is enough, the next isAlive() call will check the threads
	 * that have not been checked in the previous call.
	 *
	 * @param threadQ java.util.Vector The thread Q to check
	 */
	protected void iterateOverAllThreadInQ(Vector threadQ) {
		int i = 0;
		ThreadPoolThread aThread = null;

		/*
		 * The Q length can be modified while we are iterating
		 * over it, so we don't use an Enumeration, which is not
		 * thread safe, nor do we use the number of elements in
		 * the Q, which can change.  So we simply use the index
		 * of the elements in the queue until an
		 * ArrayIndexOutOfBoundsException is raised.
		 *
		 * It's ugly but it works!
		 */
		try {
			while (true) {
				aThread = (ThreadPoolThread)threadQ.elementAt(i);

				/*
				 * If the thread is not alive any more, then we
				 * simply remove it from our Q
				 */
				if (!aThread.isAlive()) {
					Log.logStr(this, Log.LOG_TYPE_WARN, "isAlive",
						"Found a dead thread! " + aThread);
					threadQ.removeElementAt(i);
				} else {
					i++;
				}
			}
		} catch (ArrayIndexOutOfBoundsException e) {
			// It's normal here, we don't do anything
		}
	}

	/**
	 * Inserts a new task in the f_waitingTasksQ
	 * queue
	 *
	 * @param aTask java.lang.Runnable the new task to process
	 */
	protected void putTaskInTaskQ(Runnable aTask) {
		Log.logStr(Log.LOG_LEVEL_MEDIUM, this, Log.LOG_TYPE_INFO, "putTaskInTaskQ",
			"Adding " + aTask + " Q size: " + f_waitingTasksQ.size());

		synchronized (f_waitingTasksQ) {
			f_waitingTasksQ.addElement(aTask);
		}
	}

	/**
	 * Inserts a thread in the f_busyQ queue.  The thread
	 * must come from the f_waitQ queue.  The thread is assigned
	 * a new task.
	 *
	 * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread
	 * the thread to move in the f_busyQ and is assigned the task
	 * @param aTask java.lang.Runnable the new task assigned to the thread
	 */
	protected void putThreadInBusyQ(ThreadPoolThread aThread, Runnable aTask) {
		Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "putThreadInBusyQ",
			"Task Q size: " + f_waitingTasksQ.size() +
			" Busy Q size: " + f_busyQ.size() + " " +
			aThread + " will process " + aTask);

		synchronized (f_busyQ) {
			f_busyQ.addElement(aThread);
			aThread.setTask(aTask);
		}
	}

	/**
	 * This method is called on two events: when a thread
	 * has finished processing a task or when a new thread is created.
	 * The thread is put in the f_waitQ queue
	 *
	 * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread
	 * the thread to put in the Q
	 */
	protected void putThreadInWaitQ(ThreadPoolThread aThread) {
		Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "putThreadInWaitQ",
			"Task Q size: " + f_waitingTasksQ.size() +
			" Wait Q size: " + f_waitQ.size() + " " +
			aThread + " is becoming idle");

		synchronized (f_waitQ) {
			f_waitQ.addElement(aThread);
		}
	}

	/**
	 * This method attempts to get the first thread
	 * waiting in f_waitQ.  The thread is removed from the queue.
	 *
	 * @return com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread The
	 * first thread in the f_waitQ or null if the queue is empty
	 */
	protected ThreadPoolThread reuseThread() {
		ThreadPoolThread aThread = null;

		synchronized (f_waitQ) {
			if (f_waitQ.size() > 0) {
				aThread = (ThreadPoolThread)f_waitQ.elementAt(0);
				f_waitQ.removeElementAt(0);
			}
		}

		Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "reuseThread",
			"Wait Q size: " + f_waitQ.size() + " " +
			aThread + " will work");

		return aThread;
	}

	/**
	 * This method is called by the clients that want to process
	 * a task.  The task is put in the f_waitingTasksQ and if
	 * a thread is available in the f_waitQ or the pool has not
	 * reached the maximum number of threads, the task is assigned
	 * to a thread.
	 *
	 * @param task java.lang.Runnable The new task to process
	 */
	public void run(Runnable task) {
		if (null == task) {
			Log.logStr(this, Log.LOG_TYPE_WARN, "run", "assert(task == null)");

			return;
		}

		Log.logStr(Log.LOG_LEVEL_LOW, this, Log.LOG_TYPE_INFO, "run",
			task.toString());

		putTaskInTaskQ(task);

		synchronized (f_waitQ) {
			if (f_waitQ.size() <= 0) {
				if (getNbThreads() < f_maxThreads) {
					spawnThread();
				}
			}
		}

		checkAndProcessTask();
	}

	/**
	 * This method is used to shut down every thread
	 * managed by this pool.
	 */
	public void shutdown() {
		this.shutdown(true);
	}

	/**
	 * This method is used to shut down every thread
	 * managed by this pool.
	 *
	 * @param isFast boolean Ignored for the moment
	 */
	public void shutdown(boolean isFast) {
		f_shutdown = true;
		unregisterFromService();

		ThreadPoolThread[] threadList = getThreads();
		for (int i = 0; i < threadList.length; i++) {
			threadList[i].shutdown();
		}
	}

	/**
	 * This method is used to create a new ThreadPoolThread
	 * and put it in f_waitQ.
	 */
	protected void spawnThread() {
		Log.logStr(Log.LOG_LEVEL_LOW, this, Log.LOG_TYPE_INFO, "spawnThread",
			"Spawn " + f_poolName + f_threadId);

		ThreadPoolThread poolThread = new ThreadPoolThread(this, f_threadGroup,
			f_poolName + f_threadId++);

		poolThread.setDaemon(isDaemonThreads());

		poolThread.start();

		synchronized (f_waitQ) {
			f_waitQ.addElement(poolThread);
		}
	}

	public ThreadPoolImpl() {
		f_waitQ = new Vector();
		f_busyQ = new Vector();
		f_waitingTasksQ = new Vector();

		f_poolName = null;
		f_maxThreads = THREAD_POOL_MAX_THREADS_DEFAULT;
		f_shutdown = false;
		f_fastShutdown = false;
		f_threadGroup = null;
		f_threadId = 1;
	}
}
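
The three-queue bookkeeping described in the class comment can be sketched without any real threads. The following is only an illustration, not the butor implementation: `DispatchSketch`, its single lock, the string worker names, and the string task labels are all assumptions made for this example. It also uses `ArrayDeque` and `HashSet` in place of `Vector`, along the lines the source comment hints at when it calls the `Vector` queues preliminary.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the wait/busy/task bookkeeping. Workers and tasks
 * are plain strings, so no thread is ever started.
 */
class DispatchSketch {
    private final ArrayDeque<String> waitQ = new ArrayDeque<>();         // idle workers (FIFO)
    private final Set<String> busyQ = new HashSet<>();                   // workers holding a task
    private final ArrayDeque<String> waitingTasksQ = new ArrayDeque<>(); // pending tasks (FIFO)
    private final List<String> assignments = new ArrayList<>();          // log of "worker <- task"
    private final int maxThreads;
    private int nextId = 1;

    DispatchSketch(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /** Queue the task, spawn a worker if none is idle and the cap allows, then dispatch. */
    synchronized void run(String task) {
        waitingTasksQ.addLast(task);
        if (waitQ.isEmpty() && waitQ.size() + busyQ.size() < maxThreads) {
            waitQ.addLast("worker" + nextId++);
        }
        checkAndProcessTask();
    }

    /** Move a finished worker from busy to idle, then dispatch again. */
    synchronized void doneTask(String worker) {
        if (busyQ.remove(worker)) {
            waitQ.addLast(worker);
        }
        checkAndProcessTask();
    }

    /** Pair the oldest pending task with the longest-idle worker until one queue is empty. */
    private void checkAndProcessTask() {
        while (!waitingTasksQ.isEmpty() && !waitQ.isEmpty()) {
            String task = waitingTasksQ.pollFirst();
            String worker = waitQ.pollFirst();
            busyQ.add(worker);
            assignments.add(worker + " <- " + task);
        }
    }

    synchronized int pendingTasks() { return waitingTasksQ.size(); }
    synchronized int busyCount() { return busyQ.size(); }
    synchronized int idleCount() { return waitQ.size(); }
    synchronized List<String> assignments() { return new ArrayList<>(assignments); }

    public static void main(String[] args) {
        DispatchSketch pool = new DispatchSketch(2);
        pool.run("A");
        pool.run("B");
        pool.run("C");            // cap reached, so C waits
        pool.doneTask("worker1"); // worker1 becomes idle and picks up C
        System.out.println(pool.assignments()); // prints [worker1 <- A, worker2 <- B, worker1 <- C]
    }
}
```

A single lock keeps the sketch short; the original class instead locks each queue separately, which allows more concurrency at the cost of having to reason about lock ordering.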

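The liveness sweep in `iterateOverAllThreadInQ` walks the queue by index until an `ArrayIndexOutOfBoundsException` ends the loop. One possible alternative, shown here only as a sketch, is to hold the `Vector`'s monitor for the duration of the sweep and let `Vector.removeIf` (Java 8 and later) drop the dead entries. `LivenessSweepSketch` and `removeDeadThreads` are hypothetical names, and the queue is typed as `Vector<Thread>` for the example, whereas the original uses a raw `Vector` of `ThreadPoolThread`.

```java
import java.util.Vector;

/** Hypothetical alternative to the exception-terminated sweep. */
class LivenessSweepSketch {

    /** Removes every thread that is not alive and returns how many were removed. */
    static int removeDeadThreads(Vector<Thread> threadQ) {
        // Same monitor the pool takes with synchronized (f_waitQ) / synchronized (f_busyQ),
        // so the queue cannot change size during the sweep.
        synchronized (threadQ) {
            int before = threadQ.size();
            threadQ.removeIf(t -> !t.isAlive());
            return before - threadQ.size();
        }
    }

    public static void main(String[] args) {
        Vector<Thread> q = new Vector<>();
        q.add(new Thread(() -> { }));  // never started, so isAlive() is false
        q.add(Thread.currentThread()); // currently running, so isAlive() is true
        System.out.println(removeDeadThreads(q) + " removed, " + q.size() + " left"); // prints 1 removed, 1 left
    }
}
```

The trade-off is that this version briefly blocks other users of the queue, which the original deliberately avoids by not locking in `isAlive()`; in exchange, every thread present at the time of the call is checked.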