taskengine.java
来自「Jive是基于JSP/JAVA技术构架的一个大型BBS论坛系统,这是Jive论坛」· Java 代码 · 共 384 行
JAVA
384 行
/* * $RCSfile: TaskEngine.java,v $ * $Revision: 1.9 $ * $Date: 2002/06/24 14:51:44 $ * * Copyright (C) 1999-2001 CoolServlets, Inc. All rights reserved. * * This software is the proprietary information of CoolServlets, Inc. * Use is subject to license terms. */package com.jivesoftware.forum.util;import com.jivesoftware.util.CacheFactory;import com.jivesoftware.forum.JiveGlobals;import java.util.*;/** * A class to manage the execution of tasks in the Jive system. A TaskEngine * object accepts Runnable objects and queues them for execution by * worker threads. Optionally, a priority may be assigned to each task. Tasks with a * higher priority are taken from the queue first.<p> */public class TaskEngine { public static final int HIGH_PRIORITY = 2; public static final int MEDIUM_PRIORITY = 1; public static final int LOW_PRIORITY = 0; /** * A queue of tasks to be executed. */ private static PriorityQueue taskQueue = null; /** * A thread group for all workers. */ private static ThreadGroup threadGroup; /** * An array of worker threads. */ private static TaskEngineWorker [] workers = null; /** * A Timer to perform periodic tasks. */ private static Timer taskTimer = null; private static Object lock = new Object(); private static long newWorkerTimestamp = CacheFactory.currentTime; private static long busyTimestamp = CacheFactory.currentTime; static { // Initialize the task timer and make it a deamon. taskTimer = new java.util.Timer(true); taskQueue = new PriorityQueue(); threadGroup = new ThreadGroup("Task Engine Workers"); // Use 10 worker threads by default. workers = new TaskEngineWorker[5]; for (int i=0; i<workers.length; i++) { workers[i] = new TaskEngineWorker("Task Engine Worker " + i); workers[i].setDaemon(true); workers[i].start(); } } private TaskEngine() { // Not instantiable. } /** * Adds a task to the task queue. The task will be executed immediately * provided there is a free worker thread to execute it. Otherwise, it * will execute as soon as a worker thread becomes available.<p> * * @param task the task to execute */ public static void addTask(Runnable task) { addTask(MEDIUM_PRIORITY, task); } /** * Adds a task to the task queue. The task will be executed immediately * provided there is a free worker thread to execute it. Otherwise, it * will execute as soon as a worker thread becomes available.<p> * * @param priority the priority of the task in the queue. * @param task the task to execute */ public static void addTask(int priority, Runnable task) { synchronized(lock) { taskQueue.enqueue(priority, task); // Notify a worker thread of the enqueue. lock.notify(); } } /** * Schedules a task to be run once after a specified delay. * * @param task task to be scheduled. * @param date the date in milliseconds at which the task is to be executed. * @return a TimerTask object which can be used to track execution of the * task. */ public static TimerTask scheduleTask(Runnable task, Date date) { return scheduleTask(MEDIUM_PRIORITY, task, date); } /** * Schedules a task to be run once after a specified delay. * * @param priority the priority of the task in the queue. * @param task task to be scheduled. * @param date the date in milliseconds at which the task is to be executed. * @return a TimerTask object which can be used to track execution of the * task. */ public static TimerTask scheduleTask(int priority, Runnable task, Date date) { TimerTask timerTask = new ScheduledTask(priority, task); taskTimer.schedule(timerTask, date); return timerTask; } /** * Schedules a task to periodically run. This is useful for tasks such as * updating search indexes, deleting old data at periodic intervals, etc. * * @param task task to be scheduled. * @param delay delay in milliseconds before task is to be executed. * @param period time in milliseconds between successive task executions. * @return a TimerTask object which can be used to track executions of the * task and to cancel subsequent executions. */ public static TimerTask scheduleTask(Runnable task, long delay, long period) { return scheduleTask(MEDIUM_PRIORITY, task, delay, period); } /** * Schedules a task to periodically run. This is useful for tasks such as * updating search indexes, deleting old data at periodic intervals, etc. * * @param priority the priority of the task in the queue. * @param task task to be scheduled. * @param delay delay in milliseconds before task is to be executed. * @param period time in milliseconds between successive task executions. * @return a TimerTask object which can be used to track executions of the * task and to cancel subsequent executions. */ public static TimerTask scheduleTask(int priority, Runnable task, long delay, long period) { TimerTask timerTask = new ScheduledTask(priority, task); taskTimer.scheduleAtFixedRate(timerTask, delay, period); return timerTask; } /** * Return the next task in the queue. If no task is available, this method * will block until a task is added to the queue. * * @return a <code>Task</code> object */ private static Runnable nextTask() { synchronized(lock) { // Block until we have another object in the queue to execute. while (taskQueue.isEmpty()) { try { lock.wait(); } catch (InterruptedException ie) { } } // Now, grow or shrink the worker pool as necessary. boolean busy = taskQueue.size() > Math.ceil(workers.length/2); if (busy) { // Update the busy timestamp. busyTimestamp = CacheFactory.currentTime; // Attempt to add another worker to handle the load. addWorker(); } else { // Attempt to remove a worker. removeWorker(); } return (Runnable)taskQueue.dequeue(); } } /** * Adds a new worker to handle load. New workers are added at most once ever two seconds * and only up to thirty workers. */ private static void addWorker() { // Only add a new worker if it's been at least 2 seconds since the last time. if (workers.length < 30 && CacheFactory.currentTime > newWorkerTimestamp + 2000) { int newSize = workers.length + 1; int lastIndex = newSize-1; TaskEngineWorker[] newWorkers = new TaskEngineWorker[newSize]; for (int i=0; i<workers.length; i++) { newWorkers[i] = workers[i]; } newWorkers[lastIndex] = new TaskEngineWorker("Task Engine Worker " + lastIndex); newWorkers[lastIndex].setDaemon(true); newWorkers[lastIndex].start(); // Finally, switch in new data structure. workers = newWorkers; newWorkerTimestamp = CacheFactory.currentTime; } } /** * Removes a worker if load is lower than the necessary number of workers. Workers are removed * at once every five seconds, down to a minimum of three workers. */ private static void removeWorker() { // Only remove a worker if at least 5 seconds have passed since we were last busy. if (workers.length > 3 && CacheFactory.currentTime > busyTimestamp + JiveGlobals.SECOND * 5) { // First, stop the last worker. workers[workers.length-1].stopWorker(); // Create a new worker array one elment smaller than the current one. int newSize = workers.length-1; TaskEngineWorker[] newWorkers = new TaskEngineWorker[newSize]; // Copy in elements up to newSize. for (int i=0; i<newSize; i++) { newWorkers[i] = workers[i]; } workers = newWorkers; // Update the busy timestamp so that another worker won't be removed for a bit. busyTimestamp = CacheFactory.currentTime; } } /** * A worker thread class which executes <code>Task</code> objects. */ private static class TaskEngineWorker extends Thread { private boolean done = false; public TaskEngineWorker(String name) { super(threadGroup, name); } /** * Stops the worker. */ public void stopWorker() { done = true; } /** * Get tasks from the task engine. The call to get another task will * block until there is an available task to execute. */ public void run() { while (!done) { try { nextTask().run(); } catch (Exception e) { e.printStackTrace(); } } } } /** * A subclass of TimerClass that passes along a Runnable to the task engine * when the scheduled task is run. */ private static class ScheduledTask extends TimerTask { private int priority; private Runnable task; public ScheduledTask(int priority, Runnable task) { this.priority = priority; this.task = task; } public void run() { // Put the task into the queue to be run as soon as possible by a // worker. addTask(priority, task); } } /** * A simple priority queue that only allows for elements with one of three priorities: * TaskEngine.HIGH_PRIORITY, TaskEngine.MEDIUM_PRIORITY, and TaskEngine.LOW_PRIORITY. A * small deviation is made from the standard priority queue behavior to prevent lower * priority elements from languishing in the queue forever: during every dequeue operation, * one element is moved from the low priority to medium priority, and one item is moved from * medium priority to high priority.<p> * * This class is not thread-safe, so external synchronization should be used. */ private static class PriorityQueue { private LinkedList high = new LinkedList(); private LinkedList medium = new LinkedList(); private LinkedList low = new LinkedList(); /** * Adds an object to the queue with the specified priority. Valid priority values are: * TaskEngine.HIGH_PRIORITY, TaskEngine.MEDIUM_PRIORITY, and TaskEngine.LOW_PRIORITY. Any * value higher than TaskEngine.HIGH_PRIORITY will be added as high priority, and any * value lower than TaskEngine.LOW_PRIORITY will be added as low priority. * * @param priority the priority of the object in the queue. * @param object the value to add to the queue. */ public void enqueue(int priority, Object object) { if (priority > HIGH_PRIORITY) { priority = HIGH_PRIORITY; } else if (priority < LOW_PRIORITY) { priority = LOW_PRIORITY; } switch (priority) { case HIGH_PRIORITY: high.addFirst(object); break; case MEDIUM_PRIORITY: medium.addFirst(object); break; case LOW_PRIORITY: low.addFirst(object); break; } } /** * Returns true if the queue is empty. * * @return true if the queue is empty. */ public boolean isEmpty() { return high.isEmpty() && medium.isEmpty() && low.isEmpty(); } /** * Returns the number of elements in the queue. * * @return the number of elements in the queue. */ public int size() { return high.size() + medium.size() + low.size(); } /** * Removes and returns the highest priority element from the queue. * * @return the next element from the queue. */ public Object dequeue() { Object object = null; if (!high.isEmpty()) { object = high.removeLast(); } else if (!medium.isEmpty()) { object = medium.removeLast(); } else if (!low.isEmpty()) { object = low.removeLast(); } else { throw new NoSuchElementException("Queue is empty."); } // To prevent items from never being run once added to the queue, we move one element // from each of the lower priority lists to the higher priority lists. if (!low.isEmpty()) { medium.addFirst(low.removeLast()); } if (!medium.isEmpty()) { high.addFirst(medium.removeLast()); } return object; } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?