⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 schedulerproxy.cpp

📁 C语言库函数的原型,有用的拿去
💻 CPP
📖 第 1 页 / 共 4 页
字号:
                    pExecutionResource = NULL;
                }
                else
                {
                    pExecutionResource->IncrementUseCounts();
                }
                pThreadProxy->ExitCriticalRegion();
            }
        }

        if (pExecutionResource != NULL)
        {
            return GetResourceForNewSubscription(pExecutionResource);
        }

        return pExecutionResource;
    }

    /// <summary>
    ///     Creates or reuses an execution resource for the thread subscription
    /// </summary>
    ExecutionResource * SchedulerProxy::GetResourceForNewSubscription(ExecutionResource * pParentExecutionResource)
    {
        // The parent resource already reports this scheduler proxy as its owner: hand it back unchanged.
        if (pParentExecutionResource->GetSchedulerProxy() == this)
        {
            return pParentExecutionResource;
        }

        // The parent resource reports a different scheduler proxy. Build a new execution resource for this
        // proxy from the parent, and increment its use counts before returning it.
        ExecutionResource * pNestedResource = new ExecutionResource(this, pParentExecutionResource);
        pNestedResource->IncrementUseCounts();
        return pNestedResource;
    }

    /// <summary>
    ///     Registers that a call to SubscribeCurrentThread has occurred for this core, making this core immovable.
    /// </summary>
    void SchedulerProxy::IncrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
    {
        SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
        SchedulerCore * pCore = &pNode->m_pCores[coreIndex];

        // Note whether the core had no fixed threads before this call, then count the new fixed thread.
        const bool isFirstFixedThread = (pCore->m_numFixedThreads == 0);
        ++pCore->m_numFixedThreads;

        if (isFirstFixedThread)
        {
            // The core has just become fixed: reflect that in the per-node and per-proxy totals.
            ++pNode->m_numFixedCores;
            ++m_numFixedCores;

            if (pCore->IsBorrowed())
            {
                // A fixed core does not carry the borrowed flag. Remember that it was borrowed so the flag
                // can be restored once the core becomes movable again, and clear it for now.
                pCore->m_fPreviouslyBorrowed = true;
                ToggleBorrowedState(pNode, coreIndex);
            }
        }

        // External threads are tallied both on the proxy and on the core, so that everything running on
        // the core is accounted for.
        if (isExternalThread)
        {
            ++m_numExternalThreads;
            ++pCore->m_numExternalThreads;
        }
    }

    /// <summary>
    ///     Registers that a call to IExecutionResource::Release has occurred, potentially freeing this core.
    /// </summary>
    void SchedulerProxy::DecrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
    {
        SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
        SchedulerCore * pCore = &pNode->m_pCores[coreIndex];

        // An external thread leaving the core is removed from both the per-core and the per-proxy tallies.
        if (isExternalThread)
        {
            ASSERT(pCore->m_numExternalThreads > 0);
            --pCore->m_numExternalThreads;
            --m_numExternalThreads;
        }

        ASSERT(pCore->m_numFixedThreads > 0);
        --pCore->m_numFixedThreads;
        if (pCore->m_numFixedThreads != 0)
        {
            // Other fixed threads are still registered on this core, so it stays fixed.
            return;
        }

        // That was the last fixed thread on the core: it no longer counts as a fixed core.
        ASSERT(pCore->m_numExternalThreads == 0);
        --m_numFixedCores;
        --pNode->m_numFixedCores;

        if (pCore->m_fPreviouslyBorrowed)
        {
            // The core was borrowed before a subscription request converted it to fixed. Its borrowed
            // flag was cleared for that period; restore it here.
            ASSERT(!pCore->IsBorrowed());
            ToggleBorrowedState(pNode, coreIndex);
            pCore->m_fPreviouslyBorrowed = false;
        }

        // If the only reason this core was owned was an external thread running on it (no assigned
        // threads), it no longer needs to be counted as an external thread core.
        if (isExternalThread && pCore->m_numAssignedThreads == 0)
        {
            --m_numExternalThreadCores;
        }
    }

    /// <summary>
    ///     This API registers the current thread with the resource manager associating it with this scheduler proxy,
    ///     and returns an instance of IExecutionResource back to the scheduler for bookkeeping and maintenance.
    /// </summary>
    /// <returns>
    ///     The IExecutionResource instance representing current thread in the runtime.
    /// </returns>
    IExecutionResource * SchedulerProxy::SubscribeCurrentThread()
    {
        // The scheduler proxy does no work of its own here: the request is forwarded to the resource
        // manager, with this proxy passed along as the argument, and the RM's result is returned as is.
        return m_pResourceManager->SubscribeCurrentThread(this);
    }

    /// <summary>
    ///     Creates a new execution resource for the external thread and registers it with the scheduler proxy.
    /// </summary>
    ExecutionResource * SchedulerProxy::CreateExternalThreadResource(SchedulerNode * pNode, unsigned int coreIndex)
    {
        // Build the resource for the given node/core pair on behalf of this proxy.
        ExecutionResource * pExternalResource = new ExecutionResource(this, pNode, coreIndex);

        // Increment the new resource's use counts before handing it to the caller.
        pExternalResource->IncrementUseCounts();

        return pExternalResource;
    }

    /// <summary>
    ///     Adds the execution resource to the list of subscribed threads
    /// </summary>
    void SchedulerProxy::AddThreadSubscription(ExecutionResource * pExecutionResource)
    {
        // The resource is appended at the tail of m_threadSubscriptions. This function does not validate
        // the pointer or check whether it is already in the list.
        m_threadSubscriptions.AddTail(pExecutionResource);
    }

    /// <summary>
    ///     Removes the execution resource from the list of subscribed threads
    /// </summary>
    void SchedulerProxy::RemoveThreadSubscription(ExecutionResource * pExecutionResource)
    {
        // Unlink the resource from m_threadSubscriptions first, then destroy it. The object is deleted
        // here, so the pointer passed in is no longer valid once this function returns.
        m_threadSubscriptions.Remove(pExecutionResource);
        delete pExecutionResource;
    }

    /// <summary>
    ///     Finds the core allocated by the RM on which a single subscribed external thread should run, OR
    ///     if doOversubscribeCore is true, find the core with the smallest use count to oversubscribe.
    /// </summary>
    ExecutionResource * SchedulerProxy::GrantExternalThreadAllocation(bool doOversubscribeCore)
    {
        // Best oversubscription candidate seen so far. The use count starts at the largest unsigned
        // value so that the first allocated core examined always becomes the candidate.
        unsigned int leastUseCount = static_cast<unsigned int>(-1);
        unsigned int leastUseCoreIndex = static_cast<unsigned int>(-1);
        SchedulerNode * pLeastUseNode = NULL;

        // The node the calling thread is currently on is only queried when oversubscribing, where it
        // serves as a tie-breaker between cores with equal use counts.
        unsigned int affinityNodeIndex = static_cast<unsigned int>(-1);
        if (doOversubscribeCore)
        {
            affinityNodeIndex = m_pResourceManager->GetCurrentNodeAndCore(NULL);
        }

        for (unsigned int nodeIndex = 0; nodeIndex < m_nodeCount; ++nodeIndex)
        {
            SchedulerNode * pNode = &m_pAllocatedNodes[nodeIndex];

            // Nodes without any allocated cores have nothing to offer.
            const bool nodeHasAllocatedCores = (pNode->m_allocatedCores > 0);
            if (!nodeHasAllocatedCores)
            {
                continue;
            }

            for (unsigned int coreIndex = 0; coreIndex < pNode->m_coreCount; ++coreIndex)
            {
                SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
                if (pCore->m_coreState != ProcessorCore::Allocated)
                {
                    continue;
                }

                // Everything running on the core: assigned threads plus external threads.
                const unsigned int useCount = pCore->m_numAssignedThreads + pCore->m_numExternalThreads;

                if (!doOversubscribeCore)
                {
                    // Not oversubscribing: take the first allocated core with nothing on it, counting it
                    // both as an external thread core and as an allocated core.
                    if (useCount == 0)
                    {
                        m_numExternalThreadCores++;
                        m_numAllocatedCores++;
                        return CreateExternalThreadResource(pNode, coreIndex);
                    }
                    continue;
                }

                ASSERT(useCount > 0);

                // Remember this core if it has a strictly lower use count than the current candidate, or
                // if it ties with the candidate and lives on the node the thread is currently on.
                const bool isNewMinimum = (useCount < leastUseCount);
                const bool isAffinitizedTie = (useCount == leastUseCount) && (nodeIndex == affinityNodeIndex);
                if (isNewMinimum || isAffinitizedTie)
                {
                    leastUseCount = useCount;
                    pLeastUseNode = pNode;
                    leastUseCoreIndex = coreIndex;
                }
            }
        }

        // Reaching this point is only expected when oversubscribing, with a candidate core found.
        ASSERT(doOversubscribeCore);
        ASSERT(pLeastUseNode != NULL);

        return CreateExternalThreadResource(pLeastUseNode, leastUseCoreIndex);
    }

    /// <summary>
    ///     Called by the RM when it is done allocating cores for the scheduler proxy. Gives the proxy
    ///     an array of nodes and cores.
    /// </summary>
    ExecutionResource * SchedulerProxy::GrantAllocation(SchedulerNode * pAllocatedNodes, unsigned int nodeCount, unsigned int numberAllocated, bool doExternalThreadAllocation)
    {
        ASSERT(m_pAllocatedNodes == NULL);
        ASSERT(m_numAllocatedCores == 0);
        ASSERT(m_numExternalThreads == 0);

        m_nodeCount = nodeCount;

        // The RM provides the scheduler proxy with an array of nodes and cores, with 'numberAllocated' of those cores marked
        // as ProcessorCore::Allocated. These are the cores that the RM has deemed sufficient to satisfy the request of this
        // scheduler proxy based on its policy values and the availability of resources.
        m_pAllocatedNodes = pAllocatedNodes;
        m_numAllocatedCores = numberAllocated;

        m_pSortedNodeOrder = new unsigned int[m_nodeCount];
        for (unsigned int i = 0; i < m_nodeCount; ++i)
        {
            m_pSortedNodeOrder[i] = i;
        }

        // Calculate the number of virtual processors we will give this scheduler based on the core allocation
        // we received. Each core will be allocated either m_tof vprocs or m_tof - 1 vprocs, based on the
        // desired hardware threads and the value for max concurrency.
        unsigned int vprocCores = m_numAllocatedCores;
        bool hasExternalThreadCore = false;

        // If we have an external thread we may need to adjust the number of vprocs cores we have. Note that we use m_minimumHardwareThreads
        // and not MinHWThreads() (the latter includes the external thread count).
        if (doExternalThreadAllocation && vprocCores > m_minimumHardwareThreads)
        {
            hasExternalThreadCore = true;
            vprocCores--;
        }

        unsigned int vprocCount = 0; 

        ASSERT(m_numFullySubscribedCores > 0 && m_numFullySubscribedCores <= m_desiredHardwareThreads);
        if (vprocCores <= m_numFullySubscribedCores)
        {
            vprocCount = vprocCores * m_targetOversubscriptionFactor;
        }
        else
        {
            vprocCount = (m_numFullySubscribedCores * m_targetOversubscriptionFactor) + 
                            ((vprocCores - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1));
        }

        ASSERT(vprocCount >= m_minConcurrency && vprocCount <= m_maxConcurrency);
        m_numAssignedThreads = vprocCount;

        ExecutionResource * pExecutionResource = NULL;
        IVirtualProcessorRoot** vprocArray = new IVirtualProcessorRoot *[vprocCount];
        unsigned int vprocIndex = 0;
        unsigned int coresAssigned = 0;
        bool externalThreadAllocated = !doExternalThreadAllocation;

        for (unsigned int nodeIndex = 0; coresAssigned < m_numAllocatedCores && nodeIndex < m_nodeCount; ++nodeIndex)
        {
            SchedulerNode * pNode = &m_pAllocatedNodes[nodeIndex];
            if (pNode->m_allocatedCores > 0)
            {
                for(unsigned int coreIndex = 0; coresAssigned < m_numAllocatedCores && coreIndex < pNode->m_coreCount; ++coreIndex)
                {
                    SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
                    if (pCore->m_coreState == ProcessorCore::Allocated)
                    {
                        ASSERT(pCore->m_numAssignedThreads == 0 && pCore->m_numFixedThreads == 0);

                        ++coresAssigned;

                        // If the external thread also needs a core, first try to put it in a node whose affinity is a superset of the hardware thread
                        // it is currently running on. If not, reaffinitize it.
                        if (!externalThreadAllocated && ((coresAssigned == m_numAllocatedCores) || (m_pResourceManager->GetCurrentNodeAndCore(NULL) == nodeIndex)))
                        {
                            // Create an execution resource and affinitize it to this node
                            pExecutionResource = CreateExternalThreadResource(pNode, coreIndex);
                            externalThreadAllocated = true;

                            // If this core was reserved for the external thread, there is no need to allocate any vprocs on it.
                            if (hasExternalThreadCore)
                            {
                                m_numExternalThreadCores++;
                                continue;
                            }
                        }

                        // Create virtual processor roots in the scheduler proxy, corresponding to the node and core we're currently looking at.
                        unsigned int numVprocs = 0;
                        if (m_numFullySubscribedCores > 0)
                        {
                            numVprocs = m_targetOversubscriptionFactor;
                            // As we assign m_tof threads to a core, we decrement this value. This value is also updated in
                            // AddCore and RemoveCore. After the scheduler proxy has been given its initial allocation
                            // or resources, this variable keeps track of how many out of the remaining quota of cores the
                            // scheduler proxy could acquire (desired - allocated) would get tof threads per core if they
                            // were added to the scheduler during dynamic core migration.
                            --m_numFullySubscribedCores;
                        }
                        else
                        {
                            numVprocs = m_targetOversubscriptionFactor - 1;
                        }
                        pCore->m_numAssignedThreads += numVprocs;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -