threadpoolimpl.java
来自「JAVA的一些源码 JAVA2 STANDARD EDITION DEVELO」· Java 代码 · 共 488 行 · 第 1/2 页
JAVA
488 行
/* * Copyright 2005 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.orbutil.threadpool;import java.security.AccessController;import java.security.PrivilegedAction;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;import com.sun.corba.se.impl.orbutil.ORBConstants;import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;import com.sun.corba.se.spi.monitoring.MonitoringConstants;import com.sun.corba.se.spi.monitoring.MonitoredObject;import com.sun.corba.se.spi.monitoring.MonitoringFactories;import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;public class ThreadPoolImpl implements ThreadPool{ private static int threadCounter = 0; // serial counter useful for debugging private WorkQueue workQueue; // Stores the number of available worker threads private int availableWorkerThreads = 0; // Stores the number of threads in the threadpool currently private int currentThreadCount = 0; // Minimum number of worker threads created at instantiation of the threadpool private int minWorkerThreads = 0; // Maximum number of worker threads in the threadpool private int maxWorkerThreads = 0; // Inactivity timeout value for worker threads to exit and stop running private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ; // Indicates if the threadpool is bounded or unbounded private boolean boundedThreadPool = false; // Running count of the work items processed // Set the value to 1 so that divide by zero is avoided in // averageWorkCompletionTime() private long processedCount = 1; // Running aggregate of the time taken in millis to execute work items // processed by the threads in the threadpool private long totalTimeTaken = 0; // Lock for protecting state when required private Object lock = new Object(); // Name of the ThreadPool private String name; // MonitoredObject for ThreadPool private MonitoredObject threadpoolMonitoredObject; // ThreadGroup in which threads should be created private final ThreadGroup threadGroup ; /** * This constructor is used to create an unbounded threadpool */ public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) { maxWorkerThreads = Integer.MAX_VALUE; workQueue = new WorkQueueImpl(this); threadGroup = tg ; name = threadpoolName; initializeMonitoring(); } /** * This constructor is used to create an unbounded threadpool * in the ThreadGroup of the current thread */ public ThreadPoolImpl(String threadpoolName) { this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; } /** * This constructor is used to create bounded threadpool */ public ThreadPoolImpl(int minSize, int maxSize, long timeout, String threadpoolName) { inactivityTimeout = timeout; minWorkerThreads = minSize; maxWorkerThreads = maxSize; boundedThreadPool = true; workQueue = new WorkQueueImpl(this); threadGroup = Thread.currentThread().getThreadGroup() ; name = threadpoolName; for (int i = 0; i < minWorkerThreads; i++) { createWorkerThread(); } initializeMonitoring(); } // Setup monitoring for this threadpool private void initializeMonitoring() { // Get root monitored object MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory(). createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null). getRootMonitoredObject(); // Create the threadpool monitoring root MonitoredObject threadPoolMonitoringObjectRoot = root.getChild( MonitoringConstants.THREADPOOL_MONITORING_ROOT); if (threadPoolMonitoringObjectRoot == null) { threadPoolMonitoringObjectRoot = MonitoringFactories. getMonitoredObjectFactory().createMonitoredObject( MonitoringConstants.THREADPOOL_MONITORING_ROOT, MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION); root.addChild(threadPoolMonitoringObjectRoot); } threadpoolMonitoredObject = MonitoringFactories. getMonitoredObjectFactory(). createMonitoredObject(name, MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION); threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject); LongMonitoredAttributeBase b1 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.currentNumberOfThreads()); } }; threadpoolMonitoredObject.addAttribute(b1); LongMonitoredAttributeBase b2 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.numberOfAvailableThreads()); } }; threadpoolMonitoredObject.addAttribute(b2); LongMonitoredAttributeBase b3 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.numberOfBusyThreads()); } }; threadpoolMonitoredObject.addAttribute(b3); LongMonitoredAttributeBase b4 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.averageWorkCompletionTime()); } }; threadpoolMonitoredObject.addAttribute(b4); LongMonitoredAttributeBase b5 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.currentProcessedCount()); } }; threadpoolMonitoredObject.addAttribute(b5); // Add the monitored object for the WorkQueue threadpoolMonitoredObject.addChild( ((WorkQueueImpl)workQueue).getMonitoredObject()); } // Package private method to get the monitored object for this // class MonitoredObject getMonitoredObject() { return threadpoolMonitoredObject; } public WorkQueue getAnyWorkQueue() { return workQueue; } public WorkQueue getWorkQueue(int queueId) throws NoSuchWorkQueueException { if (queueId != 0) throw new NoSuchWorkQueueException(); return workQueue; } /** * To be called from the workqueue when work is added to the * workQueue. This method would create new threads if required * or notify waiting threads on the queue for available work */ void notifyForAvailableWork(WorkQueue aWorkQueue) { synchronized (lock) { if (availableWorkerThreads == 0) { createWorkerThread(); } else { aWorkQueue.notify(); } } } /** * To be called from the workqueue to create worker threads when none * available. */ void createWorkerThread() { synchronized (lock) { final String name = getName() ; if (boundedThreadPool) { if (currentThreadCount < maxWorkerThreads) { currentThreadCount++; } else { // REVIST - Need to create a thread to monitor the // the state for deadlock i.e. all threads waiting for // something which can be got from the item in the // workqueue, but there is no thread available to // process that work item - DEADLOCK !! return; } } else { currentThreadCount++; } // If we get here, we need to create a thread. AccessController.doPrivileged( new PrivilegedAction() { public Object run() { // Thread creation needs to be in a doPrivileged block // for two reasons: // 1. The creation of a thread in a specific ThreadGroup // is a privileged operation. Lack of a doPrivileged // block here causes an AccessControlException
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?