⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workitemsgroup.cs

📁 线程池实例,1.1版本,用于代替.net自带线程池
💻 CS
字号:
// Ami Bar
// amibar@gmail.com

using System;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Diagnostics;

namespace Amib.Threading.Internal
{
	#region WorkItemsGroup class 

	/// <summary>
	/// Summary description for WorkItemsGroup.
	/// </summary>
	public class WorkItemsGroup : IWorkItemsGroup
	{
		#region Private members

		private object _lock = new object();
		/// <summary>
		/// Contains the name of this instance of SmartThreadPool.
		/// Can be changed by the user.
		/// </summary>
		private string _name = "WorkItemsGroup";

		/// <summary>
		/// A reference to the SmartThreadPool instance that created this 
		/// WorkItemsGroup.
		/// </summary>
		private SmartThreadPool _stp;

		/// <summary>
		/// The OnIdle event
		/// </summary>
		private event WorkItemsGroupIdleHandler _onIdle;

		/// <summary>
		/// Defines how many work items of this WorkItemsGroup can run at once.
		/// </summary>
		private int _concurrency;

		/// <summary>
		/// Priority queue to hold work items before they are passed 
		/// to the SmartThreadPool.
		/// </summary>
		private PriorityQueue _workItemsQueue;

		/// <summary>
		/// Indicate how many work items are waiting in the SmartThreadPool
		/// queue.
		/// This value is used to apply the concurrency.
		/// </summary>
		private int _workItemsInStpQueue;

		/// <summary>
		/// Indicate how many work items are currently running in the SmartThreadPool.
		/// This value is used with the Cancel, to calculate if we can send new 
		/// work items to the STP.
		/// </summary>
		private int _workItemsExecutingInStp = 0;

		/// <summary>
		/// WorkItemsGroup start information
		/// </summary>
		private WIGStartInfo _workItemsGroupStartInfo;

		/// <summary>
		/// Signaled when all of the WorkItemsGroup's work item completed.
		/// </summary>
		private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);

		/// <summary>
		/// A common object for all the work items that this work items group
		/// generate so we can mark them to cancel in O(1)
		/// </summary>
		private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();

		#endregion 

		#region Construction

		public WorkItemsGroup(
			SmartThreadPool stp, 
			int concurrency, 
			WIGStartInfo wigStartInfo)
		{
			if (concurrency <= 0)
			{
				throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
			}
			_stp = stp;
			_concurrency = concurrency;
			_workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
			_workItemsQueue = new PriorityQueue();

			// The _workItemsInStpQueue gets the number of currently executing work items,
			// because once a work item is executing, it cannot be cancelled.
			_workItemsInStpQueue = _workItemsExecutingInStp;
		}

		#endregion 

		#region IWorkItemsGroup implementation

		/// <summary>
		/// Get/Set the name of the SmartThreadPool instance
		/// </summary>
		public string Name 
		{ 
			get
			{
				return _name;
			}

			set
			{
				_name = value;
			}
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="workItemPriority">The priority of the work item</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="workItemInfo">Work item info</param>
		/// <param name="callback">A callback to execute</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <param name="workItemPriority">The work item priority</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="workItemInfo">Work item information</param>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <param name="postExecuteWorkItemCallback">
		/// A delegate to call after the callback completion
		/// </param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(
			WorkItemCallback callback, 
			object state,
			PostExecuteWorkItemCallback postExecuteWorkItemCallback)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <param name="postExecuteWorkItemCallback">
		/// A delegate to call after the callback completion
		/// </param>
		/// <param name="workItemPriority">The work item priority</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(
			WorkItemCallback callback, 
			object state,
			PostExecuteWorkItemCallback postExecuteWorkItemCallback,
			WorkItemPriority workItemPriority)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <param name="postExecuteWorkItemCallback">
		/// A delegate to call after the callback completion
		/// </param>
		/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(
			WorkItemCallback callback, 
			object state,
			PostExecuteWorkItemCallback postExecuteWorkItemCallback,
			CallToPostExecute callToPostExecute)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Queue a work item
		/// </summary>
		/// <param name="callback">A callback to execute</param>
		/// <param name="state">
		/// The context object of the work item. Used for passing arguments to the work item. 
		/// </param>
		/// <param name="postExecuteWorkItemCallback">
		/// A delegate to call after the callback completion
		/// </param>
		/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
		/// <param name="workItemPriority">The work item priority</param>
		/// <returns>Returns a work item result</returns>
		public IWorkItemResult QueueWorkItem(
			WorkItemCallback callback, 
			object state,
			PostExecuteWorkItemCallback postExecuteWorkItemCallback,
			CallToPostExecute callToPostExecute,
			WorkItemPriority workItemPriority)
		{
			WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority);
			EnqueueToSTPNextWorkItem(workItem);
			return workItem.GetWorkItemResult();
		}

		/// <summary>
		/// Wait for the thread pool to be idle
		/// </summary>
		public void WaitForIdle()
		{
			WaitForIdle(Timeout.Infinite);
		}

		/// <summary>
		/// Wait for the thread pool to be idle
		/// </summary>
		public bool WaitForIdle(TimeSpan timeout)
		{
			return WaitForIdle((int)timeout.TotalMilliseconds);
		}

		/// <summary>
		/// Wait for the thread pool to be idle
		/// </summary>
		public bool WaitForIdle(int millisecondsTimeout)
		{
			_stp.ValidateWorkItemsGroupWaitForIdle(this);
			return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
		}

		public int WaitingCallbacks
		{
			get
			{
				return _workItemsQueue.Count;
			}
		}

		public event WorkItemsGroupIdleHandler OnIdle
		{
			add
			{
				_onIdle += value;
			}
			remove
			{
				_onIdle -= value;
			}
		}

		public void Cancel()
		{
			lock(_lock)
			{
				_canceledWorkItemsGroup.IsCanceled = true;
				_workItemsQueue.Clear();
				_workItemsInStpQueue = 0;
				_canceledWorkItemsGroup = new CanceledWorkItemsGroup();
			}
		}

		public void Start()
		{
			lock (this)
			{
				if (!_workItemsGroupStartInfo.StartSuspended)
				{
					return;
				}
				_workItemsGroupStartInfo.StartSuspended = false;
			}
			
			for(int i = 0; i < _concurrency; ++i)
			{
				EnqueueToSTPNextWorkItem(null, false);
			}
		}

		#endregion 

		#region Private methods

		private void RegisterToWorkItemCompletion(IWorkItemResult wir)
		{
			IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
			iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
			iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
		}

		public void OnSTPIsStarting()
		{
			lock (this)
			{
				if (_workItemsGroupStartInfo.StartSuspended)
				{
					return;
				}
			}
			
			for(int i = 0; i < _concurrency; ++i)
			{
				EnqueueToSTPNextWorkItem(null, false);
			}
		}

		private object FireOnIdle(object state)
		{
			FireOnIdleImpl(_onIdle);
			return null;
		}

		[MethodImpl(MethodImplOptions.NoInlining)]
		private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
		{
			if(null == onIdle)
			{
				return;
			}

			Delegate[] delegates = onIdle.GetInvocationList();
			foreach(WorkItemsGroupIdleHandler eh in delegates)
			{
				try
				{
					eh(this);
				}
					// Ignore exceptions
				catch{} 
			}
		}

		private void OnWorkItemStartedCallback(WorkItem workItem)
		{
			lock(_lock)
			{
				++_workItemsExecutingInStp;
			}
		}

		private void OnWorkItemCompletedCallback(WorkItem workItem)
		{
			EnqueueToSTPNextWorkItem(null, true);
		}

		private void EnqueueToSTPNextWorkItem(WorkItem workItem)
		{
			EnqueueToSTPNextWorkItem(workItem, false);
		}

		private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
		{
			lock(_lock)
			{
				// Got here from OnWorkItemCompletedCallback()
				if (decrementWorkItemsInStpQueue)
				{
					--_workItemsInStpQueue;

					if(_workItemsInStpQueue < 0)
					{
						_workItemsInStpQueue = 0;
					}

					--_workItemsExecutingInStp;

					if(_workItemsExecutingInStp < 0)
					{
						_workItemsExecutingInStp = 0;
					}
				}

				// If the work item is not null then enqueue it
				if (null != workItem)
				{
					workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;

					RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
					_workItemsQueue.Enqueue(workItem);
					//_stp.IncrementWorkItemsCount();

					if ((1 == _workItemsQueue.Count) && 
						(0 == _workItemsInStpQueue))
					{
						_stp.RegisterWorkItemsGroup(this);
						Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
						_isIdleWaitHandle.Reset();
					}
				}

				// If the work items queue of the group is empty than quit
				if (0 == _workItemsQueue.Count)
				{
					if (0 == _workItemsInStpQueue)
					{
						_stp.UnregisterWorkItemsGroup(this);
						Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
						_isIdleWaitHandle.Set();
						_stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
					}
					return;
				}

				if (!_workItemsGroupStartInfo.StartSuspended)
				{
					if (_workItemsInStpQueue < _concurrency)
					{
						WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
						_stp.Enqueue(nextWorkItem, true);
						++_workItemsInStpQueue;
					}
				}
			}
		}

		#endregion
	}

	#endregion
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -