threadpoolimpl.java

来自「JAVA的一些源码 JAVA2 STANDARD EDITION DEVELO」· Java 代码 · 共 488 行 · 第 1/2 页

JAVA
488
字号
/* * Copyright 2005 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.orbutil.threadpool;import java.security.AccessController;import java.security.PrivilegedAction;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;import com.sun.corba.se.impl.orbutil.ORBConstants;import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;import com.sun.corba.se.spi.monitoring.MonitoringConstants;import com.sun.corba.se.spi.monitoring.MonitoredObject;import com.sun.corba.se.spi.monitoring.MonitoringFactories;import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;public class ThreadPoolImpl implements ThreadPool{    private static int threadCounter = 0; // serial counter useful for debugging    private WorkQueue workQueue;        // Stores the number of available worker threads    private int availableWorkerThreads = 0;        // Stores the number of threads in the threadpool currently    private int currentThreadCount = 0;        // Minimum number of worker threads created at instantiation of the threadpool    private int minWorkerThreads = 0;        // Maximum number of worker threads in the threadpool    private int maxWorkerThreads = 0;        // Inactivity timeout value for worker threads to exit and stop running    private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ;        // Indicates if the threadpool is bounded or unbounded    private boolean boundedThreadPool = false;        // Running count of the work items processed    // Set the value to 1 so that divide by zero is avoided in     // averageWorkCompletionTime()    private long processedCount = 1;        // Running aggregate of the time taken in millis to execute work items    // processed by the threads in the threadpool    private long totalTimeTaken = 0;    // Lock for protecting state when required    private Object lock = new Object();    // Name of the ThreadPool    private String name;    // MonitoredObject for ThreadPool    private MonitoredObject threadpoolMonitoredObject;        // ThreadGroup in which threads should be created    private final ThreadGroup threadGroup ;    /**     * This constructor is used to create an unbounded threadpool     */    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {        maxWorkerThreads = Integer.MAX_VALUE;        workQueue = new WorkQueueImpl(this);	threadGroup = tg ;	name = threadpoolName;	initializeMonitoring();    }     /**     * This constructor is used to create an unbounded threadpool     * in the ThreadGroup of the current thread     */    public ThreadPoolImpl(String threadpoolName) {	this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;     }    /**     * This constructor is used to create bounded threadpool     */    public ThreadPoolImpl(int minSize, int maxSize, long timeout, 					    String threadpoolName)     {        inactivityTimeout = timeout;        minWorkerThreads = minSize;        maxWorkerThreads = maxSize;        boundedThreadPool = true;        workQueue = new WorkQueueImpl(this);	threadGroup = Thread.currentThread().getThreadGroup() ;	name = threadpoolName;        for (int i = 0; i < minWorkerThreads; i++) {            createWorkerThread();        }	initializeMonitoring();    }    // Setup monitoring for this threadpool    private void initializeMonitoring() {	// Get root monitored object	MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().		createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).		getRootMonitoredObject();	// Create the threadpool monitoring root	MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(		    MonitoringConstants.THREADPOOL_MONITORING_ROOT);	if (threadPoolMonitoringObjectRoot == null) {	    threadPoolMonitoringObjectRoot =  MonitoringFactories.		    getMonitoredObjectFactory().createMonitoredObject(		    MonitoringConstants.THREADPOOL_MONITORING_ROOT,		    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);	    root.addChild(threadPoolMonitoringObjectRoot);	}	threadpoolMonitoredObject = MonitoringFactories.		    getMonitoredObjectFactory().		    createMonitoredObject(name,		    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);	threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);	LongMonitoredAttributeBase b1 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.currentNumberOfThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b1);	LongMonitoredAttributeBase b2 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b2);	LongMonitoredAttributeBase b3 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 		    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.numberOfBusyThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b3);	LongMonitoredAttributeBase b4 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 		    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());		}	    };	threadpoolMonitoredObject.addAttribute(b4);	LongMonitoredAttributeBase b5 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 		    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.currentProcessedCount());		}	    };	threadpoolMonitoredObject.addAttribute(b5);	// Add the monitored object for the WorkQueue		threadpoolMonitoredObject.addChild(		((WorkQueueImpl)workQueue).getMonitoredObject());    }    // Package private method to get the monitored object for this    // class    MonitoredObject getMonitoredObject() {	return threadpoolMonitoredObject;    }        public WorkQueue getAnyWorkQueue()    {	return workQueue;    }    public WorkQueue getWorkQueue(int queueId)	throws NoSuchWorkQueueException    {	if (queueId != 0)	    throw new NoSuchWorkQueueException();	return workQueue;    }    /**     * To be called from the workqueue when work is added to the     * workQueue. This method would create new threads if required     * or notify waiting threads on the queue for available work     */    void notifyForAvailableWork(WorkQueue aWorkQueue) {	synchronized (lock) {	    if (availableWorkerThreads == 0) {		createWorkerThread();	    } else {		aWorkQueue.notify();	    }	}    }        /**     * To be called from the workqueue to create worker threads when none     * available.     */    void createWorkerThread() {	synchronized (lock) {	    final String name = getName() ;	      	    if (boundedThreadPool) {		if (currentThreadCount < maxWorkerThreads) {		    currentThreadCount++;		} else {		    // REVIST - Need to create a thread to monitor the		    // the state for deadlock i.e. all threads waiting for		    // something which can be got from the item in the 		    // workqueue, but there is no thread available to		    // process that work item - DEADLOCK !!		    return;		}	    } else {		currentThreadCount++;	    }	    // If we get here, we need to create a thread.	    AccessController.doPrivileged( 		new PrivilegedAction() {		    public Object run() {			// Thread creation needs to be in a doPrivileged block			// for two reasons:			// 1. The creation of a thread in a specific ThreadGroup			//    is a privileged operation.  Lack of a doPrivileged			//    block here causes an AccessControlException 

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?