⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 schedulerproxy.cpp

📁 C语言库函数的原型,有用的拿去
💻 CPP
📖 第 1 页 / 共 4 页
字号:
                m_pScheduler->NotifyResourcesExternallyBusy(pRoots, numThreadsIndex);
            }
            else
            {
                m_pScheduler->NotifyResourcesExternallyIdle(pRoots, numThreadsIndex);
            }

        } // end locked region

        if (pRoots!= pRootArray)
        {
            delete [] pRoots;
        }
    }
    /// <summary>
    ///     Removes a root from the scheduler proxy and destroys it. This API is called in response to a scheduler
    ///     informing the RM that it is done with a virtual processor root.
    /// </summary>
    void SchedulerProxy::DestroyVirtualProcessorRoot(VirtualProcessorRoot * pRoot)
    {
        // The bookkeeping below is performed under m_lock so that it is serialized with other calls that are
        // adding or removing virtual processor roots at the same time.
        {
            _ReentrantBlockingLock::_Scoped_lock scopedLock(m_lock);

            SchedulerNode * pOwningNode = &m_pAllocatedNodes[pRoot->GetNodeId()];
            ASSERT(pOwningNode->m_id == pRoot->GetNodeId());

            // NOTE: We get here because a scheduler has finished with one virtual processor root. The scheduler
            // is expected to hold off on ISchedulerProxy::Shutdown (which destroys every root still left in the
            // proxy) until all of its individual root-removal calls have returned.

            // Unlink the root's execution resource from the list kept on its core.
            SchedulerCore * pOwningCore = &pOwningNode->m_pCores[pRoot->GetCoreIndex()];
            pOwningCore->m_resources.Remove(pRoot->GetExecutionResource());

            // Oversubscribed vprocs were never counted in the concurrency level, so only a regular
            // root gives a unit of concurrency back.
            const bool countsTowardConcurrency = !pRoot->IsOversubscribed();
            if (countsTowardConcurrency)
            {
                ASSERT(m_currentConcurrency > 0);
                --m_currentConcurrency;
            }
        }

        // The root is disposed of only after the lock has been released.
        pRoot->DeleteThis();
    }

    /// <summary>
    ///     Removes an execution resource from the scheduler proxy, and destroys it. This API is called in response to a scheduler
    ///     informing the RM that it is done with an execution resource.
    /// </summary>
    void SchedulerProxy::DestroyExecutionResource(ExecutionResource * pExecutionResource)
    {
        // NOTE: Callers are expected to hold the RM lock when invoking this function.
        SchedulerNode * pOwningNode = &m_pAllocatedNodes[pExecutionResource->GetNodeId()];
        SchedulerCore * pOwningCore = &pOwningNode->m_pCores[pExecutionResource->GetCoreIndex()];
        ASSERT(pOwningNode->m_id == pExecutionResource->GetNodeId());

        // When neither assigned threads nor external threads remain on the core, this was the last resource
        // running on it, and the core is handed back as available to others.
        const bool coreIsUnused = ((pOwningCore->m_numAssignedThreads + pOwningCore->m_numExternalThreads) == 0);
        if (coreIsUnused)
        {
            // A core with no vprocs and no external threads cannot be fixed.
            ASSERT(!pOwningCore->IsFixed());

            // Per-node accounting.
            ASSERT(pOwningNode->m_allocatedCores > 0 && pOwningNode->m_allocatedCores <= pOwningNode->m_coreCount);
            pOwningNode->m_allocatedCores--;

            // Proxy-wide accounting, together with the state of the core itself.
            ASSERT(m_numAllocatedCores > MinHWThreads());
            pOwningCore->m_coreState = ProcessorCore::Available;
            m_numAllocatedCores--;
            ASSERT(m_numAllocatedCores <= DesiredHWThreads());

            // Finally let the resource manager drop its use count for this core.
            m_pResourceManager->DecrementCoreUseCount(pExecutionResource->GetNodeId(), pExecutionResource->GetCoreIndex());
        }

        // Unlinking from the core's resource list is serialized (via m_lock) with other calls that are
        // adding or removing execution resources.
        {
            _ReentrantBlockingLock::_Scoped_lock scopedLock(m_lock);
            pOwningCore->m_resources.Remove(pExecutionResource);
        }

        delete pExecutionResource;
    }

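    /// <summary>
    ///     Increments the subscription level of the core identified by the given execution resource's node id and
    ///     core index. Called to assist dynamic resource management in determining whether cores assigned to
    ///     schedulers are idle. An idle core is one whose subscription level is 0.
    /// </summary>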
    void SchedulerProxy::IncrementCoreSubscription(ExecutionResource * pExecutionResource)
    {
        // Locate the core this execution resource is associated with.
        SchedulerNode * pNode = &m_pAllocatedNodes[pExecutionResource->GetNodeId()];
        SchedulerCore * pCore = &pNode->m_pCores[pExecutionResource->GetCoreIndex()];

        // Only the 0 -> 1 transition of the local subscription level can warrant notifications; any other
        // resulting value needs no further work.
        if (InterlockedIncrement(&pCore->m_subscriptionLevel) != 1)
        {
            return;
        }

        // The number of schedulers needing notifications may change right after it is read here. That is
        // acceptable: notifications that are missed as a result are sent at the next Dynamic RM poll.
        if (m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL))
        {
            // All that is done here is to set the dynamic RM event. A dynamic RM thread may not exist yet: there
            // are clearly 2 schedulers, but the second one could still be in the middle of being created, in which
            // case notifications go out when the dynamic RM starts up (right after the second scheduler has
            // finished receiving all its resources). This may even race with shutdown of the penultimate
            // scheduler; if the DRM thread wakes up and only one scheduler is left, it goes back to waiting.
            m_pResourceManager->WakeupDynamicRMWorker();
        }
    }

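    /// <summary>
    ///     Decrements the subscription level of the core identified by the given execution resource's node id and
    ///     core index. Called to assist dynamic resource management in determining whether cores assigned to
    ///     schedulers are idle. An idle core is one whose subscription level is 0.
    /// </summary>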
    void SchedulerProxy::DecrementCoreSubscription(ExecutionResource * pExecutionResource)
    {
        // Locate the core this execution resource is associated with.
        SchedulerNode * pNode = &m_pAllocatedNodes[pExecutionResource->GetNodeId()];
        SchedulerCore * pCore = &pNode->m_pCores[pExecutionResource->GetCoreIndex()];

        // Only the 1 -> 0 transition of the local subscription level can warrant notifications; any other
        // resulting value needs no further work.
        if (InterlockedDecrement(&pCore->m_subscriptionLevel) != 0)
        {
            return;
        }

        // The number of schedulers needing notifications may change right after it is read here. That is
        // acceptable: notifications that are missed as a result are sent at the next Dynamic RM poll.
        if (m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL))
        {
            // All that is done here is to set the dynamic RM event. A dynamic RM thread may not exist yet: there
            // are clearly 2 schedulers, but the second one could still be in the middle of being created, in which
            // case notifications go out when the dynamic RM starts up (right after the second scheduler has
            // finished receiving all its resources). This may even race with shutdown of the penultimate
            // scheduler; if the DRM thread wakes up and only one scheduler is left, it goes back to waiting.
            m_pResourceManager->WakeupDynamicRMWorker();
        }
    }

    /// <summary>
    ///     Called to adjust the suggested allocation such that we do not exceed maxConcurrency.
    ///     This routine takes into account vprocs that are marked for removal but haven't yet been
    ///     retired by the scheduler. The suggested allocation would be decreased to account for such
    ///     vprocs.
    /// </summary>
    unsigned int SchedulerProxy::AdjustAllocationIncrease(unsigned int suggestedAllocation) const
    {
        ASSERT(suggestedAllocation >= GetNumAllocatedCores());
        ASSERT(suggestedAllocation <= DesiredHWThreads());

        // Figure out the max number of new cores we can add
        unsigned int newCores = 0;

        // Since we could be not holding the scheduler proxy lock the value in m_currentConcurrency could
        // be changing. This is fine since a later DRM sweep will migrate appropriate number of cores.
        if (m_maxConcurrency > m_currentConcurrency)
        {
            unsigned int remainingConcurrency = m_maxConcurrency - m_currentConcurrency;
            remainingConcurrency = m_maxConcurrency - m_currentConcurrency;

            // Convert remaining concurrency to number of cores
            unsigned int fullySubscribedConcurrency = m_numFullySubscribedCores * m_targetOversubscriptionFactor;

            if (fullySubscribedConcurrency >= remainingConcurrency)
            {
                newCores = remainingConcurrency / m_targetOversubscriptionFactor;
            }
            else
            {
                ASSERT(m_targetOversubscriptionFactor > 1);
                newCores = m_numFullySubscribedCores;
                newCores += ((remainingConcurrency - fullySubscribedConcurrency) / (m_targetOversubscriptionFactor - 1));
            }
        }

        unsigned int maxAllocation = (GetNumAllocatedCores() + newCores);

        // Cores used exclusively by external threads are included in numAllocatedCores. As a result
        // maxAllocation could go above desired.
        maxAllocation = min(maxAllocation, DesiredHWThreads());

#if defined(CONCRT_TRACING)
        if (maxAllocation < suggestedAllocation)
        {
            TRACE(CONCRT_TRACE_DYNAMIC_RM, L"Scheduler %d: Allocated: %d, Suggested: %d, Adjusted Suggested: %d",
                GetId(), GetNumAllocatedCores(), suggestedAllocation, maxAllocation);
        }
#endif

        return min(maxAllocation, suggestedAllocation);
    }

    SchedulerProxy::~SchedulerProxy()
    {
        //
        // Each allocated node first cleans up anything which might have been used during the asynchronous delete.
        //
        unsigned int nodeIndex = 0;
        while (nodeIndex < m_nodeCount)
        {
            m_pAllocatedNodes[nodeIndex].Cleanup();
            ++nodeIndex;
        }

        //
        // With the nodes cleaned up, the arrays owned by the proxy can be freed.
        //
        delete [] m_pAllocatedNodes;
        delete [] m_pSortedNodeOrder;

#if defined(CONCRT_TRACING)
        delete [] m_drmInitialState;
#endif

        //
        // Last of all, give back the reference held on the Resource manager.
        //
        m_pResourceManager->Release();
    }

    /// <summary>
    ///     Called to shut down a scheduler proxy.  Derived classes can override shutdown behavior based on this.
    /// </summary>
    void SchedulerProxy::FinalShutdown()
    {
        // Cleanup() asks every virtual processor root still held in the allocated nodes to delete itself and
        // deletes the hill climbing instance.
        Cleanup();
        // NOTE(review): DeleteThis() is not defined in this part of the file; presumably it disposes of this
        // object, which would explain why it is the very last call made here -- confirm.
        DeleteThis();
    }

    /// <summary>
    ///     Cleans up resources associated with the scheduler.
    /// </summary>
    void SchedulerProxy::Cleanup()
    {
        //
        // Delete vproc roots that exist in the allocated nodes at this time.  The deletion here is a notification.  It may happen asynchronously
        // depending on the type of scheduler proxy.  The data structures maintained for the scheduler proxy cannot go away until the deferred
        // deletion happens.
        //
        for (unsigned int i = 0; i < m_nodeCount; ++i)
        {
            SchedulerNode * pNode = &m_pAllocatedNodes[i];

            for (unsigned int j = 0; j < pNode->m_coreCount; ++j)
            {
                ExecutionResource * pExecutionResource = pNode->m_pCores[j].m_resources.First();

                while (pExecutionResource != NULL)
                {
                    ExecutionResource * pExecutionResourceToDelete = pExecutionResource;
                    pExecutionResource = pNode->m_pCores[j].m_resources.Next(pExecutionResource);
                    VirtualProcessorRoot * pVPRoot = pExecutionResourceToDelete->GetVirtualProcessorRoot();
                    ASSERT(pVPRoot != NULL);

                    // Since the root is going away, check if it contributes to the subscription count on the core, and
                    // fix up the count, if so.
                    pVPRoot->ResetSubscriptionLevel();
                    pVPRoot->DeleteThis();
                }
            }
        }

        delete m_pHillClimbing;
    }

#if defined(CONCRT_TRACING)

    /// <summary>
    ///     Records the state of every core in this scheduler proxy's allocated nodes (allocated, fixed, borrowed
    ///     and idle flags, plus node and core index) into m_drmInitialState, for tracing purposes.
    /// </summary>
    void SchedulerProxy::TraceInitialDRMState()
    {
        int traceCoreIndex = 0;
        for (unsigned int nodeIndex = 0; nodeIndex < m_nodeCount; ++nodeIndex)
        {
            SchedulerNode * pAllocatedNode = &m_pAllocatedNodes[nodeIndex];
            for (unsigned int coreIndex = 0; coreIndex < pAllocatedNode->m_coreCount; ++coreIndex)
            {
                SchedulerCore * pAllocatedCore = &pAllocatedNode->m_pCores[coreIndex];
                SchedulerCoreData * pCoreData = &m_drmInitialState[traceCoreIndex++];
                pCoreData->m_nodeIndex = (unsigned char)nodeIndex;
                pCoreData->m_coreIndex = (unsigned char)coreIndex;
                pCoreData->m_fAllocated = pAllocatedCore->m_coreState == ProcessorCore::Allocated;
                pCoreData->m_fFixed = pAllocatedCore->IsFixed();
                pCoreData->m_fBorrowed = pAllocatedCore->IsBorrowed();
                pCoreData->m_fIdle = pAllocatedCore->IsIdle();
            }
        }
    }

    /// <summary>
    ///     Dumps the allocation, for this scheduler proxy.
    /// </summary>
    void SchedulerProxy::DumpAllocations()
    {
        printf("\nProxy(%p) - Scheduler(id=%d,allocated=%d,min=%d/%d,max=%d/%d,mult=%d,pri=%d,stk=%d)\n", this,
            m_id, m_numAllocatedCores, m_minimumHardwareThreads, m_minConcurrency, m_desiredHardwareThreads, 
            m_maxConcurrency, m_targetOversubscriptionFactor, m_contextPriority, m_contextStackSize);

        if (m_pAllocatedNodes != NULL)
        {
            for (unsigned int i = 0; i < m_nodeCount; ++i)
            {
                SchedulerNode * pNode = &m_pAllocatedNodes[i];

                printf("node=%Ix,count=%d/%d,group=%d\n", pNode->m_id, pNode->m_allocatedCores, pNode->m_coreCount, pNode->m_processorGroup);
                for (unsigned int j = 0; j < pNode->m_coreCount; ++j)
                {
                    printf("allocated=%d,prcNumber=%d\n", pNode->m_pCores[j].m_coreState, pNode->m_pCores[j].m_processorNumber);

                    ExecutionResource *pExecutionResource = pNode->m_pCores[j].m_resources.First();
                    for (; pExecutionResource != NULL; pExecutionResource = pNode->m_pCores[j].m_resources.Next(pExecutionResource))
                    {
                        unsigned int nodeId = pExecutionResource->GetNodeId();
                        unsigned int maskId = pExecutionResource->GetExecutionResourceId();
                        printf("\tExecution Resource(nodeId=%d, maskId=%d, isVproc=%d)\n", nodeId, maskId, pExecutionResource->GetVirtualProcessorRoot() != NULL);
                    }
                }
            }
        }
    }

#endif
} // namespace details
} // namespace Concurrency

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -