// taskcollection.cpp
return (_M_pOriginalCollection->_M_exitCode != 0);
}
/// <summary>
/// Called when a new chore is placed upon the task collection. Guarantees forward synchronization with the completion of those chores.
/// </summary>
void _TaskCollection::_NotifyNewChore()
{
long val = InterlockedIncrement(&_M_unpoppedChores);
ASSERT(val > 0);
//
// Because the task collection can be passed between threads and waited upon, either this thread or a stealer might need to wake
// another thread on final completion (we might never wait). Thus, we need to fence these operations. We also need
// to make sure 0->1 and 1->0 transitions correctly perform the appropriate signaling.
//
if (val == 1)
{
//
// It's entirely possible that we're racing with a _NotifyCompletedChore which just did a 1->0 and we just did a 0->1. We need to make
// sure that the event is signaled before we clear it. Otherwise, it's possible that the event winds up out of sync with
// the counter.
//
// In the vast majority of cases, the pEvent->Wait() call has no fences; it merely checks the state, sees it signaled, and returns.
// The only time there's even a fence is during the race.
//
_M_event.wait();
//
// This is the barrier at which point other threads think there's something to wait upon. Note that it's not upon the WSQ yet
// (meaning no one can steal and transition us from 1->0 as of yet).
//
_M_event.reset();
}
}
/// <summary>
/// Called when a chore is completed.
/// </summary>
void _TaskCollection::_NotifyCompletedChore()
{
long val = InterlockedDecrement(&_M_unpoppedChores);
ASSERT(val >= 0);
//
// Because the task collection can be passed between threads and waited upon, any transition from 1->0 needs to wake an arbitrary set
// of threads, hence -- this needs a fence.
//
if (val == 0)
{
//
// No games need be played here. Anyone who pushes a chore will see the event clear and wait before pushing it upon the WSQ. This
// means there can be no race with messing up the event state. Setting the event suffices.
//
_M_event.set();
}
}
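//
// For illustration only (not part of the runtime): the paired 0->1 and 1->0 transitions above form a gate. A minimal sketch of the
// same protocol using standard C++ primitives -- names here are hypothetical:
//
//     std::atomic<long> unpopped{0};
//     std::mutex m;
//     std::condition_variable cv;
//     bool signaled = true;                            // the event starts signaled
//
//     void notifyNew()                                 // cf. _NotifyNewChore
//     {
//         if (unpopped.fetch_add(1) + 1 == 1)
//         {
//             std::unique_lock<std::mutex> lock(m);
//             cv.wait(lock, [&]{ return signaled; });  // drain a racing set()
//             signaled = false;                        // reset(): waiters now block
//         }
//     }
//
//     void notifyCompleted()                           // cf. _NotifyCompletedChore
//     {
//         if (unpopped.fetch_sub(1) - 1 == 0)
//         {
//             { std::lock_guard<std::mutex> lock(m); signaled = true; }  // set()
//             cv.notify_all();                         // wake an arbitrary set of waiters
//         }
//     }
//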
/// <summary>
/// Perform a wait on every alias. Note that we make no attempt to inline execution of chores pushed on other threads. We merely
/// wait for them; they'll be stolen and executed eventually.
/// </summary>
/// <param name="pSnapPoint">
/// The snapshot point which indicates which aliases are involved in the wait
/// </param>
void _TaskCollection::_FullAliasWait(_TaskCollection *pSnapPoint)
{
_TaskCollection *pAlias = pSnapPoint;
int count = 0;
while (pAlias != NULL)
{
count++;
pAlias = pAlias->_M_pNextAlias;
}
if (count > 0)
{
event **pEvents = reinterpret_cast <event **> (_malloca(sizeof (event *) * (count + 1)));
if (pEvents == NULL)
throw std::bad_alloc();
_MallocaHolder mholder(pEvents);
pEvents[0] = &(_M_pOriginalCollection->_M_event);
int i = 1;
pAlias = pSnapPoint;
while (i < count + 1)
{
pEvents[i] = &(pAlias->_M_event);
i++;
pAlias = pAlias->_M_pNextAlias;
}
event::wait_for_multiple(pEvents, (count + 1), true);
}
else
{
_M_event.wait();
}
}
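//
// For illustration only: the _malloca/_MallocaHolder pairing above is an RAII guard over CRT stack allocation. A minimal sketch of
// the same pattern, assuming _freea as the matching release (per the CRT contract for _malloca):
//
//     struct MallocaGuard
//     {
//         void *_M_ptr;
//         explicit MallocaGuard(void *ptr) : _M_ptr(ptr) { }
//         ~MallocaGuard() { _freea(_M_ptr); }          // correct for both stack blocks and heap-fallback blocks
//     };
//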
/// <summary>
/// Schedules a new unstructured chore upon an unstructured task collection
/// </summary>
/// <param name="pChore">
/// The new unrealized chore to schedule
/// </param>
void _TaskCollection::_Schedule(_UnrealizedChore *pChore)
{
if (pChore->_M_pTaskCollection != NULL)
throw invalid_multiple_scheduling();
try
{
_TaskCollection *pAlias = _Alias();
pChore->_M_pTaskCollection = pAlias;
pChore->_M_pChoreFunction = &_UnrealizedChore::_UnstructuredChoreWrapper;
ASSERT(pAlias->_M_stackPos >= 0); // Satisfy static analyzers that might assume _M_stackPos could be negative (it's signed).
int locationBase = pAlias->_M_stackPos++;
if (locationBase >= SIZEOF_ARRAY(pAlias->_M_taskCookies))
{
//
// We've spilled outside the allowable internal allocation of tasks (this is largely an optimization to avoid
// heap allocations on typically sized task collections).
//
TaskStack *pStack = reinterpret_cast<TaskStack *> (pAlias->_M_pTaskExtension);
if (pStack == NULL)
{
pStack = new TaskStack();
pAlias->_M_pTaskExtension = pStack;
}
pAlias->_NotifyNewChore();
//
// ctor has already guaranteed context exists
//
if (!pStack->Push(SchedulerBase::FastCurrentContext()->PushUnstructured(pChore)))
{
//
// It's not on the inlining list -- it must be stolen! This is due to the cap being reached (see comments in
// TaskStack).
//
pAlias->_M_stackPos--;
}
}
else
{
pAlias->_NotifyNewChore();
//
// ctor has already guaranteed context exists
//
ASSERT(locationBase < SIZEOF_ARRAY(pAlias->_M_taskCookies));
pAlias->_M_taskCookies[locationBase] = SchedulerBase::FastCurrentContext()->PushUnstructured(pChore);
}
}
catch (...)
{
//
// We are responsible for the freeing of the chore. If any exception was thrown out, we didn't schedule it and hence
// won't free it later. It must be done now.
//
_UnrealizedChore::_InternalFree(pChore);
throw;
}
}
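//
// For reference, a hedged sketch of caller-side usage (the public PPL surface, not the internal chore plumbing): task_group funnels
// queued work into this scheduling path.
//
//     Concurrency::task_group tg;
//     tg.run([] { /* work */ });   // wraps the functor in an unrealized chore and schedules it
//     tg.wait();                   // drives the run-and-wait path below
//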
/// <summary>
/// Resets the task collection for future usage.
/// </summary>
/// <param name="pSnapPoint">
/// The snapshot from which to reset
/// </param>
void _TaskCollection::_Reset(_TaskCollection *pSnapPoint)
{
//
// If someone is in the middle of canceling, we must let them proceed until they've reached the point where the cancellation
// of the context happens. Spin wait. Note that if we do not do this, it's entirely possible that we check cancellation
// of the context below BEFORE they cancel it, they cancel it, and some arbitrary task collection gets canceled instead of the one
// intended on the inline side.
//
_SpinWaitBackoffNone spinWait;
while (_M_executionStatus == TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS)
{
spinWait._SpinOnce();
}
//
// Clear the cancellation flag. Note that if a cancellation was done for the context, we must clear the collection cancel flag. This is only
// safe to do for the owning context. If the execution status indicates any kind of inlining, the owning context is in the midst of a
// _Abort, _Reset, or Wait and will take care of itself. Only on non-inline status do we need to do something cross thread. The only state
// to which that applies is TASKCOLLECTION_EXECUTION_STATUS_CANCEL_DEFERRED.
//
ContextBase *pCurrentContext = SchedulerBase::FastCurrentContext();
ContextBase *pOwningContext = reinterpret_cast <ContextBase *> (_M_pOwningContext);
if (pCurrentContext == pOwningContext)
{
if (InterlockedExchange(&_M_executionStatus, TASKCOLLECTION_EXECUTION_STATUS_CLEAR) == TASKCOLLECTION_EXECUTION_STATUS_CANCEL_COMPLETE)
pCurrentContext->CollectionCancelComplete(_M_inliningDepth);
_M_inliningDepth = -1;
}
else
InterlockedCompareExchange(&_M_executionStatus, TASKCOLLECTION_EXECUTION_STATUS_CLEAR, TASKCOLLECTION_EXECUTION_STATUS_CANCEL_DEFERRED);
//
// If there are direct aliases, we must clear those up too.
//
if (!_IsAlias())
{
if (pSnapPoint)
{
_TaskCollection *pAlias = pSnapPoint;
while (pAlias)
{
if (!pAlias->_IsStaleAlias())
{
pAlias->_Reset(NULL);
}
pAlias = pAlias->_M_pNextAlias;
}
}
//
// Any caught exception on the collection should be rethrown on this thread. The exception can be one of several things:
//
// task_cancelled (or another internal runtime exception):
//
// - We want to let this exception continue propagating unless there's a *more important* one (like an arbitrary exception) that occurred
// elsewhere. There is an unfortunate situation here:
//
// o We might be within a destructor. Here, by the C++ standard, we cannot throw a different exception or the
// process will terminate. This is unfortunate because it might be better to throw one of the exceptions
// which did happen. You might run into code like this where you have
//
// try
// {
// _TaskCollection tp;
// tp.Schedule(t1); // throws e1
// tp.Schedule(t2); // throws e2
//
// // arbitrary code with an interruption point that causes task_cancelled to be thrown.
//
// tp.Wait();
// }
// catch (...) { }
//
// an arbitrary exception:
//
// - We are allowed to choose an arbitrary exception to flow back.
//
long exitCode = InterlockedExchange(&_M_exitCode, 0);
if ((exitCode & EXIT_STATUS_FLAG_EXCEPTION) != 0)
{
_SpinWaitBackoffNone spinWait;
while ((size_t) _M_pException == _S_nonNull) // make sure the exception is ready
spinWait._SpinOnce();
_RethrowException();
}
}
else
{
//
// A reset of the alias must reset the overall collection.
//
if (_IsDirectAlias() && pSnapPoint != NULL)
_M_pOriginalCollection->_Reset(pSnapPoint);
}
}
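//
// For reference, a hedged caller-side sketch: resetting is what allows a single collection to be waited upon and then reused, as
// the public surface permits.
//
//     Concurrency::task_group tg;
//     tg.run([] { /* batch 1 */ });
//     tg.wait();                    // the wait completes and the collection is reset
//     tg.run([] { /* batch 2 */ }); // same collection, fresh state
//     tg.wait();
//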
/// <summary>
/// Runs a specified chore (pChore) and subsequently waits on all chores associated with the task collection
/// to execute.
/// </summary>
/// <param name="pChore">
/// The chore to run locally.
/// </param>
/// <returns>
/// An indication of the status of the wait.
/// </returns>
_TaskCollectionStatus __stdcall _TaskCollection::_RunAndWait(_UnrealizedChore *pChore)
{
ASSERT(!_IsDirectAlias());
_TaskCollection *pAlias = _Alias();
ContextBase *pCurrentContext = SchedulerBase::FastCurrentContext();
//
// Snapshot the list of aliases so we have internal consistency between what we wait upon, what we reset, etc...
//