📄 smartthreadpool.cs
字号:
// Ami Bar
// amibar@gmail.com
//
// Smart thread pool in C#.
// 7 Aug 2004 - Initial release
// 14 Sep 2004 - Bug fixes
// 15 Oct 2004 - Added new features
// - Work items return result.
// - Support waiting synchronization for multiple work items.
// - Work items can be cancelled.
// - Passage of the caller thread抯 context to the thread in the pool.
// - Minimal usage of WIN32 handles.
// - Minor bug fixes.
// 26 Dec 2004 - Changes:
// - Removed static constructors.
// - Added finalizers.
// - Changed Exceptions so they are serializable.
// - Fixed the bug in one of the SmartThreadPool constructors.
// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
// - Added PostExecute with options on which cases to call it.
// - Added option to dispose of the state objects.
// - Added a WaitForIdle() method that waits until the work items queue is empty.
// - Added an STPStartInfo class for the initialization of the thread pool.
// - Changed exception handling so if a work item throws an exception it
// is rethrown at GetResult(), rather then firing an UnhandledException event.
// Note that PostExecute exception are always ignored.
// 25 Mar 2005 - Changes:
// - Fixed lost of work items bug
// 3 Jul 2005: Changes.
// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
// 16 Aug 2005: Changes.
// - Fixed bug where the InUseThreads becomes negative when canceling work items.
//
// 31 Jan 2006 - Changes:
// - Added work items priority
// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
// - Added work items groups
// - Added work items groups idle event
// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
// it returns true rather then throwing an exception.
// - Added option to start the STP and the WIG as suspended
// - Exception behavior changed, the real exception is returned by an
// inner exception
// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
// - Added performance counters
// - Added priority to the threads in the pool
//
// 13 Feb 2006 - Changes:
// - Added a call to the dispose of the Performance Counter so
// their won't be a Performance Counter leak.
// - Added exception catch in case the Performance Counters cannot
// be created.
using System;
using System.Security;
using System.Threading;
using System.Collections;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Amib.Threading.Internal;
namespace Amib.Threading
{
#region SmartThreadPool class
/// <summary>
/// Smart thread pool class.
/// </summary>
public class SmartThreadPool : IWorkItemsGroup, IDisposable
{
#region Default Constants
/// <summary>
/// Default minimum number of threads the thread pool contains. (0)
/// </summary>
public const int DefaultMinWorkerThreads = 0;
/// <summary>
/// Default maximum number of threads the thread pool contains. (25)
/// </summary>
public const int DefaultMaxWorkerThreads = 25;
/// <summary>
/// Default idle timeout in milliseconds. (One minute)
/// </summary>
public const int DefaultIdleTimeout = 60*1000; // One minute
/// <summary>
/// Indicate to copy the security context of the caller and then use it in the call. (false)
/// </summary>
public const bool DefaultUseCallerCallContext = false;
/// <summary>
/// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
/// </summary>
public const bool DefaultUseCallerHttpContext = false;
/// <summary>
/// Indicate to dispose of the state objects if they support the IDispose interface. (false)
/// </summary>
public const bool DefaultDisposeOfStateObjects = false;
/// <summary>
/// The default option to run the post execute
/// </summary>
public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
/// <summary>
/// The default post execute method to run.
/// When null it means not to call it.
/// </summary>
public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null;
/// <summary>
/// The default work item priority
/// </summary>
public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
/// <summary>
/// The default is to work on work items as soon as they arrive
/// and not to wait for the start.
/// </summary>
public const bool DefaultStartSuspended = false;
/// <summary>
/// The default is not to use the performance counters
/// </summary>
public static readonly string DefaultPerformanceCounterInstanceName = null;
/// <summary>
/// The default thread priority
/// </summary>
public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
#endregion
#region Member Variables
/// <summary>
/// Contains the name of this instance of SmartThreadPool.
/// Can be changed by the user.
/// </summary>
private string _name = "SmartThreadPool";
/// <summary>
/// Hashtable of all the threads in the thread pool.
/// </summary>
private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable());
/// <summary>
/// Queue of work items.
/// </summary>
private WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
/// <summary>
/// Count the work items handled.
/// Used by the performance counter.
/// </summary>
private long _workItemsProcessed = 0;
/// <summary>
/// Number of threads that currently work (not idle).
/// </summary>
private int _inUseWorkerThreads = 0;
/// <summary>
/// Start information to use.
/// It is simpler than providing many constructors.
/// </summary>
private STPStartInfo _stpStartInfo = new STPStartInfo();
/// <summary>
/// Total number of work items that are stored in the work items queue
/// plus the work items that the threads in the pool are working on.
/// </summary>
private int _currentWorkItemsCount = 0;
/// <summary>
/// Signaled when the thread pool is idle, i.e. no thread is busy
/// and the work items queue is empty
/// </summary>
private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
/// <summary>
/// An event to signal all the threads to quit immediately.
/// </summary>
private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
/// <summary>
/// A flag to indicate the threads to quit.
/// </summary>
private bool _shutdown = false;
/// <summary>
/// Counts the threads created in the pool.
/// It is used to name the threads.
/// </summary>
private int _threadCounter = 0;
/// <summary>
/// Indicate that the SmartThreadPool has been disposed
/// </summary>
private bool _isDisposed = false;
/// <summary>
/// Event to send that the thread pool is idle
/// </summary>
private event EventHandler _stpIdle;
/// <summary>
/// On idle event
/// </summary>
//private event WorkItemsGroupIdleHandler _onIdle;
/// <summary>
/// Holds all the WorkItemsGroup instaces that have at least one
/// work item int the SmartThreadPool
/// This variable is used in case of Shutdown
/// </summary>
private Hashtable _workItemsGroups = Hashtable.Synchronized(new Hashtable());
/// <summary>
/// A reference from each thread in the thread pool to its SmartThreadPool
/// object container.
/// With this variable a thread can know whatever it belongs to a
/// SmartThreadPool.
/// </summary>
[ThreadStatic]
private static SmartThreadPool _smartThreadPool;
/// <summary>
/// A reference to the current work item a thread from the thread pool
/// is executing.
/// </summary>
[ThreadStatic]
private static WorkItem _currentWorkItem;
/// <summary>
/// STP performance counters
/// </summary>
private ISTPInstancePerformanceCounters _pcs = NullSTPInstancePerformanceCounters.Instance;
#endregion
#region Construction and Finalization
/// <summary>
/// Constructor
/// </summary>
public SmartThreadPool()
{
Initialize();
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
public SmartThreadPool(int idleTimeout)
{
_stpStartInfo.IdleTimeout = idleTimeout;
Initialize();
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
public SmartThreadPool(
int idleTimeout,
int maxWorkerThreads)
{
_stpStartInfo.IdleTimeout = idleTimeout;
_stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
Initialize();
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
/// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
public SmartThreadPool(
int idleTimeout,
int maxWorkerThreads,
int minWorkerThreads)
{
_stpStartInfo.IdleTimeout = idleTimeout;
_stpStartInfo.MaxWorkerThreads = maxWorkerThreads;
_stpStartInfo.MinWorkerThreads = minWorkerThreads;
Initialize();
}
/// <summary>
/// Constructor
/// </summary>
public SmartThreadPool(STPStartInfo stpStartInfo)
{
_stpStartInfo = new STPStartInfo(stpStartInfo);
Initialize();
}
private void Initialize()
{
ValidateSTPStartInfo();
if (null != _stpStartInfo.PerformanceCounterInstanceName)
{
try
{
_pcs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
}
catch(Exception e)
{
Debug.WriteLine("Unable to create Performance Counters: " + e.ToString());
_pcs = NullSTPInstancePerformanceCounters.Instance;
}
}
StartOptimalNumberOfThreads();
}
private void StartOptimalNumberOfThreads()
{
int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
StartThreads(threadsCount);
}
private void ValidateSTPStartInfo()
{
if (_stpStartInfo.MinWorkerThreads < 0)
{
throw new ArgumentOutOfRangeException(
"MinWorkerThreads", "MinWorkerThreads cannot be negative");
}
if (_stpStartInfo.MaxWorkerThreads <= 0)
{
throw new ArgumentOutOfRangeException(
"MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
}
if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
{
throw new ArgumentOutOfRangeException(
"MinWorkerThreads, maxWorkerThreads",
"MaxWorkerThreads must be greater or equal to MinWorkerThreads");
}
}
private void ValidateCallback(Delegate callback)
{
if(callback.GetInvocationList().Length > 1)
{
throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
}
}
#endregion
#region Thread Processing
/// <summary>
/// Waits on the queue for a work item, shutdown, or timeout.
/// </summary>
/// <returns>
/// Returns the WaitingCallback or null in case of timeout or shutdown.
/// </returns>
private WorkItem Dequeue()
{
WorkItem workItem =
_workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
return workItem;
}
/// <summary>
/// Put a new work item in the queue
/// </summary>
/// <param name="workItem">A work item to queue</param>
private void Enqueue(WorkItem workItem)
{
Enqueue(workItem, true);
}
/// <summary>
/// Put a new work item in the queue
/// </summary>
/// <param name="workItem">A work item to queue</param>
internal void Enqueue(WorkItem workItem, bool incrementWorkItems)
{
// Make sure the workItem is not null
Debug.Assert(null != workItem);
if (incrementWorkItems)
{
IncrementWorkItemsCount();
}
_workItemsQueue.EnqueueWorkItem(workItem);
workItem.WorkItemIsQueued();
// If all the threads are busy then try to create a new one
if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count)
{
StartThreads(1);
}
}
private void IncrementWorkItemsCount()
{
_pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
int count = Interlocked.Increment(ref _currentWorkItemsCount);
//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
if (count == 1)
{
//Trace.WriteLine("STP is NOT idle");
_isIdleWaitHandle.Reset();
}
}
private void DecrementWorkItemsCount()
{
++_workItemsProcessed;
// The counter counts even if the work item was cancelled
_pcs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
int count = Interlocked.Decrement(ref _currentWorkItemsCount);
//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
if (count == 0)
{
//Trace.WriteLine("STP is idle");
_isIdleWaitHandle.Set();
}
}
internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
{
_workItemsGroups[workItemsGroup] = workItemsGroup;
}
internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
{
if (_workItemsGroups.Contains(workItemsGroup))
{
_workItemsGroups.Remove(workItemsGroup);
}
}
/// <summary>
/// Inform that the current thread is about to quit or quiting.
/// The same thread may call this method more than once.
/// </summary>
private void InformCompleted()
{
// There is no need to lock the two methods together
// since only the current thread removes itself
// and the _workerThreads is a synchronized hashtable
if (_workerThreads.Contains(Thread.CurrentThread))
{
_workerThreads.Remove(Thread.CurrentThread);
_pcs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -