⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smartthreadpool.cs

📁 线程池实例,1.1版本,用于代替.net自带线程池
💻 CS
📖 第 1 页 / 共 3 页
字号:
// Ami Bar
// amibar@gmail.com
//
// Smart thread pool in C#.
// 7 Aug 2004 - Initial release
// 14 Sep 2004 - Bug fixes 
// 15 Oct 2004 - Added new features
//		- Work items return result.
//		- Support waiting synchronization for multiple work items.
//		- Work items can be cancelled.
//		- Passage of the caller thread抯 context to the thread in the pool.
//		- Minimal usage of WIN32 handles.
//		- Minor bug fixes.
// 26 Dec 2004 - Changes:
//		- Removed static constructors.
//      - Added finalizers.
//		- Changed Exceptions so they are serializable.
//		- Fixed the bug in one of the SmartThreadPool constructors.
//		- Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. 
//        The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
//		- Added PostExecute with options on which cases to call it.
//      - Added option to dispose of the state objects.
//      - Added a WaitForIdle() method that waits until the work items queue is empty.
//      - Added an STPStartInfo class for the initialization of the thread pool.
//      - Changed exception handling so if a work item throws an exception it 
//        is rethrown at GetResult(), rather then firing an UnhandledException event.
//        Note that PostExecute exception are always ignored.
// 25 Mar 2005 - Changes:
//		- Fixed lost of work items bug
// 3 Jul 2005: Changes.
//      - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. 
// 16 Aug 2005: Changes.
//		- Fixed bug where the InUseThreads becomes negative when canceling work items. 
//
// 31 Jan 2006 - Changes:
//		- Added work items priority
//		- Removed support of chained delegates in callbacks and post executes (nobody really use this)
//		- Added work items groups
//		- Added work items groups idle event
//		- Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
//		  it returns true rather then throwing an exception.
//		- Added option to start the STP and the WIG as suspended
//		- Exception behavior changed, the real exception is returned by an 
//		  inner exception
//		- Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
//		- Added performance counters
//		- Added priority to the threads in the pool
//
// 13 Feb 2006 - Changes:
//		- Added a call to the dispose of the Performance Counter so
//		  their won't be a Performance Counter leak.
//		- Added exception catch in case the Performance Counters cannot 
//		  be created.

using System;
using System.Security;
using System.Threading;
using System.Collections;
using System.Diagnostics;
using System.Runtime.CompilerServices;

using Amib.Threading.Internal;

namespace Amib.Threading
{
	#region SmartThreadPool class
	/// <summary>
	/// Smart thread pool class.
	/// </summary>
	public class SmartThreadPool : IWorkItemsGroup, IDisposable
	{
		#region Default Constants

		/// <summary>
		/// Default minimum number of threads the thread pool contains. (0)
		/// </summary>
		public const int DefaultMinWorkerThreads = 0;

		/// <summary>
		/// Default maximum number of threads the thread pool contains. (25)
		/// </summary>
		public const int DefaultMaxWorkerThreads = 25;

		/// <summary>
		/// Default idle timeout in milliseconds. (One minute)
		/// </summary>
		public const int DefaultIdleTimeout = 60*1000; // One minute

		/// <summary>
		/// Indicate to copy the security context of the caller and then use it in the call. (false)
		/// </summary>
		public const bool DefaultUseCallerCallContext = false; 

		/// <summary>
		/// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
		/// </summary>
		public const bool DefaultUseCallerHttpContext = false;

		/// <summary>
		/// Indicate to dispose of the state objects if they support the IDispose interface. (false)
		/// </summary>
		public const bool DefaultDisposeOfStateObjects = false; 

		/// <summary>
		/// The default option to run the post execute
		/// </summary>
		public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;

		/// <summary>
		/// The default post execute method to run. 
		/// When null it means not to call it.
		/// </summary>
		public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null;

		/// <summary>
		/// The default work item priority
		/// </summary>
		public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;

		/// <summary>
		/// The default is to work on work items as soon as they arrive
		/// and not to wait for the start.
		/// </summary>
		public const bool DefaultStartSuspended = false;

		/// <summary>
		/// The default is not to use the performance counters
		/// </summary>
		public static readonly string DefaultPerformanceCounterInstanceName = null;

		/// <summary>
		/// The default thread priority
		/// </summary>
		public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;

		#endregion

		#region Member Variables

		/// <summary>
		/// Contains the name of this instance of SmartThreadPool.
		/// Can be changed by the user.
		/// </summary>
		private string _name = "SmartThreadPool";

		/// <summary>
		/// Hashtable of all the threads in the thread pool.
		/// </summary>
		private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable());

		/// <summary>
		/// Queue of work items.
		/// </summary>
		private WorkItemsQueue _workItemsQueue = new WorkItemsQueue();

		/// <summary>
		/// Count the work items handled.
		/// Used by the performance counter.
		/// </summary>
		private long _workItemsProcessed = 0;

		/// <summary>
		/// Number of threads that currently work (not idle).
		/// </summary>
		private int _inUseWorkerThreads = 0;

		/// <summary>
		/// Start information to use. 
		/// It is simpler than providing many constructors.
		/// </summary>
		private STPStartInfo _stpStartInfo = new STPStartInfo();

		/// <summary>
		/// Total number of work items that are stored in the work items queue 
		/// plus the work items that the threads in the pool are working on.
		/// </summary>
		private int _currentWorkItemsCount = 0;

		/// <summary>
		/// Signaled when the thread pool is idle, i.e. no thread is busy
		/// and the work items queue is empty
		/// </summary>
		private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

		/// <summary>
		/// An event to signal all the threads to quit immediately.
		/// </summary>
		private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);

		/// <summary>
		/// A flag to indicate the threads to quit.
		/// </summary>
		private bool _shutdown = false;

		/// <summary>
		/// Counts the threads created in the pool.
		/// It is used to name the threads.
		/// </summary>
		private int _threadCounter = 0;

		/// <summary>
		/// Indicate that the SmartThreadPool has been disposed
		/// </summary>
		private bool _isDisposed = false;

		/// <summary>
		/// Event to send that the thread pool is idle
		/// </summary>
		private event EventHandler _stpIdle;

		/// <summary>
		/// On idle event
		/// </summary>
		//private event WorkItemsGroupIdleHandler _onIdle;

		/// <summary>
		/// Holds all the WorkItemsGroup instaces that have at least one 
		/// work item int the SmartThreadPool
		/// This variable is used in case of Shutdown
		/// </summary>
		private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable());

		/// <summary>
		/// A reference from each thread in the thread pool to its SmartThreadPool
		/// object container.
		/// With this variable a thread can know whatever it belongs to a 
		/// SmartThreadPool.
		/// </summary>
		[ThreadStatic]
		private static SmartThreadPool _smartThreadPool;

		/// <summary>
		/// A reference to the current work item a thread from the thread pool 
		/// is executing.
		/// </summary>
		[ThreadStatic]
		private static WorkItem _currentWorkItem;

		/// <summary>
		/// STP performance counters
		/// </summary>
		private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance;

		#endregion

		#region Construction and Finalization

		/// <summary>
		/// Constructor
		/// </summary>
		public SmartThreadPool()
		{
			Initialize();
		}

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="idleTimeout">Idle timeout in milliseconds</param>
		public SmartThreadPool(int idleTimeout)
		{
			_stpStartInfo.IdleTimeout = idleTimeout;
			Initialize();
		}

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="idleTimeout">Idle timeout in milliseconds</param>
		/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
		public SmartThreadPool(
			int idleTimeout,
			int maxWorkerThreads)
		{
			_stpStartInfo.IdleTimeout = idleTimeout;
			_stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
			Initialize();
		}

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="idleTimeout">Idle timeout in milliseconds</param>
		/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
		/// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
		public SmartThreadPool(
			int idleTimeout,
			int maxWorkerThreads,
			int minWorkerThreads)
		{
			_stpStartInfo.IdleTimeout = idleTimeout;
			_stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
			_stpStartInfo.MinWorkerThreads = minWorkerThreads;
			Initialize();
		}

		/// <summary>
		/// Constructor
		/// </summary>
		public SmartThreadPool(STPStartInfo stpStartInfo)
		{
			_stpStartInfo = new STPStartInfo(stpStartInfo);
			Initialize();
		}

		private void Initialize()
		{
			ValidateSTPStartInfo();

			if (null != _stpStartInfo.PerformanceCounterInstanceName)
			{
				try
				{
					_pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
				}
				catch(Exception e)
				{
					Debug.WriteLine("Unable to create Performance Counters: " + e.ToString());
					_pcs = NullSTPInstancePerformanceCounters.Instance;
				}
			}

			StartOptimalNumberOfThreads();
		}

		private void StartOptimalNumberOfThreads()
		{
			int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
			threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
			StartThreads(threadsCount);
		}

		private void ValidateSTPStartInfo()
		{
			if (_stpStartInfo.MinWorkerThreads < 0)
			{
				throw new ArgumentOutOfRangeException(
					"MinWorkerThreads", "MinWorkerThreads cannot be negative");
			}

			if (_stpStartInfo.MaxWorkerThreads <= 0)
			{
				throw new ArgumentOutOfRangeException(
					"MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
			}

			if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
			{
				throw new ArgumentOutOfRangeException(
					"MinWorkerThreads, maxWorkerThreads", 
					"MaxWorkerThreads must be greater or equal to MinWorkerThreads");
			}
		}

		private void ValidateCallback(Delegate callback)
		{
			if(callback.GetInvocationList().Length > 1)
			{
				throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
			}
		}

		#endregion

		#region Thread Processing

		/// <summary>
		/// Waits on the queue for a work item, shutdown, or timeout.
		/// </summary>
		/// <returns>
		/// Returns the WaitingCallback or null in case of timeout or shutdown.
		/// </returns>
		private WorkItem Dequeue()
		{
			WorkItem workItem = 
				_workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);

			return workItem;
		}

		/// <summary>
		/// Put a new work item in the queue
		/// </summary>
		/// <param name="workItem">A work item to queue</param>
		private void Enqueue(WorkItem workItem)
		{
			Enqueue(workItem, true);
		}

		/// <summary>
		/// Put a new work item in the queue
		/// </summary>
		/// <param name="workItem">A work item to queue</param>
		internal void Enqueue(WorkItem workItem, bool incrementWorkItems)
		{
			// Make sure the workItem is not null
			Debug.Assert(null != workItem);

			if (incrementWorkItems)
			{
				IncrementWorkItemsCount();
			}

			_workItemsQueue.EnqueueWorkItem(workItem);
			workItem.WorkItemIsQueued();

			// If all the threads are busy then try to create a new one
			if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count) 
			{
				StartThreads(1);
			}
		}

		private void IncrementWorkItemsCount()
		{
			_pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);

			int count = Interlocked.Increment(ref _currentWorkItemsCount);
			//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
			if (count == 1) 
			{
				//Trace.WriteLine("STP is NOT idle");
				_isIdleWaitHandle.Reset();
			}
		}

		private void DecrementWorkItemsCount()
		{
			++_workItemsProcessed;

			// The counter counts even if the work item was cancelled
			_pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);

			int count = Interlocked.Decrement(ref _currentWorkItemsCount);
			//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
			if (count == 0) 
			{
				//Trace.WriteLine("STP is idle");
				_isIdleWaitHandle.Set();
			}
		}

		internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
		{
			_workItemsGroups[workItemsGroup] = workItemsGroup;
		}

		internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
		{
			if (_workItemsGroups.Contains(workItemsGroup))
			{
				_workItemsGroups.Remove(workItemsGroup);
			}
		}

		/// <summary>
		/// Inform that the current thread is about to quit or quiting.
		/// The same thread may call this method more than once.
		/// </summary>
		private void InformCompleted()
		{
			// There is no need to lock the two methods together 
			// since only the current thread removes itself
			// and the _workerThreads is a synchronized hashtable
			if (_workerThreads.Contains(Thread.CurrentThread))
			{
				_workerThreads.Remove(Thread.CurrentThread);
				_pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
			}
		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -