⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpoolimpl.java

📁 java1.6众多例子参考
💻 JAVA
字号:
/* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.orbutil.threadpool;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;import com.sun.corba.se.impl.orbutil.ORBConstants;import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;import com.sun.corba.se.spi.monitoring.MonitoringConstants;import com.sun.corba.se.spi.monitoring.MonitoredObject;import com.sun.corba.se.spi.monitoring.MonitoringFactories;import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;public class ThreadPoolImpl implements ThreadPool{    private static int threadCounter = 0; // serial counter useful for debugging    private WorkQueue workQueue;        // Stores the number of available worker threads    private int availableWorkerThreads = 0;        // Stores the number of threads in the threadpool currently    private int currentThreadCount = 0;        // Minimum number of worker threads created at instantiation of the threadpool    private int minWorkerThreads = 0;        // Maximum number of worker threads in the threadpool    private int maxWorkerThreads = 0;        // Inactivity timeout value for worker threads to exit and stop running    private long inactivityTimeout;        // Indicates if the threadpool is bounded or unbounded    private boolean boundedThreadPool = false;        // Running count of the work items processed    // Set the value to 1 so that divide by zero is avoided in     // averageWorkCompletionTime()    private long processedCount = 1;        // Running aggregate of the time taken in millis to execute work items    // processed by the threads in the threadpool    private long totalTimeTaken = 0;    // Lock for protecting state when required    private Object lock = new Object();    // Name of the ThreadPool    private String name;    // MonitoredObject for ThreadPool    private MonitoredObject threadpoolMonitoredObject;        // ThreadGroup in which threads should be created    private ThreadGroup threadGroup ;    /**     * This constructor is used to create an unbounded threadpool     */    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {        inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;        maxWorkerThreads = Integer.MAX_VALUE;        workQueue = new WorkQueueImpl(this);	threadGroup = tg ;	name = threadpoolName;	initializeMonitoring();    }     /**     * This constructor is used to create an unbounded threadpool     * in the ThreadGroup of the current thread     */    public ThreadPoolImpl(String threadpoolName) {	this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;     }    /**     * This constructor is used to create bounded threadpool     */    public ThreadPoolImpl(int minSize, int maxSize, long timeout, 					    String threadpoolName)     {        minWorkerThreads = minSize;        maxWorkerThreads = maxSize;        inactivityTimeout = timeout;        boundedThreadPool = true;        workQueue = new WorkQueueImpl(this);	name = threadpoolName;        for (int i = 0; i < minWorkerThreads; i++) {            createWorkerThread();        }	initializeMonitoring();    }    // Setup monitoring for this threadpool    private void initializeMonitoring() {	// Get root monitored object	MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().		createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).		getRootMonitoredObject();	// Create the threadpool monitoring root	MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(		    MonitoringConstants.THREADPOOL_MONITORING_ROOT);	if (threadPoolMonitoringObjectRoot == null) {	    threadPoolMonitoringObjectRoot =  MonitoringFactories.		    getMonitoredObjectFactory().createMonitoredObject(		    MonitoringConstants.THREADPOOL_MONITORING_ROOT,		    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);	    root.addChild(threadPoolMonitoringObjectRoot);	}	threadpoolMonitoredObject = MonitoringFactories.		    getMonitoredObjectFactory().		    createMonitoredObject(name,		    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);	threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);	LongMonitoredAttributeBase b1 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.currentNumberOfThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b1);	LongMonitoredAttributeBase b2 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b2);	LongMonitoredAttributeBase b3 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 		    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.numberOfBusyThreads());		}	    };	threadpoolMonitoredObject.addAttribute(b3);	LongMonitoredAttributeBase b4 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 		    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());		}	    };	threadpoolMonitoredObject.addAttribute(b4);	LongMonitoredAttributeBase b5 = new 	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 		    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {		public Object getValue() {		    return new Long(ThreadPoolImpl.this.currentProcessedCount());		}	    };	threadpoolMonitoredObject.addAttribute(b5);	// Add the monitored object for the WorkQueue		threadpoolMonitoredObject.addChild(		((WorkQueueImpl)workQueue).getMonitoredObject());    }    // Package private method to get the monitored object for this    // class    MonitoredObject getMonitoredObject() {	return threadpoolMonitoredObject;    }        public WorkQueue getAnyWorkQueue()    {	return workQueue;    }    public WorkQueue getWorkQueue(int queueId)	throws NoSuchWorkQueueException    {	if (queueId != 0)	    throw new NoSuchWorkQueueException();	return workQueue;    }    /**     * To be called from the workqueue when work is added to the     * workQueue. This method would create new threads if required     * or notify waiting threads on the queue for available work     */    void notifyForAvailableWork(WorkQueue aWorkQueue) {	synchronized (lock) {	    if (availableWorkerThreads == 0) {		createWorkerThread();	    } else {		aWorkQueue.notify();	    }	}    }        /**     * To be called from the workqueue to create worker threads when none     * available.     */    void createWorkerThread() {        WorkerThread thread;        	synchronized (lock) {	    if (boundedThreadPool) {		if (currentThreadCount < maxWorkerThreads) {		    thread = new WorkerThread(threadGroup, getName());		    currentThreadCount++;		} else {		    // REVIST - Need to create a thread to monitor the		    // the state for deadlock i.e. all threads waiting for		    // something which can be got from the item in the 		    // workqueue, but there is no thread available to		    // process that work item - DEADLOCK !!		    return;		}	    } else {		thread = new WorkerThread(threadGroup, getName());		currentThreadCount++;	    }	}                // The thread must be set to a daemon thread so the        // VM can exit if the only threads left are PooledThreads        // or other daemons.  We don't want to rely on the        // calling thread always being a daemon.        // Catch exceptions since setDaemon can cause a        // security exception to be thrown under netscape        // in the Applet mode        try {            thread.setDaemon(true);        } catch (Exception e) {	    // REVISIT - need to do some logging here	}        thread.start();    }        /**     * This method will return the minimum number of threads maintained     * by the threadpool.     */     public int minimumNumberOfThreads() {        return minWorkerThreads;    }        /**     * This method will return the maximum number of threads in the     * threadpool at any point in time, for the life of the threadpool     */     public int maximumNumberOfThreads() {        return maxWorkerThreads;    }        /**     * This method will return the time in milliseconds when idle     * threads in the threadpool are removed.     */     public long idleTimeoutForThreads() {        return inactivityTimeout;    }        /**     * This method will return the total number of threads currently in the     * threadpool. This method returns a value which is not synchronized.     */     public int currentNumberOfThreads() {	synchronized (lock) {	    return currentThreadCount;	}    }        /**     * This method will return the number of available threads in the     * threadpool which are waiting for work. This method returns a     * value which is not synchronized.     */     public int numberOfAvailableThreads() {	synchronized (lock) {	    return availableWorkerThreads;	}    }        /**     * This method will return the number of busy threads in the threadpool     * This method returns a value which is not synchronized.     */     public int numberOfBusyThreads() {	synchronized (lock) {	    return (currentThreadCount - availableWorkerThreads);	}    }        /**     * This method returns the average elapsed time taken to complete a Work     * item in milliseconds.     */    public long averageWorkCompletionTime() {	synchronized (lock) {	    return (totalTimeTaken / processedCount);	}    }        /**     * This method returns the number of Work items processed by the threadpool     */    public long currentProcessedCount() {	synchronized (lock) {	    return processedCount;	}    }    public String getName() {        return name;    }    /**     * This method will return the number of WorkQueues serviced by the threadpool.     */     public int numberOfWorkQueues() {        return 1;    }     private static synchronized int getUniqueThreadId() {        return ThreadPoolImpl.threadCounter++;    }    private class WorkerThread extends Thread    {        private Work currentWork;        private int threadId = 0; // unique id for the thread        // thread pool this WorkerThread belongs too        private String threadPoolName;	// name seen by Thread.getName()	private StringBuffer workerThreadName = new StringBuffer();        WorkerThread(ThreadGroup tg, String threadPoolName) {	    super(tg, "Idle");	    this.threadId = ThreadPoolImpl.getUniqueThreadId();            this.threadPoolName = threadPoolName;	    setName(composeWorkerThreadName(threadPoolName, "Idle"));        }                public void run() {            while (true) {                try {		    synchronized (lock) {			availableWorkerThreads++;		    }                                        // Get some work to do                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);		    synchronized (lock) {			availableWorkerThreads--;			// It is possible in notifyForAvailableWork that the			// check for availableWorkerThreads = 0 may return			// false, because the availableWorkerThreads has not been			// decremented to zero before the producer thread added 			// work to the queue. This may create a deadlock, if the			// executing thread needs information which is in the work			// item queued in the workqueue, but has no thread to work			// on it since none was created because availableWorkerThreads = 0			// returned false.			// The following code will ensure that a thread is always available			// in those situations			if  ((availableWorkerThreads == 0) && 				(workQueue.workItemsInQueue() > 0)) {			    createWorkerThread();			}		    }                    // Set the thread name for debugging.	            setName(composeWorkerThreadName(threadPoolName,				      Integer.toString(this.threadId)));                    long start = System.currentTimeMillis();                    		    try {			// Do the work			currentWork.doWork();		    } catch (Throwable t) {			// Ignore all errors.			;		    }                                        long end = System.currentTimeMillis();                    		    synchronized (lock) {			totalTimeTaken += (end - start);			processedCount++;		    }		    // set currentWork to null so that the work item can be 		    // garbage collected		    currentWork = null;	            setName(composeWorkerThreadName(threadPoolName, "Idle"));                } catch (TimeoutException e) {                    // This thread timed out waiting for something to do.		    synchronized (lock) {			availableWorkerThreads--;			// This should for both bounded and unbounded case			if (currentThreadCount > minWorkerThreads) {			    currentThreadCount--;			    // This thread can exit.			    return;			} else {			    // Go back to waiting on workQueue			    continue;			}		    }                } catch (InterruptedException ie) {                    // InterruptedExceptions are                    // caught here.  Thus, threads can be forced out of                    // requestWork and so they have to reacquire the lock.                    // Other options include ignoring or                    // letting this thread die.                    // Ignoring for now. REVISIT		    synchronized (lock) {			availableWorkerThreads--;		    }                } catch (Throwable e) {                    // Ignore any exceptions that currentWork.process                    // accidently lets through, but let Errors pass.                    // Add debugging output?  REVISIT		    synchronized (lock) {			availableWorkerThreads--;		    }                }            }        }	private String composeWorkerThreadName(String poolName, String workerName) {            workerThreadName.setLength(0);	    workerThreadName.append("p: ").append(poolName);	    workerThreadName.append("; w: ").append(workerName);	    return workerThreadName.toString();	}    } // End of WorkerThread class}// End of file.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -