// File: workitem.cs
// Ami Bar
// amibar@gmail.com
using System;
using System.Threading;
using System.Diagnostics;
namespace Amib.Threading.Internal
{
#region WorkItem Delegate
/// <summary>
/// An internal delegate to call when a WorkItem starts or completes.
/// It is the delegate type of the started/completed events declared in this file.
/// </summary>
/// <param name="workItem">The work item the notification refers to.</param>
internal delegate void WorkItemStateCallback(WorkItem workItem);
#endregion
#region IInternalWorkItemResult interface
/// <summary>
/// A small mutable flag object that records whether a group of work items
/// has been canceled.
/// </summary>
public class CanceledWorkItemsGroup
{
    /// <summary>
    /// A single shared instance whose flag starts out as "not canceled".
    /// The instance is shared and its flag is publicly settable, so setting
    /// <see cref="IsCanceled"/> on it is visible to every holder of this reference.
    /// </summary>
    public static readonly CanceledWorkItemsGroup NotCanceledWorkItemsGroup = new CanceledWorkItemsGroup();

    // Backing store for IsCanceled; a fresh instance is not canceled.
    private bool _isCanceled;

    /// <summary>
    /// Gets or sets whether the group has been canceled.
    /// </summary>
    public bool IsCanceled
    {
        get
        {
            return _isCanceled;
        }
        set
        {
            _isCanceled = value;
        }
    }
}
/// <summary>
/// Internal view of a work item result that exposes life-cycle notifications.
/// Both events use the <see cref="WorkItemStateCallback"/> delegate, so
/// subscribers receive the work item the notification refers to.
/// </summary>
internal interface IInternalWorkItemResult
{
/// <summary>
/// Notification for the start of the work item (as the name indicates;
/// the code that raises it is not part of this interface).
/// </summary>
event WorkItemStateCallback OnWorkItemStarted;
/// <summary>
/// Notification for the completion of the work item (as the name indicates;
/// the code that raises it is not part of this interface).
/// </summary>
event WorkItemStateCallback OnWorkItemCompleted;
}
#endregion
#region IWorkItem interface
/// <summary>
/// Marker interface with no members; implemented by <see cref="WorkItem"/>.
/// </summary>
public interface IWorkItem
{
}
#endregion
#region WorkItem class
/// <summary>
/// Holds a callback delegate and the state for that delegate.
/// </summary>
public class WorkItem : IHasWorkItemPriority, IWorkItem
{
#region WorkItemState enum
/// <summary>
/// Indicates the state of the work item in the thread pool.
/// </summary>
private enum WorkItemState
{
// Initial state; assigned by Initialize().
InQueue,
// Assigned by StartingWorkItem() when the item was not canceled;
// Execute() runs the callback only in this state.
InProgress,
// NOTE(review): presumably assigned once the callback has finished;
// the transition is not in the visible part of the file.
Completed,
// The item was canceled; Execute() skips the callback in this state and
// only considers the post-execute callback.
Canceled,
}
#endregion
#region Member Variables
/// <summary>
/// The delegate that performs the work; invoked with _state when the item executes.
/// </summary>
private WorkItemCallback _callback;
/// <summary>
/// The state object passed to the callback delegate.
/// </summary>
private object _state;
/// <summary>
/// The caller's context captured by the constructor, or null when the work
/// item info requested neither the call context nor the HTTP context.
/// </summary>
private CallerThreadContext _callerContext;
/// <summary>
/// Holds the value returned by the callback method.
/// </summary>
private object _result;
/// <summary>
/// Holds the exception thrown by the callback method, if it threw one.
/// </summary>
private Exception _exception;
/// <summary>
/// Holds the current state of the work item.
/// </summary>
private WorkItemState _workItemState;
/// <summary>
/// A ManualResetEvent that indicates the result is ready.
/// Initialize() resets this reference to null.
/// </summary>
private ManualResetEvent _workItemCompleted;
/// <summary>
/// A reference count for _workItemCompleted.
/// When it drops to zero, _workItemCompleted is closed.
/// </summary>
private int _workItemCompletedRefCount;
/// <summary>
/// Represents the result state of the work item; created by the constructor
/// and returned by GetWorkItemResult().
/// </summary>
private WorkItemResult _workItemResult;
/// <summary>
/// The work item info: context-capture flags and post-execute settings
/// read by this class.
/// </summary>
private WorkItemInfo _workItemInfo;
/// <summary>
/// Raised when the WorkItem starts.
/// </summary>
private event WorkItemStateCallback _workItemStartedEvent;
/// <summary>
/// Raised when the WorkItem completes (see FireWorkItemCompleted).
/// </summary>
private event WorkItemStateCallback _workItemCompletedEvent;
/// <summary>
/// A reference to an object that indicates whether the
/// WorkItemsGroup has been canceled. Defaults to the shared
/// CanceledWorkItemsGroup.NotCanceledWorkItemsGroup instance.
/// </summary>
private CanceledWorkItemsGroup _canceledWorkItemsGroup = CanceledWorkItemsGroup.NotCanceledWorkItemsGroup;
/// <summary>
/// The work items group this work item belongs to.
/// </summary>
private IWorkItemsGroup _workItemsGroup;
#region Performance Counter fields
/// <summary>
/// The time when the work item was queued.
/// Used with the performance counter; subtracted from _beginProcessTime by WaitingTime.
/// NOTE(review): the assignment is not in the visible part of the file.
/// </summary>
private DateTime _queuedTime;
/// <summary>
/// The time when the work item starts its execution (set in StartingWorkItem).
/// Used with the performance counter.
/// </summary>
private DateTime _beginProcessTime;
/// <summary>
/// The time when the work item ends its execution (set at the end of Execute).
/// Used with the performance counter.
/// </summary>
private DateTime _endProcessTime;
#endregion
#endregion
#region Properties
/// <summary>
/// Gets the interval between the recorded queue time and the recorded
/// start of processing (_beginProcessTime minus _queuedTime).
/// </summary>
public TimeSpan WaitingTime
{
    get
    {
        TimeSpan timeInQueue = _beginProcessTime.Subtract(_queuedTime);
        return timeInQueue;
    }
}
/// <summary>
/// Gets the interval between the recorded start and the recorded end of
/// processing (_endProcessTime minus _beginProcessTime).
/// </summary>
public TimeSpan ProcessTime
{
    get
    {
        TimeSpan timeInProcess = _endProcessTime.Subtract(_beginProcessTime);
        return timeInProcess;
    }
}
#endregion
#region Construction
/// <summary>
/// Creates a work item that holds a callback delegate and the state to pass to it.
/// </summary>
/// <param name="workItemsGroup">The work items group recorded as the owner of this item.</param>
/// <param name="workItemInfo">
/// The work item info. Its UseCallerCallContext / UseCallerHttpContext flags
/// are read here to decide whether the caller's context is captured.
/// </param>
/// <param name="callback">Callback delegate for the callback.</param>
/// <param name="state">State with which to call the callback delegate.</param>
/// <remarks>
/// The caller's context, when requested, is captured during construction.
/// The original author therefore assumes the WorkItem object is created on
/// the thread that intends the callback to run.
/// NOTE(review): presumably Capture reads the current thread's context; confirm.
/// </remarks>
public WorkItem(
    IWorkItemsGroup workItemsGroup,
    WorkItemInfo workItemInfo,
    WorkItemCallback callback,
    object state)
{
    // Plain bookkeeping first: owner, settings, delegate and its argument.
    _workItemsGroup = workItemsGroup;
    _workItemInfo = workItemInfo;
    _callback = callback;
    _state = state;

    // Capture the caller's context only when at least one flavor is requested;
    // otherwise _callerContext stays null and execution skips the context switch.
    if (workItemInfo.UseCallerCallContext || workItemInfo.UseCallerHttpContext)
    {
        _callerContext = CallerThreadContext.Capture(
            workItemInfo.UseCallerCallContext,
            workItemInfo.UseCallerHttpContext);
    }

    _workItemResult = new WorkItemResult(this);
    Initialize();
}
/// <summary>
/// Puts the work item into its initial state: queued, with no completion
/// event and a zero reference count for it. The completion event reference
/// is only dropped here, not closed.
/// </summary>
internal void Initialize()
{
    _workItemCompletedRefCount = 0;
    _workItemCompleted = null;
    _workItemState = WorkItemState.InQueue;
}
/// <summary>
/// Tells whether this work item belongs to the given work items group.
/// </summary>
/// <param name="workItemsGroup">The group to compare against.</param>
/// <returns>
/// true when <paramref name="workItemsGroup"/> is the very same object that
/// was passed to the constructor (reference comparison); otherwise false.
/// </returns>
internal bool WasQueuedBy(IWorkItemsGroup workItemsGroup)
{
    bool sameGroup = object.ReferenceEquals(workItemsGroup, _workItemsGroup);
    return sameGroup;
}
#endregion
#region Methods
/// <summary>
/// Gets or sets the object that indicates whether the work items group
/// of this work item has been canceled.
/// </summary>
public CanceledWorkItemsGroup CanceledWorkItemsGroup
{
    get { return _canceledWorkItemsGroup; }
    set { _canceledWorkItemsGroup = value; }
}
/// <summary>
/// Records the start time and moves the work item to the in-progress state,
/// unless it was canceled.
/// </summary>
/// <returns>
/// true when the work item was moved to in-progress. For a canceled work item:
/// true only when a post-execute callback is configured and the configured
/// CallToPostExecute includes WhenWorkItemCanceled; otherwise false.
/// </returns>
public bool StartingWorkItem()
{
    // The start time is stamped before the lock is taken, canceled or not.
    _beginProcessTime = DateTime.Now;

    lock (this)
    {
        if (!IsCanceled)
        {
            Debug.Assert(WorkItemState.InQueue == GetWorkItemState());
            SetWorkItemState(WorkItemState.InProgress);
            return true;
        }

        // Canceled: the caller still has to proceed when the post-execute
        // callback wants to be told about canceled items.
        bool hasPostExecute = (_workItemInfo.PostExecuteWorkItemCallback != null);
        return hasPostExecute &&
            ((_workItemInfo.CallToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled);
    }
}
/// <summary>
/// Executes the work item (when it is in progress) and then the post-execute
/// callback (when the configuration asks for it), and records the end time.
/// </summary>
/// <exception cref="NotSupportedException">
/// The work item is neither in progress nor canceled.
/// </exception>
public void Execute()
{
    CallToPostExecute postExecuteTrigger;
    WorkItemState stateAtEntry = GetWorkItemState();

    if (WorkItemState.InProgress == stateAtEntry)
    {
        // Normal path: run the callback.
        postExecuteTrigger = CallToPostExecute.WhenWorkItemNotCanceled;
        ExecuteWorkItem();
    }
    else if (WorkItemState.Canceled == stateAtEntry)
    {
        // Canceled path: the callback is skipped.
        postExecuteTrigger = CallToPostExecute.WhenWorkItemCanceled;
    }
    else
    {
        Debug.Assert(false);
        throw new NotSupportedException();
    }

    // Run the post execute only when the configured flags include the
    // situation we ended up in.
    if ((postExecuteTrigger & _workItemInfo.CallToPostExecute) != 0)
    {
        PostExecute();
    }

    _endProcessTime = DateTime.Now;
}
/// <summary>
/// Raises the work-item-completed event, passing this work item to the
/// subscribers. Exceptions thrown by subscribers are deliberately ignored
/// so that a faulty listener cannot disturb the caller.
/// </summary>
internal void FireWorkItemCompleted()
{
    // Snapshot the delegate: the event field can become null between a null
    // check and the invocation when the last subscriber detaches on another
    // thread. Invoking the snapshot avoids losing the notification that way.
    WorkItemStateCallback handlers = _workItemCompletedEvent;
    if (null == handlers)
    {
        return;
    }

    try
    {
        handlers(this);
    }
    catch // Ignore exceptions
    {}
}
/// <summary>
/// Executes the work item's callback and stores its outcome.
/// When a caller context was captured at construction, it is applied for the
/// duration of the callback and the previous context is put back afterwards.
/// </summary>
/// <remarks>
/// An exception thrown by the callback is not propagated; it is handed to
/// SetResult together with the (null) result.
/// </remarks>
private void ExecuteWorkItem()
{
    CallerThreadContext previousContext = null;
    if (null != _callerContext)
    {
        // Remember the current context (same flavors as the captured one)
        // before replacing it with the caller's context.
        previousContext = CallerThreadContext.Capture(_callerContext.CapturedCallContext, _callerContext.CapturedHttpContext);
        CallerThreadContext.Apply(_callerContext);
    }

    Exception exception = null;
    object result = null;
    try
    {
        result = _callback(_state);
    }
    catch (Exception e)
    {
        // Keep the exception; it is stored with the result below.
        exception = e;
    }
    finally
    {
        // Restore in a finally block so the previous context is put back even
        // when an exception continues past the catch above (for example a
        // thread abort, which is re-raised at the end of the catch block).
        if (null != _callerContext)
        {
            CallerThreadContext.Apply(previousContext);
        }
    }

    SetResult(result, exception);
}
/// <summary>
/// Runs the post-execute callback, if one is configured, passing it this
/// work item's result object. Exceptions thrown by the callback are not
/// propagated.
/// </summary>
private void PostExecute()
{
    if (null == _workItemInfo.PostExecuteWorkItemCallback)
    {
        // Nothing configured, nothing to run.
        return;
    }

    try
    {
        _workItemInfo.PostExecuteWorkItemCallback(_workItemResult);
    }
    catch (Exception e)
    {
        Debug.Assert(null != e);
    }
}
/// <summary>
/// Stores the outcome of the work item and signals its completion.
/// </summary>
/// <param name="result">The result of the work item.</param>
/// <param name="exception">The exception thrown by the work item, or null.</param>
internal void SetResult(object result, Exception exception)
{
    _exception = exception;
    _result = result;

    // NOTE(review): the meaning of the 'false' argument is defined by
    // SignalComplete, which is outside the visible part of the file.
    SignalComplete(false);
}
/// <summary>
/// Gets the result object that was created for this work item by the constructor.
/// </summary>
/// <returns>The work item result.</returns>
internal IWorkItemResult GetWorkItemResult()
{
    IWorkItemResult workItemResult = _workItemResult;
    return workItemResult;
}
/// <summary>
/// Waits for all work items to complete.
/// </summary>
/// <param name="workItemResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed; may be null.</param>
/// <returns>
/// true when every work item in workItemResults has completed (or the array
/// is empty); false when the timeout elapsed or cancelWaitHandle was signaled first.
/// </returns>
internal static bool WaitAll(
    IWorkItemResult [] workItemResults,
    int millisecondsTimeout,
    bool exitContext,
    WaitHandle cancelWaitHandle)
{
    if (0 == workItemResults.Length)
    {
        return true;
    }

    WaitHandle [] waitHandles = new WaitHandle[workItemResults.Length];
    GetWaitHandles(workItemResults, waitHandles);

    bool success;
    try
    {
        if ((null == cancelWaitHandle) && (waitHandles.Length <= 64))
        {
            // No cancel handle and few enough handles: a single WaitAll does the job.
            success = WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
        }
        else
        {
            success = true;
            int millisecondsLeft = millisecondsTimeout;

            // UTC is used for the elapsed-time measurement so that daylight
            // saving or time zone changes cannot distort the remaining timeout.
            DateTime start = DateTime.UtcNow;

            // Slot 0 is filled with the handle currently waited for; the
            // optional second slot always holds the cancel handle.
            WaitHandle [] whs;
            if (null != cancelWaitHandle)
            {
                whs = new WaitHandle [] { null, cancelWaitHandle };
            }
            else
            {
                whs = new WaitHandle [] { null };
            }

            bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);

            // Iterate over the wait handles and wait for each one to complete.
            // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle
            // won't affect it.
            // Each iteration we update the time left for the timeout.
            for (int i = 0; i < workItemResults.Length; ++i)
            {
                // WaitAny doesn't work with negative numbers
                if (!waitInfinitely && (millisecondsLeft < 0))
                {
                    success = false;
                    break;
                }

                whs[0] = waitHandles[i];
                int result = WaitHandle.WaitAny(whs, millisecondsLeft, exitContext);

                // Index 0 means the work item completed; anything else is
                // either the cancel handle or a timeout.
                if ((result > 0) || (WaitHandle.WaitTimeout == result))
                {
                    success = false;
                    break;
                }

                if (!waitInfinitely)
                {
                    // Update the time left to wait
                    TimeSpan ts = DateTime.UtcNow - start;
                    millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
                }
            }
        }
    }
    finally
    {
        // Release the wait handles, also when one of the waits throws;
        // otherwise the handles acquired above would never be given back.
        ReleaseWaitHandles(workItemResults);
    }

    return success;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="workItemResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns>
internal static int WaitAny(
IWorkItemResult [] workItemResults,
int millisecondsTimeout,
bool exitContext,
WaitHandle cancelWaitHandle)
{
WaitHandle [] waitHandles = null;
if (null != cancelWaitHandle)
{
waitHandles = new WaitHandle[workItemResults.Length+1];
GetWaitHandles(workItemResults, waitHandles);
waitHandles[workItemResults.Length] = cancelWaitHandle;
}
else
{
waitHandles = new WaitHandle[workItemResults.Length];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -