⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 taskcollection.cpp

📁 C语言库函数的原型,有用的拿去
💻 CPP
📖 第 1 页 / 共 5 页
字号:
        //
        _TaskCollection *pSnapPoint = _M_pNextAlias; 
        bool fOverflow = false;

        //
        // The parent context needs to be snapped here.  It's possible that the executing collection on the
        // context at the time that Wait is invoked, is different from the executing collection on the current
        // context when the task pool was created.
        //
        pAlias->_M_pParent = pCurrentContext->GetExecutingCollection();
        pAlias->_M_inliningDepth = pAlias->_M_pParent ? pAlias->_M_pParent->_InliningDepth() + 1 : 0;

        //
        // Set up the EH frame.  We need to stop cancellation propagation when we hit someone who
        // has become canceled.
        //
        pCurrentContext->SetExecutingCollection(pAlias);

        try
        {
            //
            // This *MUST* be fenced due to allowing cancellation from arbitrary threads.  The cancellation routine may have switched
            // to deferred cancellation based on us not being inline.  We cannot arbitrarily overwrite that result.
            //
            LONG xchgStatus = InterlockedCompareExchange(&pAlias->_M_executionStatus, TASKCOLLECTION_EXECUTION_STATUS_INLINE, TASKCOLLECTION_EXECUTION_STATUS_CLEAR);
            if (xchgStatus == TASKCOLLECTION_EXECUTION_STATUS_CANCEL_DEFERRED)
            {
                //
                // The catch block will expect this.
                //
                if (pChore != NULL)
                    pAlias->_NotifyNewChore();
                throw task_canceled();
            }

            if (pChore != NULL)
            {
                pAlias->_NotifyNewChore();

                if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
                {
                    _Interrupt(_M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE);
                }

                pChore->m_pFunction(pChore);

                pChore->_M_pTaskCollection = NULL;
                _UnrealizedChore::_InternalFree(pChore);

                pAlias->_NotifyCompletedChore();
                pChore = NULL;
            }

            for(;;)
            {
                TaskStack *pStack;

                //
                // Pop and run can execute inline for internal contexts.
                //
                while (pAlias->_M_stackPos > 0)
                {
                    //
                    // The _M_exitCode != 0 is a necessary semantic (pass a canceled task collection to a new thread -- this is the only check that 
                    // will prevent stuff from going onto it prior to a reset).  It's also necessary to check this on the original collection  because 
                    // we could have a scenario where a chore is stolen from a direct alias which then pushes chores back to the original collection.  This will
                    // result in an indirect alias being used and the stealing won't see the alias inlined.  Hence -- waiting on the indirect alias cannot be canceled.
                    //
                    if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
                    {
                        _Interrupt(_M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE);
                    }

                    int taskCookie;

                    if (pAlias->_M_stackPos > SIZEOF_ARRAY(pAlias->_M_taskCookies))
                    {
                        pStack = reinterpret_cast<TaskStack *>(pAlias->_M_pTaskExtension);
                        ASSERT(!pStack->IsEmpty());
                        taskCookie = pStack->Pop();
                    }
                    else
                        taskCookie = _M_taskCookies[pAlias->_M_stackPos - 1];

                    pAlias->_M_stackPos--;

                    pChore = pCurrentContext->TryPopUnstructured(taskCookie);
                    if (pChore == NULL)
                    {
                        //
                        // If we failed because something was stolen, everything underneath us was stolen as well and the wait on stolen chores
                        // will guarantee that we wait on everything necessary.  We can clear out the stack to prevent reuse of the task collection
                        // from just building up excess entries.
                        //
                        TaskStack *pStack = reinterpret_cast<TaskStack *> (pAlias->_M_pTaskExtension);
                        if (pStack != NULL) pStack->Clear();
                        pAlias->_M_stackPos = 0;

                        break;
                    }

                    if (pCurrentContext->IsExternal())
                        static_cast<ExternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter();
                    else
                        static_cast<InternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter();

                    pChore->m_pFunction(pChore);

                    pChore->_M_pTaskCollection = NULL;
                    _UnrealizedChore::_InternalFree(pChore);

                    pAlias->_NotifyCompletedChore();
                    pChore = NULL;
                }

                //
                // If the task stack overflowed, there are potentially still items on the work stealing queue we could not inline.  If we simply
                // block without care and one of those items cancels, we can deadlock (since we cannot steal from canceled contexts).  If the
                // stack overflowed, we need to perform special handling.
                //
                pStack = reinterpret_cast<TaskStack *>(pAlias->_M_pTaskExtension);
                if (pStack != NULL && pStack->Overflow())
                {
                    fOverflow = true;

                    //
                    // We need to tell the canceling thread to perform the WSQ sweep or do ourselves as determined by a CAS.
                    //
                    LONG xchgStatus = InterlockedCompareExchange(&pAlias->_M_executionStatus, 
                                                                 TASKCOLLECTION_EXECUTION_STATUS_INLINE_WAIT_WITH_OVERFLOW_STACK,
                                                                 TASKCOLLECTION_EXECUTION_STATUS_INLINE);

                    switch(xchgStatus)
                    {
                        case TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS:
                        case TASKCOLLECTION_EXECUTION_STATUS_CANCEL_COMPLETE:
                            throw task_canceled();
                        default:
                            break;
                    }
                }

                _FullAliasWait(pSnapPoint);

                if (fOverflow)
                {
                    //
                    // We cannot *EVER* touch the work stealing queue if another context has canceled and is sweeping it for cancellation.
                    // CAS back to INLINE.  If the CAS turns up INLINE_CANCEL_IN_PROGRESS, another thread is playing with our WSQ and we must spin
                    // until that's done.
                    //
                    // Note that this path should be rather rare and requires the use both of direct aliasing (passing between threads) **AND** pushing
                    // more than the task pool cap onto a single alias (1026 tasks) before the wait operation.
                    //
                    if (InterlockedCompareExchange(&pAlias->_M_executionStatus,
                                               TASKCOLLECTION_EXECUTION_STATUS_INLINE,
                                               TASKCOLLECTION_EXECUTION_STATUS_INLINE_WAIT_WITH_OVERFLOW_STACK) ==
                                               TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS)
                    {
                        _SpinWaitBackoffNone spinWait;
                        while(_M_executionStatus == TASKCOLLECTION_EXECUTION_STATUS_INLINE_CANCEL_IN_PROGRESS)
                        {
                            spinWait._SpinOnce();
                        }
                    }
                }

                //
                // It is entirely possible that we took a snapshot and during the execution of a chore on this task collection, the task collection
                // was passed to another thread that has not yet touched the task collection (be it an arbitrary one or an N-level descendent
                // (N > 1).  In this case, a new alias was created and we did not see it in the snapshot.  We cannot know until after
                // the _FullAliasWait call.  If the snap point has changed, we must loop around or we will miss waiting on chores that 
                // were created on other threads during execution of a chore which was known about.  This would be contrary to user expectation.
                //
                if (pSnapPoint == _M_pNextAlias)
                    break;

                pSnapPoint = _M_pNextAlias;

            }
        }
        catch (const task_canceled &)
        {
            if (pChore != NULL)
            {
                pChore->_M_pTaskCollection = NULL;
                _UnrealizedChore::_InternalFree(pChore);
                pAlias->_NotifyCompletedChore();
            }

            //
            // This exception will be rethrown to a higher level if cancellation is still triggered on this context.  In order to conserve
            // stack space on x64 and consolidate this path with the exception path, the rethrow happens below outside this particular
            // catch.
            //
        }
        catch(...)
        {
            if (pChore != NULL)
            {
                pChore->_M_pTaskCollection = NULL;
                _UnrealizedChore::_InternalFree(pChore);
                pAlias->_NotifyCompletedChore();
            }

            pAlias->_RaisedException();
        }

        pCurrentContext->SetExecutingCollection(pAlias->_M_pParent);

        if (pCurrentContext->HasAnyCancellation() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_INLINE)
        {
            if (pCurrentContext->IsCanceled() || _M_pOriginalCollection->_M_exitCode != 0 || pAlias->_M_executionStatus != TASKCOLLECTION_EXECUTION_STATUS_CLEAR ||
                _WillInterruptForPendingCancel())
            {
                pAlias->_Abort();

                if (pCurrentContext->HasAnyCancellation())
                {
                    _Interrupt(false);
                }

                return _Canceled;
            }
        }

        pAlias->_Reset(pSnapPoint);

        return _Completed;
    }

    /// <summary>
    ///     Performs an abortive sweep of the WSQ for inline stack overflow.
    /// </summary>
    /// <param name="_PCtx">
    ///     The context to sweep
    /// </param>
    void _TaskCollection::_AbortiveSweep(void *_PCtx)
    {
        ContextBase *pContext = reinterpret_cast<ContextBase *>(_PCtx);

        SweeperContext ctx(this);
        pContext->SweepUnstructured(&reinterpret_cast<WorkStealingQueue<_UnrealizedChore>::SweepPredicate> (_TaskCollection::_CollectionMatchPredicate),
                                    &ctx,
                                    &_TaskCollection::_SweepAbortedChore);

        //
        // Update the statistical information with the fact that a task has been dequeued
        //
        if (ctx.m_sweptChores > 0)
        {
            ContextBase *pCurrentContext = SchedulerBase::FastCurrentContext();

            if (pCurrentContext->IsExternal())
                static_cast<ExternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter(ctx.m_sweptChores);
            else
                static_cast<InternalContextBase *>(pCurrentContext)->IncrementDequeuedTaskCounter(ctx.m_sweptChores);
        }
    }

    /// <summary>
    ///     A predicate function checking whether a given chore belongs to a given collection.
    /// </summary>
    /// <param name="_PChore">
    ///     The chore to check
    /// </param>
    /// <param name="_PData">
    ///     The data to check against
    /// </param>
    /// <returns>
    ///     Whether or not the chore belongs to the collection
    /// </returns>
    bool _TaskCollection::_CollectionMatchPredicate(_UnrealizedChore *_PChore, void *_PData)
    {
        SweeperContext *pCtx = reinterpret_cast<SweeperContext *>(_PData);
        return (_PChore->_M_pTaskCollection == pCtx->m_pTaskCollection);
    }

    /// <summary>
    ///     Called to sweep an aborted chore in the case of inline stack overflow.
    /// </summary>
    /// <param name="_PChore">
    ///     The chore to sweep
    /// </param>
    /// <param name="_PData">
    ///     The data which was passed into the sweeper predicate
    /// </param>
    /// <returns>
    ///     An indication of whether the chore is now gone
    /// </returns>
    bool _TaskCollection::_SweepAbortedChore(_UnrealizedChore *_PChore, void *_PData)
    {
        SweeperContext *pCtx = reinterpret_ca

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -