⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workitem.cs

📁 线程池实例,1.1版本,用于代替.net自带线程池
💻 CS
📖 第 1 页 / 共 2 页
字号:
// Ami Bar
// amibar@gmail.com

using System;
using System.Threading;
using System.Diagnostics;

namespace Amib.Threading.Internal
{
	#region WorkItem Delegate

	/// <summary>
	/// An internal delegate to call when the WorkItem starts or completes
	/// </summary>
	internal delegate void WorkItemStateCallback(WorkItem workItem);

	#endregion

	#region IInternalWorkItemResult interface 

	public class CanceledWorkItemsGroup
	{
		public readonly static CanceledWorkItemsGroup NotCanceledWorkItemsGroup = new CanceledWorkItemsGroup();

		private bool _isCanceled = false;
		public bool IsCanceled 
		{ 
			get { return _isCanceled; }
			set { _isCanceled = value; }
		}
	}

	internal interface IInternalWorkItemResult
	{
		event WorkItemStateCallback OnWorkItemStarted;
		event WorkItemStateCallback OnWorkItemCompleted;
	}

	#endregion

	#region IWorkItem interface

	public interface IWorkItem
	{

	}

	#endregion

	#region WorkItem class

	/// <summary>
	/// Holds a callback delegate and the state for that delegate.
	/// </summary>
	public class WorkItem : IHasWorkItemPriority, IWorkItem
	{
		#region WorkItemState enum

		/// <summary>
		/// Indicates the state of the work item in the thread pool
		/// </summary>
		private enum WorkItemState
		{
			InQueue,
			InProgress,
			Completed,
			Canceled,
		}

		#endregion

		#region Member Variables

		/// <summary>
		/// Callback delegate for the callback.
		/// </summary>
		private WorkItemCallback _callback;

		/// <summary>
		/// State with which to call the callback delegate.
		/// </summary>
		private object _state;

		/// <summary>
		/// Stores the caller's context
		/// </summary>
		private CallerThreadContext _callerContext;

		/// <summary>
		/// Holds the result of the mehtod
		/// </summary>
		private object _result;

        /// <summary>
        /// Hold the exception if the method threw it
        /// </summary>
        private Exception _exception;

		/// <summary>
		/// Hold the state of the work item
		/// </summary>
		private WorkItemState _workItemState;

		/// <summary>
		/// A ManualResetEvent to indicate that the result is ready
		/// </summary>
		private ManualResetEvent _workItemCompleted;

		/// <summary>
		/// A reference count to the _workItemCompleted. 
		/// When it reaches to zero _workItemCompleted is Closed
		/// </summary>
		private int _workItemCompletedRefCount;

		/// <summary>
		/// Represents the result state of the work item
		/// </summary>
		private WorkItemResult _workItemResult;

		/// <summary>
		/// Work item info
		/// </summary>
		private WorkItemInfo _workItemInfo;

		/// <summary>
		/// Called when the WorkItem starts
		/// </summary>
		private event WorkItemStateCallback _workItemStartedEvent;

		/// <summary>
		/// Called when the WorkItem completes
		/// </summary>
		private event WorkItemStateCallback _workItemCompletedEvent;

		/// <summary>
		/// A reference to an object that indicates whatever the 
		/// WorkItemsGroup has been canceled
		/// </summary>
		private CanceledWorkItemsGroup _canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;

		/// <summary>
		/// The work item group this work item belong to.
		/// 
		/// </summary>
		private IWorkItemsGroup _workItemsGroup;

		#region Performance Counter fields

		/// <summary>
		/// The time when the work items is queued.
		/// Used with the performance counter.
		/// </summary>
		private DateTime _queuedTime;

		/// <summary>
		/// The time when the work items starts its execution.
		/// Used with the performance counter.
		/// </summary>
		private DateTime _beginProcessTime;

		/// <summary>
		/// The time when the work items ends its execution.
		/// Used with the performance counter.
		/// </summary>
		private DateTime _endProcessTime;

		#endregion

		#endregion

		#region Properties

		public TimeSpan WaitingTime
		{
			get 
			{
				return (_beginProcessTime - _queuedTime);
			}
		}

		public TimeSpan ProcessTime
		{
			get 
			{
				return (_endProcessTime - _beginProcessTime);
			}
		}

		#endregion

		#region Construction

		/// <summary>
		/// Initialize the callback holding object.
		/// </summary>
		/// <param name="callback">Callback delegate for the callback.</param>
		/// <param name="state">State with which to call the callback delegate.</param>
		/// 
		/// We assume that the WorkItem object is created within the thread
		/// that meant to run the callback
		public WorkItem(
			IWorkItemsGroup workItemsGroup,
			WorkItemInfo workItemInfo,
			WorkItemCallback callback, 
			object state)
		{
			_workItemsGroup = workItemsGroup;
			_workItemInfo = workItemInfo;

			if (_workItemInfo.UseCallerCallContext || _workItemInfo.UseCallerHttpContext)
			{
				_callerContext = CallerThreadContext.Capture(_workItemInfo.UseCallerCallContext, _workItemInfo.UseCallerHttpContext);
			}

			_callback = callback;
			_state = state;
			_workItemResult = new WorkItemResult(this);
			Initialize();
		}

		internal void Initialize()
		{
			_workItemState = WorkItemState.InQueue;
			_workItemCompleted = null;
			_workItemCompletedRefCount = 0;
		}

		internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup)
		{
			return (workItemsGroup == _workItemsGroup);
		}


		#endregion

		#region Methods

		public CanceledWorkItemsGroup CanceledWorkItemsGroup
		{
			get
			{
				return _canceledWorkItemsGroup;
			}

			set
			{
				_canceledWorkItemsGroup = value;
			}
		}

		/// <summary>
		/// Change the state of the work item to in progress if it wasn't canceled.
		/// </summary>
		/// <returns>
		/// Return true on success or false in case the work item was canceled.
		/// If the work item needs to run a post execute then the method will return true.
		/// </returns>
		public bool StartingWorkItem()
		{
			_beginProcessTime = DateTime.Now;

			lock(this)
			{
				if (IsCanceled)
				{
                    bool result = false;
					if ((_workItemInfo.PostExecuteWorkItemCallback != null) &&
                        ((_workItemInfo.CallToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled))
					{
						result = true;
					}

                    return result;
				}

				Debug.Assert(WorkItemState.InQueue == GetWorkItemState());

				SetWorkItemState(WorkItemState.InProgress);
			}

			return true;
		}

		/// <summary>
		/// Execute the work item and the post execute
		/// </summary>
		public void Execute()
		{
            CallToPostExecute currentCallToPostExecute = 0;

			// Execute the work item if we are in the correct state
			switch(GetWorkItemState())
			{
				case WorkItemState.InProgress:
					currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled;
					ExecuteWorkItem();
					break;
				case WorkItemState.Canceled:
					currentCallToPostExecute |= CallToPostExecute.WhenWorkItemCanceled;
					break;
				default:
					Debug.Assert(false);
					throw new NotSupportedException();
			}

            // Run the post execute as needed
			if ((currentCallToPostExecute & _workItemInfo.CallToPostExecute) != 0)
			{
				PostExecute();
			}

			_endProcessTime = DateTime.Now;
		}

		internal void FireWorkItemCompleted()
		{
			try
			{
				if (null != _workItemCompletedEvent)
				{
					_workItemCompletedEvent(this);
				}
			}
			catch // Ignore exceptions
			{}
		}

        /// <summary>
        /// Execute the work item
        /// </summary>
        private void ExecuteWorkItem()
        {
            CallerThreadContext ctc = null;
            if (null != _callerContext)
            {
                ctc = CallerThreadContext.Capture(_callerContext.CapturedCallContext, _callerContext.CapturedHttpContext);
                CallerThreadContext.Apply(_callerContext);
            }

            Exception exception = null;
            object result = null;

            try
            {
                result = _callback(_state);
            }
            catch (Exception e) 
            {
                // Save the exception so we can rethrow it later
                exception = e;
            }
		
            if (null != _callerContext)
            {
                CallerThreadContext.Apply(ctc);
            }

            SetResult(result, exception);
        }

		/// <summary>
		/// Runs the post execute callback
		/// </summary>
		private void PostExecute()
		{
			if (null != _workItemInfo.PostExecuteWorkItemCallback)
			{
                try
                {
                    _workItemInfo.PostExecuteWorkItemCallback(this._workItemResult);
                }
                catch (Exception e) 
                {
                    Debug.Assert(null != e);
                }
			}
		}

		/// <summary>
		/// Set the result of the work item to return
		/// </summary>
		/// <param name="result">The result of the work item</param>
		internal void SetResult(object result, Exception exception)
		{
			_result = result;
            _exception = exception;
			SignalComplete(false);
		}

		/// <summary>
		/// Returns the work item result
		/// </summary>
		/// <returns>The work item result</returns>
		internal IWorkItemResult GetWorkItemResult()
		{
			return _workItemResult;
		}

		/// <summary>
		/// Wait for all work items to complete
		/// </summary>
		/// <param name="workItemResults">Array of work item result objects</param>
		/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
		/// <param name="exitContext">
		/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 
		/// </param>
		/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
		/// <returns>
		/// true when every work item in workItemResults has completed; otherwise false.
		/// </returns>
		internal static bool WaitAll(
			IWorkItemResult [] workItemResults,
			int millisecondsTimeout,
			bool exitContext,
			WaitHandle cancelWaitHandle)
		{
			if (0 == workItemResults.Length)
			{
				return true;
			}

			bool success;
			WaitHandle [] waitHandles = new WaitHandle[workItemResults.Length];;
			GetWaitHandles(workItemResults, waitHandles);

			if ((null == cancelWaitHandle) && (waitHandles.Length <= 64))
			{
				success = WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
			}
			else
			{
				success = true;
				int millisecondsLeft = millisecondsTimeout;
				DateTime start = DateTime.Now;

				WaitHandle [] whs;
				if (null != cancelWaitHandle)
				{
					whs = new WaitHandle [] { null, cancelWaitHandle };
				}
				else
				{
					whs = new WaitHandle [] { null };
				}

                bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
				// Iterate over the wait handles and wait for each one to complete.
				// We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle
				// won't affect it.
				// Each iteration we update the time left for the timeout.
				for(int i = 0; i < workItemResults.Length; ++i)
				{
                    // WaitAny don't work with negative numbers
                    if (!waitInfinitely && (millisecondsLeft < 0))
                    {
                        success = false;
                        break;
                    }

					whs[0] = waitHandles[i];
					int result = WaitHandle.WaitAny(whs, millisecondsLeft, exitContext);
					if((result > 0) || (WaitHandle.WaitTimeout == result))
					{
						success = false;
						break;
					}

					if(!waitInfinitely)
					{
                        // Update the time left to wait
						TimeSpan ts = DateTime.Now - start;
						millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
					}
				}
			}
			// Release the wait handles
			ReleaseWaitHandles(workItemResults);

			return success;
		}

		/// <summary>
		/// Waits for any of the work items in the specified array to complete, cancel, or timeout
		/// </summary>
		/// <param name="workItemResults">Array of work item result objects</param>
		/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
		/// <param name="exitContext">
		/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. 
		/// </param>
		/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
		/// <returns>
		/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
		/// </returns>
		internal static int WaitAny(			
			IWorkItemResult [] workItemResults,
			int millisecondsTimeout,
			bool exitContext,
			WaitHandle cancelWaitHandle)
		{
			WaitHandle [] waitHandles = null;

			if (null != cancelWaitHandle)
			{
				waitHandles = new WaitHandle[workItemResults.Length+1];
				GetWaitHandles(workItemResults, waitHandles);
				waitHandles[workItemResults.Length] = cancelWaitHandle;
			}
			else
			{
				waitHandles = new WaitHandle[workItemResults.Length];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -