// ==++==
//
// Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SchedulerProxy.cpp
//
// RM proxy for a scheduler instance
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

#include "concrtinternal.h"

namespace Concurrency
{
namespace details
{
    /// <summary>
    ///     Constructs a scheduler proxy.
    /// </summary>
    SchedulerProxy::SchedulerProxy(IScheduler * pScheduler, ResourceManager * pResourceManager, const SchedulerPolicy &policy) :
        m_pResourceManager(pResourceManager),
        m_pThreadProxyFactory(NULL),
        m_pAllocatedNodes(NULL),
        m_pSortedNodeOrder(NULL),
        m_nodeCount(0),
        m_numAllocatedCores(0),
        m_numBorrowedCores(0),
        m_numExternalThreads(0),
        m_numExternalThreadCores(0),
        m_numFixedCores(0),
        m_queueLength(0),
        m_pHillClimbing(NULL),
        m_fCoresStolen(false),
        m_fNeedsExternalThreadAllocation(false),
        m_currentConcurrency(0)
    {
        ASSERT(pScheduler != NULL);

        m_pScheduler = pScheduler;

        m_maxConcurrency                = policy.GetPolicyValue(::Concurrency::MaxConcurrency);
        m_minConcurrency                = policy.GetPolicyValue(::Concurrency::MinConcurrency);
        m_targetOversubscriptionFactor  = policy.GetPolicyValue(::Concurrency::TargetOversubscriptionFactor);
        m_contextStackSize              = policy.GetPolicyValue(::Concurrency::ContextStackSize);
        m_contextPriority               = policy.GetPolicyValue(::Concurrency::ContextPriority);
        m_fDoHillClimbing               = policy.GetPolicyValue(::Concurrency::DynamicProgressFeedback) == ::Concurrency::ProgressFeedbackEnabled;

        if (m_contextPriority == INHERIT_THREAD_PRIORITY)
        {
            m_contextPriority = (char) GetThreadPriority(GetCurrentThread());
        }

        m_id = m_pScheduler->GetId();
        ASSERT(m_id != -1);

        unsigned int coreCount = m_pResourceManager->GetCoreCount();

        m_physicalResourceCount = coreCount;

        ASSERT(coreCount > 0 && coreCount <= INT_MAX);
        ASSERT(m_minConcurrency >= 0 && m_maxConcurrency > 0 && m_maxConcurrency >= m_minConcurrency);

        unsigned int originalTof = m_targetOversubscriptionFactor;

        // Find the minimum target oversubscription factor required to satisfy MaxConcurrency with the cores available.
        // The (a + b - 1) / b form is integer ceiling division.
        unsigned int minTof = (m_maxConcurrency + coreCount - 1)/coreCount;

        if (originalTof < minTof)
        {
            // Adjust target oversubscription factor to ensure that we can satisfy MaxConcurrency with the cores on the system.
            m_targetOversubscriptionFactor  = minTof;
            // The scheduler needs all the cores on the machine to satisfy MaxConcurrency. Moreover, we will need to
            // oversubscribe more than the user indicated.
            m_desiredHardwareThreads = coreCount;
        }
        else
        {
            m_desiredHardwareThreads = (m_maxConcurrency + originalTof - 1)/originalTof;
        }
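        // Worked example with hypothetical numbers (illustrative only, not from any policy default):
        // with MaxConcurrency = 10, coreCount = 8 and a user-supplied factor of 1,
        // minTof = (10 + 8 - 1) / 8 = 2, so the factor is raised to 2 and all 8 cores are
        // requested (m_desiredHardwareThreads = 8).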

        // Now adjust the target oversubscription factor to ensure that MaxConcurrency virtual processors are evenly distributed
        // over the desired number of hardware threads (i.e. each core gets either m_tof vprocs or m_tof - 1 vprocs). Also
        // calculate how many of the assigned cores will get m_tof vprocs.
        if ((m_maxConcurrency % m_desiredHardwareThreads) == 0)
        {
            // This is the common case. We have a simple distribution and every allocated core will get tof vprocs.
            m_targetOversubscriptionFactor = m_maxConcurrency/m_desiredHardwareThreads;
            m_numFullySubscribedCores = m_desiredHardwareThreads;
            m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
        }
        else
        {
            // We have an uneven distribution; some cores will get tof vprocs and some will get tof - 1.
            ASSERT(m_targetOversubscriptionFactor > 1);

            m_targetOversubscriptionFactor = (m_maxConcurrency + m_desiredHardwareThreads - 1)/m_desiredHardwareThreads;
            m_numFullySubscribedCores = m_desiredHardwareThreads - ((m_desiredHardwareThreads * m_targetOversubscriptionFactor) - m_maxConcurrency);

            // Calculate min hardware threads. We need to make sure that, given the way vprocs are distributed to cores
            // (where some cores could get tof vprocs and some could get tof - 1 vprocs), the scheduler proxy will never go below
            // min concurrency if it is left with just the minimum number of cores (and all of those cores happen to have tof - 1
            // vprocs assigned to them).
            if (((m_desiredHardwareThreads - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1)) >= m_minConcurrency)
            {
                m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 2)/(m_targetOversubscriptionFactor - 1);
            }
            else
            {
                m_minimumHardwareThreads = (m_desiredHardwareThreads - m_numFullySubscribedCores);

                unsigned int remainingThreads = (m_minConcurrency - (m_minimumHardwareThreads * (m_targetOversubscriptionFactor - 1)));
                ASSERT(remainingThreads < m_minConcurrency);
                m_minimumHardwareThreads += (remainingThreads + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
            }
        }
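        // Continuing the hypothetical example above: 10 % 8 != 0, so the uneven branch runs.
        // m_targetOversubscriptionFactor = (10 + 8 - 1) / 8 = 2 and
        // m_numFullySubscribedCores = 8 - (8 * 2 - 10) = 2, i.e. two cores carry 2 vprocs and
        // the remaining six carry 1, for a total of 2*2 + 6*1 = 10 = MaxConcurrency.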

        ASSERT(m_maxConcurrency <= m_targetOversubscriptionFactor * m_desiredHardwareThreads);
        ASSERT(m_numFullySubscribedCores <= m_desiredHardwareThreads);
        ASSERT(m_targetOversubscriptionFactor > 1 || m_numFullySubscribedCores == m_desiredHardwareThreads);

        ASSERT(m_targetOversubscriptionFactor > 0 && m_targetOversubscriptionFactor <= INT_MAX);
        ASSERT(m_desiredHardwareThreads > 0 && m_desiredHardwareThreads <= coreCount);
        ASSERT(m_minimumHardwareThreads >= 0 && m_desiredHardwareThreads > 0 && m_minimumHardwareThreads <= m_desiredHardwareThreads);

        // Hold a reference to the resource manager.
        int ref = m_pResourceManager->Reference();
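        // Reference 'ref' so it counts as used; this avoids an unused-variable warning in
        // builds where CORE_ASSERT compiles away.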
        (ref);
        CORE_ASSERT(ref > 1);

        if (m_fDoHillClimbing)
        {
            m_pHillClimbing = new HillClimbing(m_id, coreCount, this);
        }
#if defined(CONCRT_TRACING)
        m_drmInitialState = NULL;
#endif
    }

    /// <summary>
    ///     Called by a scheduler in order to make an initial request for an allocation of virtual processors.  The request
    ///     is driven by policies within the scheduler queried via the IScheduler::GetPolicy method.  If the request
    ///     can be satisfied via the rules of allocation, it is communicated to the scheduler as a call to
    ///     IScheduler::AddVirtualProcessors.
    /// </summary>
    /// <param name="doSubscribeCurrentThread">
    ///     Whether to subscribe the current thread and account for it during resource allocation.
    /// </param>
    /// <returns>
    ///     The IExecutionResource instance representing the current thread if doSubscribeCurrentThread was true; NULL otherwise.
    /// </returns>
    IExecutionResource * SchedulerProxy::RequestInitialVirtualProcessors(bool doSubscribeCurrentThread)
    {
        return m_pResourceManager->RequestInitialVirtualProcessors(this, doSubscribeCurrentThread);
    }
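    // A hedged sketch of typical use (the call site and variable names are illustrative,
    // not from this file):
    //
    //     IExecutionResource * pResource = pSchedulerProxy->RequestInitialVirtualProcessors(true);
    //     // ... the scheduler runs work on the granted virtual processors ...
    //     if (pResource != NULL)
    //         pResource->Remove(pScheduler);   // release the current-thread subscription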

    /// <summary>
    ///     Called in order to notify the resource manager that the given scheduler is shutting down.  This
    ///     will cause the resource manager to immediately reclaim all resources granted to the scheduler.
    /// </summary>
    void SchedulerProxy::Shutdown()
    {
        m_pResourceManager->Shutdown(this);
    }

    /// <summary>
    ///     Gets a new thread proxy from the factory.
    /// </summary>
    IThreadProxy * SchedulerProxy::GetNewThreadProxy(IExecutionContext * pContext)
    {
        if (m_pThreadProxyFactory == NULL)
        {
            // Populate the cached pointer from the one in the RM
            m_pThreadProxyFactory = GetResourceManager()->GetThreadProxyFactoryManager()->GetFreeThreadProxyFactory();
        }

        FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *>(m_pThreadProxyFactory->RequestProxy(ContextStackSize(), ContextPriority()));
        pProxy->AssociateExecutionContext(pContext);

        return pProxy;
    }

    /// <summary>
    ///     Ensures that a context is bound to a thread proxy.  This API should *NOT* be called in the vast majority of circumstances.
    ///     The IThreadProxy::SwitchTo will perform late binding to thread proxies as necessary.  There are, however, circumstances
    ///     where it is necessary to pre-bind a context to ensure that the SwitchTo operation switches to an already bound context.  This
    ///     is the case on a UMS scheduling context as it cannot call allocation APIs.
    /// </summary>
    /// <param name="pContext">
    ///     The context to bind.
    /// </param>
    void SchedulerProxy::BindContext(IExecutionContext * pContext)
    {
        if (pContext == NULL)
        {
            throw std::invalid_argument("pContext");
        }

        // Find out if this context already has a thread proxy, if not we have to request one from the factory.
        if (pContext->GetProxy() == NULL)
        {
            // Find a thread proxy from the pool that corresponds to the stack size and priority we need.
            GetNewThreadProxy(pContext);
        }
    }

    /// <summary>
    ///     Returns the **unstarted** thread proxy attached to pContext back to the thread proxy factory.
    ///     Such a thread proxy **must** be unstarted.
    ///     This API should *NOT* be called in the vast majority of circumstances.
    /// </summary>
    /// <param name="pContext">
    ///     The context to unbind.
    /// </param>
    void SchedulerProxy::UnbindContext(IExecutionContext * pContext)
    {
        if (pContext == NULL)
        {
            throw std::invalid_argument("pContext");
        }

        FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *> (pContext->GetProxy());

        ASSERT(pProxy != NULL);
        pProxy->ReturnIdleProxy();
    }
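    // A hedged sketch of the intended pairing (hypothetical call site):
    //
    //     pSchedulerProxy->BindContext(pContext);     // pre-bind an unstarted thread proxy
    //     // ... the proxy must still be unstarted when it is returned ...
    //     pSchedulerProxy->UnbindContext(pContext);   // return the proxy to the factory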

    /// <summary>
    ///     This function retrieves the execution resource associated with this thread, if one exists.
    /// </summary>
    /// <returns>
    ///     The ExecutionResource instance representing the current thread in the runtime.
    /// </returns>
    ExecutionResource * SchedulerProxy::GetCurrentThreadExecutionResource()
    {
        ExecutionResource * pExecutionResource = NULL;
        DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
        void * tlsPointer = TlsGetValue(tlsSlot);
        size_t tlsValue = (size_t) tlsPointer;

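        // The TLS slot stores a tagged pointer: the low-order bits selected by TlsResourceBitMask
        // encode what the value references. In the InResource case the value is used directly as
        // the ExecutionResource pointer; the proxy cases (see ReferenceCurrentThreadExecutionResource
        // below) mask their tag bits off before the cast.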
        if ((tlsPointer != NULL) && ((tlsValue & TlsResourceBitMask) == TlsResourceInResource))
        {
            pExecutionResource = (ExecutionResource *) tlsValue;
        }

        return pExecutionResource;
    }

    /// <summary>
    ///     This function retrieves the execution resource associated with this thread, if one exists,
    ///     and updates the reference count on it for better bookkeeping.
    /// </summary>
    /// <returns>
    ///     The ExecutionResource instance representing the current thread in the runtime.
    /// </returns>
    ExecutionResource * SchedulerProxy::ReferenceCurrentThreadExecutionResource()
    {
        ExecutionResource * pExecutionResource = NULL;
        DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
        void * tlsPointer = TlsGetValue(tlsSlot);

        if (tlsPointer != NULL)
        {
            size_t tlsValue = (size_t) tlsPointer;

            if ((tlsValue & TlsResourceBitMask) == TlsResourceInResource)
            {
                pExecutionResource = (ExecutionResource *) tlsValue;

                VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();

                // If this is a nested subscribe call and a virtual processor root exists, that root
                // could not have been removed, because it would have been marked as "fixed".
                ASSERT(pVPRoot == NULL || !pVPRoot->IsRootRemoved());
                pExecutionResource->IncrementUseCounts();
            }
            else if ((tlsValue & TlsResourceBitMask) == TlsResourceInProxy)
            {
                FreeThreadProxy * pThreadProxy = (FreeThreadProxy *) (((size_t) tlsValue) & ~TlsResourceInProxy);
                pExecutionResource = pThreadProxy->GetVirtualProcessorRoot()->GetExecutionResource();
                VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
                if (pVPRoot != NULL && pVPRoot->IsRootRemoved())
                {
                    // The virtual processor root that this thread is running on has been removed. We have to
                    // create a new execution resource abstraction for the current thread and perform an external
                    // thread allocation for this scheduler proxy.
                    pExecutionResource = NULL;
                }
                else
                {
                    pExecutionResource->IncrementUseCounts();
                }
            }
            else
            {
                ASSERT((tlsValue & TlsResourceBitMask) == TlsResourceInUMSProxy);
                UMSFreeThreadProxy * pThreadProxy = (UMSFreeThreadProxy *) (((size_t) tlsValue) & ~TlsResourceInUMSProxy);

                // For a UMS thread proxy we need to be in a critical region while accessing the virtual processor root, and
                // until we increment a count on the execution resource, making the underlying vproc 'fixed', if it is not
                // already so.
                pThreadProxy->EnterCriticalRegion();
                pExecutionResource = pThreadProxy->GetVirtualProcessorRoot()->GetExecutionResource();

                VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
                if (pVPRoot != NULL && pVPRoot->IsRootRemoved())
                {
                    // The virtual processor root that this thread is running on has been removed. We have to
                    // create a new execution resource abstraction for the current thread and perform an external
                    // thread allocation for this scheduler proxy.
