📄 threadpoolimpl.java
字号:
/* * Copyright (C) butor.com. All rights reserved. * * This software is published under the terms of the GNU Library General * Public License (GNU LGPL), a copy of which has been included with this * distribution in the LICENSE.txt file. */package org.butor.resourcePool.threadPoolService;import org.butor.config.Config;import org.butor.config.lowlevel.IProperties;import org.butor.fwService.FwServiceImpl;import org.butor.log.Log;import java.util.Vector;/** * This class is used to implement a thread pool * There are three queues in this object. * <pre> * +-+-+ +-+ The f_waitQ contains the ThreadPoolThread * | | |...| | that are waiting to receive a task * +-+-+ +-+ * * +-+-+ +-+ The f_busyQ contains the ThreadPoolThread * | | |...| | that are currently processing a task * +-+-+ +-+ * * +-+-+ +-+ The f_waitingTasksQ contains the tasks * | | |...| | that are waiting to be processed * +-+-+ +-+ * </pre> * When an object call the method run(aTask) the execution * trace goes like this:<br> * . It puts the task to be processed in the f_waitingTasksQ;<br> * . It then checks the f_waitQ is empty and the maximum number of * threads has not been reached, it spawns a new thread that is * put in the f_waitQ;<br> * . It then calls checkAndProcessTask();<br> * <br> * When a thread is done processing a task:<br> * . It move itself from f_busyQ to f_waitQ<br> * . It then calls checkAndProcessTask();<br> * <br> * checkAndProcessTask() does:<br> * . If there is a thread in the f_waitQ and there is a task in * f_waitingTasksQ:<br> * . It assigns the first task of f_waitingTasksQ to the first thread * in f_waitQ<br> * . The thread is moved to f_busyQ<br> */public class ThreadPoolImpl extends FwServiceImpl implements IThreadPool { /** * Maximum number of threads in the thread pool */ protected int f_maxThreads; /** * True if we are shuting down */ protected boolean f_shutdown; /** * True if we are shuting down fast. Currently there is * no distinction between a "fast" and "regular" shutdown */ protected boolean f_fastShutdown; /** * Group of thread that process the tasks */ protected ThreadGroup f_threadGroup; /** * Name of the tread pool */ protected String f_poolName; /** * ID of the next thread in the pool */ protected int f_threadId; /** * Paramater name that contain the thread pool name */ public String THREAD_POOL_NAME_CFG = "name"; /** * Default thread pool name */ public String THREAD_POOL_NAME_CFG_DEFAULT = "THREAD_POOL"; /** * Paramater name that contain the maximum number of threads * in the pool */ public String THREAD_POOL_MAX_THREADS = "max_threads"; /** * Default maximum number of threads in the pool */ public int THREAD_POOL_MAX_THREADS_DEFAULT = 50; /** * Paramater name that specify if the all the ThreadPoolThread must be * daemon threads */ public String THREAD_POOL_IS_DAEMON = "is_daemon"; /** * Default value that specify if all the ThreadPoolThread must be * daemon threads */ public boolean THREAD_POOL_IS_DAEMON_DEFAULT = true; /** * Queue of thread currently processing tasks */ protected Vector f_busyQ; /** * Queue of tasks waiting to be processed */ protected Vector f_waitingTasksQ; /* * Currently, Vector are used to implement f_waitQ, f_busyQ * and f_waitingTasksQ. This is preliminary! It would be * easy to devise more optimal data structures. * * For f_waitQ, we need to: * . Insert at the end of the Q * . Remove the first element of the Q * . Get the Q length * * For f_waitingTasksQ, we need to: * . Insert at the end of the Q * . Remove the first element of the Q * . Get the Q length * * For f_busyQ, we need to: * . Insert at the end of the Q * . Remove an arbitrary element of the Q * . Get the Q length */ /** * Queue of thread waiting to process tasks */ protected Vector f_waitQ;/** * This method check if there is a thread idle and * a task to do, if so, it will assign the first tasks * to the first idle threads */protected void checkAndProcessTask() { Log.logStr(Log.LOG_LEVEL_MEDIUM, this, Log.LOG_TYPE_INFO, "checkAndProcessTask", "Task Q size: " + f_waitingTasksQ.size() + " Wait Q size: " + f_waitQ.size()); synchronized (f_waitingTasksQ) { synchronized (f_waitQ) { /* * While there is more tasks to do and more idle threads * dispatch the tasks to the threads */ while ((f_waitingTasksQ.size() > 0) && (f_waitQ.size() > 0)) { Runnable aTask = getFirstTask(); ThreadPoolThread aThread = reuseThread(); if (null != aThread) { putThreadInBusyQ(aThread, aTask); } } } }}/** * This method is called by ThreadPoolThread objects * when they are done working on a task * * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread * the thread that has finished its task */public void doneTask(ThreadPoolThread aThread) { Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "doneTask", "Task Q size: " + f_waitingTasksQ.size() + " " + aThread + " is done"); int i = -1; /* * Ok, first off, we should find the thread in * the f_busyQ, we must remove it from this Q */ synchronized (f_busyQ) { i = f_busyQ.indexOf(aThread); if (0 <= i) { f_busyQ.removeElementAt(i); } else { Log.logStr(this, Log.LOG_TYPE_ERROR, "doneTask", "Thread is not in busy list!!!: " + aThread); } } /* * If the thread was found (we check this only for * sanity), we must find it! Then we put it in the * f_waitQ Q */ if (0 <= i) { putThreadInWaitQ(aThread); } /* * We then verify if tasks could be assigned to * this newly idle thread */ checkAndProcessTask();}/** * This method returns the first task to process * * @return java.lang.Runnable The first task to process * or null if there is nothing to do */protected Runnable getFirstTask() { Runnable aTask = null; synchronized (f_waitingTasksQ) { if (f_waitingTasksQ.size() > 0) { aTask = (Runnable)f_waitingTasksQ.elementAt(0); f_waitingTasksQ.removeElementAt(0); } } return aTask;}/** * This method returns the number of threads managed by * this pool * * @return int */protected int getNbThreads() { return f_waitQ.size() + f_busyQ.size();}/** * Creates an array containing all the threads managed by this * pool * * @return ThreadPoolThread[] The list of threads managed by this * pool */protected ThreadPoolThread[] getThreads() { int i; int j; Thread[] threadListRaw = new Thread[f_threadGroup.activeCount()]; ThreadPoolThread[] threadList = new ThreadPoolThread[f_threadGroup.activeCount()]; f_threadGroup.enumerate(threadListRaw); for (i = 0, j = 0; i < threadListRaw.length; i++) { if (threadListRaw[i] instanceof ThreadPoolThread) { threadList[j++] = (ThreadPoolThread)threadListRaw[i]; } else { Log.logStr(this, Log.LOG_TYPE_ERROR, "", "Thread [" + threadListRaw[i] + "] does not belong to group " + f_poolName); } } if (i != j) { ThreadPoolThread[] threadListTemp = new ThreadPoolThread[j]; for (i = 0; i < j; i++) { threadListTemp[i] = threadList[i]; } threadList = threadListTemp; } return threadList;}/** * This method is called to initialize the thread pool. */public void init() { Log.logStr(this, Log.LOG_TYPE_INFO, "init", "..."); // get service config. IProperties serviceProps = Config.getPropertyList( FwServiceImpl.PROPERTY_PREFIX_SERVICE +getServiceName()); if (serviceProps == null) { Log.logStr( this, Log.LOG_TYPE_ERROR, "init()", "Missing property list [" + FwServiceImpl.PROPERTY_PREFIX_SERVICE +getServiceName() + "]"); f_poolName = THREAD_POOL_NAME_CFG_DEFAULT; f_maxThreads = THREAD_POOL_MAX_THREADS_DEFAULT; } else { f_poolName = serviceProps.getProperty(THREAD_POOL_NAME_CFG, THREAD_POOL_NAME_CFG_DEFAULT); f_maxThreads = serviceProps.getPropertyInt(THREAD_POOL_MAX_THREADS, THREAD_POOL_MAX_THREADS_DEFAULT); } f_threadGroup = new ThreadGroup(f_poolName); if ((f_maxThreads <= 0) || (f_maxThreads > 10000)) { throw new IllegalArgumentException( "Number of threads is too low or too high " + f_maxThreads); }}/** * This method is called automatically by the service class * monitor. It is used in this context to verify the sanity * of the ThreadPoolThread objects managed by this pool. */public void isAlive() { /* * No need to synchronize or lock the queues, it is * non intrusive and if a thread is currently being moved * from the f_busyQ to the f_waitQ or vice-versa, then * it will be checked in the next isAlive() call */ iterateOverAllThreadInQ(f_waitQ); iterateOverAllThreadInQ(f_busyQ);}/** * Returns true if the ThreadPoolThread objects should * be daemon threads, otherwise false * * @return boolean */protected boolean isDaemonThreads() { return Config.getPropertyBoolean(THREAD_POOL_IS_DAEMON, THREAD_POOL_IS_DAEMON_DEFAULT);}/** * This method is called by the isAlive() method, it iterates * over a thread Q (f_busyQ or f_waitQ), and check if each * of the threads are alive. It does not guarantee that * all the threads will be checked! A thread can be passing * from one queue to the other, or an insertion or deletion * from the queue can prevent this. But in this context it * is enough, the next isAlive() call will check the threads * that have not been checked in the previous call. * * @param threadQ java.util.Vector The thread Q to check */protected void iterateOverAllThreadInQ(Vector threadQ) { int i = 0; ThreadPoolThread aThread = null; /* * The Q length can be modified while we are iterating * over it, so we don't use an Enumeration, which is not * thread safe nor do we use the number of elements in * the Q which can change. So we simply use the index * of the elements in the queue until an * ArrayIndexOutOfBoundsException is raised. * * It's ugly but it works! */ try { while (true) { aThread = (ThreadPoolThread)threadQ.elementAt(i); /* * If the thread is not alive any more, then we * simply remove it from our Q */ if (!aThread.isAlive()) { Log.logStr(this, Log.LOG_TYPE_WARN, "isAlive", "Found a dead thread! " + aThread); threadQ.removeElementAt(i); } else { i++; } } } catch (ArrayIndexOutOfBoundsException e) { // It's normal here, we don't do anything }}/** * Inserts a new task in the f_waitingTasksQ * queue * * @param aTask java.lang.Runnable the new task to process */protected void putTaskInTaskQ(Runnable aTask) { Log.logStr(Log.LOG_LEVEL_MEDIUM, this, Log.LOG_TYPE_INFO, "putTaskInTaskQ", "Adding " + aTask + " Q size: " + f_waitingTasksQ.size()); synchronized (f_waitingTasksQ) { f_waitingTasksQ.addElement(aTask); }}/** * Inserts a thread in the f_busyQ queue. The thread * must come from the f_waitQ queue. The thread is assigned * a new task. * * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread * the thread to move in the f_busyQ and is assigned the task * @param aTask java.lang.Runnable the new task assigned to the thread */protected void putThreadInBusyQ(ThreadPoolThread aThread, Runnable aTask) { Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "putThreadInBusyQ", "Task Q size: " + f_waitingTasksQ.size() + " Busy Q size: " + f_busyQ.size() + " " + aThread + " will process " + aTask); synchronized (f_busyQ) { f_busyQ.addElement(aThread); aThread.setTask(aTask); }}/** * This method is called by two events, when a thread * has done processing a task or a new thread is created. The * thread is put int the f_waitQ queue * * @param aThread com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread * the thread to put in the Q */protected void putThreadInWaitQ(ThreadPoolThread aThread) { Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "putThreadInWaitQ", "Task Q size: " + f_waitingTasksQ.size() + " Wait Q size: " + f_waitQ.size() + " " + aThread + " is becoming idle"); synchronized (f_waitQ) { f_waitQ.addElement(aThread); }}/** * This method attempt to get the first thread * waiting in f_waitQ. The thread is removed from the queue. * * @return com.cjc.common.resourcesMgmt.threadPool.ThreadPoolThread The * first thread in the f_waitQ or null if the queue is empty */protected ThreadPoolThread reuseThread() { ThreadPoolThread aThread = null; synchronized (f_waitQ) { if (f_waitQ.size() > 0) { aThread = (ThreadPoolThread)f_waitQ.elementAt(0); f_waitQ.removeElementAt(0); } } Log.logStr(Log.LOG_LEVEL_FULL, this, Log.LOG_TYPE_INFO, "reuseThread", "Wait Q size: " + f_waitQ.size() + " " + aThread + " will work"); return aThread;}/** * This method is called by the clients that want to process * a task. The task is put in the f_waitingTasksQ and if * a thread is available in the f_waitQ or the pool has not * reached the maximum number of threads, the task is assigned * to a thread. * * @param task java.lang.Runnable The new task to process */public void run(Runnable task) { if (null == task) { Log.logStr(this, Log.LOG_TYPE_WARN, "run", "assert(task == null)"); return; } Log.logStr(Log.LOG_LEVEL_LOW, this, Log.LOG_TYPE_INFO, "run", task.toString()); putTaskInTaskQ(task); synchronized (f_waitQ) { if (f_waitQ.size() <= 0) { if (getNbThreads() < f_maxThreads) { spawnThread(); } } } checkAndProcessTask();}/** * This method is used to shut down every threads * managed by this pool. */public void shutdown() { this.shutdown(true);}/** * This method is used to shut down every threads * managed by this pool. * * @param isFast boolean Ignored for the moment */public void shutdown(boolean isFast) { f_shutdown = true; unregisterFromService(); ThreadPoolThread[] threadList = getThreads(); for (int i = 0; i < threadList.length; i++) { threadList[i].shutdown(); }}/** * This method is used to create a new ThreadPoolThread * and put it in f_waitQ. */protected void spawnThread() { Log.logStr(Log.LOG_LEVEL_LOW, this, Log.LOG_TYPE_INFO, "spawnThread", "Spawn " + f_poolName + f_threadId); ThreadPoolThread poolThread = new ThreadPoolThread(this, f_threadGroup, f_poolName + f_threadId++); poolThread.setDaemon(isDaemonThreads()); poolThread.start(); synchronized (f_waitQ) { f_waitQ.addElement(poolThread); }}public ThreadPoolImpl() { f_waitQ = new Vector(); f_busyQ = new Vector(); f_waitingTasksQ = new Vector(); f_poolName = null; f_maxThreads = THREAD_POOL_MAX_THREADS_DEFAULT; f_shutdown = false; f_fastShutdown = false; f_threadGroup = null; f_threadId = 1;}}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -