// taskcollection.cpp

    //
    _TaskCollection *pSnapPoint = _M_pNextAlias;
    bool fOverflow = false;

    //
    // The parent context needs to be snapped here. It's possible that the executing collection on the
    // context at the time that Wait is invoked is different from the executing collection on the current
    // context when the task pool was created.
    //
    pAlias->_M_pParent = pCurrentContext->GetExecutingCollection();
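    //
    // The inlining depth is one level deeper than the parent's, or zero when there is no parent
    // collection executing on this context.
    //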
    pAlias->_M_inliningDepth = pAlias->_M_pParent ? pAlias->_M_pParent->_InliningDepth() + 1 : 0;

    //
    // Set up the EH frame. We need to stop cancellation propagation when we hit someone who
    // has become canceled.
    //
    pCurrentContext->SetExecutingCollection(pAlias);

    try
    {
        //
        // This *MUST* be fenced due to allowing cancellation from arbitrary threads. The cancellation routine may have switched
        // to deferred cancellation based on us not being inline. We cannot arbitrarily overwrite that result.
        //
        LONG xchgStatus = InterlockedCompareExchange(&pAlias->_M_executionStatus, TASKCOLLECTION_EXECUTION_STATUS_INLINE, TASKCOLLECTION_EXECUTION_STATUS_CLEAR);
        if (xchgStatus == TASKCOLLECTION_EXECUTION_STATUS_CANCEL_DEFERRED)
        {
            //
            // The catch block will expect this.
            //
            if (pChore != NULL)
                pAlias->_NotifyNewChore();

            throw task_canceled();
        }
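        //
        // If a chore was passed in for inline execution, run it on this context before draining the
        // task stack, observing any cancellation that has already been requested.
        //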
        if (pChore != NULL)
        {
            pAlias->_NotifyNewChore();

            if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
            {
                _Interrupt(_M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE);
            }

            pChore->m_pFunction(pChore);
            pChore->_M_pTaskCollection = NULL;
            _UnrealizedChore::_InternalFree(pChore);
            pAlias->_NotifyCompletedChore();
            pChore = NULL;
        }

        for (;;)
        {
            TaskStack *pStack;

            //
            // Pop and run can execute inline for internal contexts.
            //
            while (pAlias->_M_stackPos > 0)
            {
                //
                // The _M_exitCode != 0 check is a necessary semantic (pass a canceled task collection to a new thread -- this is the only check that
                // will prevent stuff from going onto it prior to a reset). It's also necessary to check this on the original collection because
                // we could have a scenario where a chore is stolen from a direct alias which then pushes chores back to the original collection. This will
                // result in an indirect alias being used and the stealing won't see the alias inlined. Hence -- waiting on the indirect alias cannot be canceled.
                //
                if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
                {
                    _Interrupt(_M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE);
                }
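                //
                // Pop the next task cookie: from the overflow TaskStack extension when the stack
                // position exceeds the inline cookie array, otherwise from the inline array.
                //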
                int taskCookie;
                if (pAlias->_M_stackPos > SIZEOF_ARRAY(pAlias->_M_taskCookies))
                {
                    pStack = reinterpret_cast<TaskStack *>(pAlias->_M_pTaskExtension);
                    ASSERT(!pStack->IsEmpty());
                    taskCookie = pStack->Pop();
                }
                else
                    taskCookie = pAlias->_M_taskCookies[pAlias->_M_stackPos - 1];

                pAlias->_M_stackPos--;

                pChore = pCurrentContext->TryPopUnstructured(taskCookie);
                if (pChore == NULL)
                {
                    //
                    // If we failed because something was stolen, everything underneath us was stolen as well and the wait on stolen chores
                    // will guarantee that we wait on everything necessary. We can clear out the stack to prevent reuse of the task collection
                    // from just building up excess entries.
                    //
                    TaskStack *pStack = reinterpret_cast<TaskStack *>(pAlias->_M_pTaskExtension);
                    if (pStack != NULL) pStack->Clear();
                    pAlias->_M_stackPos = 0;
                    break;
                }
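                //
                // Update the statistical information with the fact that a task has been dequeued.
                //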
                if (pCurrentContext->IsExternal())
                    static_cast<ExternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter();
                else
                    static_cast<InternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter();

                pChore->m_pFunction(pChore);
                pChore->_M_pTaskCollection = NULL;
                _UnrealizedChore::_InternalFree(pChore);
                pAlias->_NotifyCompletedChore();
                pChore = NULL;
            }

            //
            // If the task stack overflowed, there are potentially still items on the work stealing queue we could not inline. If we simply
            // block without care and one of those items cancels, we can deadlock (since we cannot steal from canceled contexts). If the
            // stack overflowed, we need to perform special handling.
            //
            pStack = reinterpret_cast<TaskStack *>(pAlias->_M_pTaskExtension);
            if (pStack != NULL && pStack->Overflow())
            {
                fOverflow = true;

                //
                // We need to tell the canceling thread to perform the WSQ sweep, or do it ourselves, as determined by a CAS.
                //
                LONG xchgStatus = InterlockedCompareExchange(&pAlias->_M_executionStatus,
                                                             TASKCOLLECTION_EXECUTION_STATUS_INLINE_WAIT_WITH_OVERFLOW_STACK,
                                                             TASKCOLLECTION_EXECUTION_STATUS_INLINE);
                switch (xchgStatus)
                {
                    case TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS:
                    case TASKCOLLECTION_EXECUTION_STATUS_CANCEL_COMPLETE:
                        throw task_canceled();

                    default:
                        break;
                }
            }
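            //
            // Wait for the chores on every alias known at the snapshot point to complete.
            //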
            _FullAliasWait(pSnapPoint);

            if (fOverflow)
            {
                //
                // We cannot *EVER* touch the work stealing queue if another context has canceled and is sweeping it for cancellation.
                // CAS back to INLINE. If the CAS turns up INLINE_CANCEL_IN_PROGRESS, another thread is playing with our WSQ and we must spin
                // until that's done.
                //
                // Note that this path should be rather rare and requires the use both of direct aliasing (passing between threads) **AND** pushing
                // more than the task pool cap onto a single alias (1026 tasks) before the wait operation.
                //
                if (InterlockedCompareExchange(&pAlias->_M_executionStatus,
                                               TASKCOLLECTION_EXECUTION_STATUS_INLINE,
                                               TASKCOLLECTION_EXECUTION_STATUS_INLINE_WAIT_WITH_OVERFLOW_STACK) ==
                    TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS)
                {
                    _SpinWaitBackoffNone spinWait;
                    while (pAlias->_M_executionStatus == TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS)
                    {
                        spinWait._SpinOnce();
                    }
                }
            }

            //
            // It is entirely possible that we took a snapshot and, during the execution of a chore on this task collection, the task collection
            // was passed to another thread that has not yet touched the task collection (be it an arbitrary one or an N-level descendant,
            // N > 1). In this case, a new alias was created and we did not see it in the snapshot. We cannot know until after
            // the _FullAliasWait call. If the snap point has changed, we must loop around or we will miss waiting on chores that
            // were created on other threads during execution of a chore which was known about. This would be contrary to user expectation.
            //
            if (pSnapPoint == _M_pNextAlias)
                break;

            pSnapPoint = _M_pNextAlias;
        }
    }
    catch (const task_canceled &)
    {
        if (pChore != NULL)
        {
            pChore->_M_pTaskCollection = NULL;
            _UnrealizedChore::_InternalFree(pChore);
            pAlias->_NotifyCompletedChore();
        }

        //
        // This exception will be rethrown to a higher level if cancellation is still triggered on this context. In order to conserve
        // stack space on x64 and consolidate this path with the exception path, the rethrow happens below outside this particular
        // catch.
        //
    }
    catch (...)
    {
        if (pChore != NULL)
        {
            pChore->_M_pTaskCollection = NULL;
            _UnrealizedChore::_InternalFree(pChore);
            pAlias->_NotifyCompletedChore();
        }

        pAlias->_RaisedException();
    }
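    //
    // Restore the parent as the executing collection on this context and determine whether the wait
    // completed or was interrupted by cancellation.
    //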
    pCurrentContext->SetExecutingCollection(pAlias->_M_pParent);

    if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
    {
        if (pCurrentContext->IsCanceled() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_CLEAR ||
            _WillInterruptForPendingCancel())
        {
            pAlias->_Abort();

            if (pCurrentContext->HasAnyCancellation())
            {
                _Interrupt(false);
            }

            return _Canceled;
        }
    }

    pAlias->_Reset(pSnapPoint);
    return _Completed;
}

/// <summary>
/// Performs an abortive sweep of the WSQ for inline stack overflow.
/// </summary>
/// <param name="_PCtx">
/// The context to sweep
/// </param>
void _TaskCollection::_AbortiveSweep(void *_PCtx)
{
    ContextBase *pContext = reinterpret_cast<ContextBase *>(_PCtx);

    SweeperContext ctx(this);
    pContext->SweepUnstructured(reinterpret_cast<WorkStealingQueue<_UnrealizedChore>::SweepPredicate>(&_TaskCollection::_CollectionMatchPredicate),
                                &ctx,
                                &_TaskCollection::_SweepAbortedChore);

    //
    // Update the statistical information with the fact that a task has been dequeued.
    //
    if (ctx.m_sweptChores > 0)
    {
        ContextBase *pCurrentContext = SchedulerBase::FastCurrentContext();
        if (pCurrentContext->IsExternal())
            static_cast<ExternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter(ctx.m_sweptChores);
        else
            static_cast<InternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter(ctx.m_sweptChores);
    }
}

/// <summary>
/// A predicate function checking whether a given chore belongs to a given collection.
/// </summary>
/// <param name="_PChore">
/// The chore to check
/// </param>
/// <param name="_PData">
/// The data to check against
/// </param>
/// <returns>
/// Whether or not the chore belongs to the collection
/// </returns>
bool _TaskCollection::_CollectionMatchPredicate(_UnrealizedChore *_PChore, void *_PData)
{
    SweeperContext *pCtx = reinterpret_cast<SweeperContext *>(_PData);
    return (_PChore->_M_pTaskCollection == pCtx->m_pTaskCollection);
}

/// <summary>
/// Called to sweep an aborted chore in the case of inline stack overflow.
/// </summary>
/// <param name="_PChore">
/// The chore to sweep
/// </param>
/// <param name="_PData">
/// The data which was passed into the sweeper predicate
/// </param>
/// <returns>
/// An indication of whether the chore is now gone
/// </returns>
bool _TaskCollection::_SweepAbortedChore(_UnrealizedChore *_PChore, void *_PData)
{
    SweeperContext *pCtx = reinterpret_cast<SweeperContext *>(_PData);