⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 schedulerbase.cpp

📁 C语言库函数的原型,有用的拿去
💻 CPP
📖 第 1 页 / 共 3 页
字号:
// ==++==
//
// Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SchedulerBase.cpp
//
// Implementation file of the metaphor for a concrt scheduler 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

#include "concrtinternal.h"

namespace Concurrency
{
/// <summary>
///     Creates a scheduler that only manages internal contexts. Implicitly calls Reference.
///     If Attach is called, the scheduler is no longer anonymous because it is also managing the external
///     context where Attach was called.  To destroy an anonymous scheduler, Release needs to be called.
/// </summary>
/// <param name="policy">
///     [in] A const reference to the scheduler policy.
/// </param>
/// <returns>
///     A pointer to the new scheduler
/// </returns>
Scheduler* Scheduler::Create(__in const SchedulerPolicy& policy)
{
    // Build and initialize the concrete scheduler through the internal factory.
    ::Concurrency::details::SchedulerBase *const pCreated = ::Concurrency::details::SchedulerBase::Create(policy);

    // Take the implicit reference on behalf of the caller before handing the scheduler out.
    pCreated->Reference();

    return pCreated;
}

/// <summary>
///     Allows a user defined policy to be used to create the default scheduler. It is only valid to call this API when no default
///     scheduler exists. Once a default policy is set, it remains in effect until the next valid call to the API.
/// </summary>
/// <param name="_Policy">
///     [in] The policy to be set as the default. The runtime will make a copy of the policy for its use, and the user
///     is responsible for the lifetime of the policy that is passed in.
/// </param>
void Scheduler::SetDefaultSchedulerPolicy(__in const SchedulerPolicy & _Policy)
{
    // Pure forwarding call: the public API delegates to the internal scheduler implementation.
    typedef ::Concurrency::details::SchedulerBase InternalScheduler;
    InternalScheduler::SetDefaultSchedulerPolicy(_Policy);
}

/// <summary>
///     Resets the default scheduler policy, and the next time a default scheduler is created, it will use the runtime's default policy settings.
/// </summary>
void Scheduler::ResetDefaultSchedulerPolicy()
{
    ::Concurrency::details::SchedulerBase::ResetDefaultSchedulerPolicy();
}

//
// Internal bit mask definitions for the shutdown gate.
//
// Layout of the 32-bit gate value as implied by the masks below: the three most significant
// bits are independent flags and the remaining 29 low bits hold a count.
//   GATE_FLAGS_MASK == SHUTDOWN_INITIATED_FLAG | SUSPEND_GATE_FLAG | SHUTDOWN_COMPLETED_FLAG
//   GATE_COUNT_MASK == ~GATE_FLAGS_MASK (within 32 bits)
//
#define SHUTDOWN_INITIATED_FLAG                 0x80000000
#define SUSPEND_GATE_FLAG                       0x40000000
#define SHUTDOWN_COMPLETED_FLAG                 0x20000000
#define GATE_COUNT_MASK                         0x1FFFFFFF
#define GATE_FLAGS_MASK                         0xE0000000

namespace details
{
    // Template specializations for types used by the scheduler.
    template <>
    inline unsigned int Hash<HANDLE, ExternalContextBase*>::HashValue(const HANDLE& key, int size)
    {
        ASSERT(size > 0);

        // Handle values are a multiple of 4, so divide out that factor before bucketing.
        // The quotient is narrowed to 32 bits first and only then reduced modulo the table size.
        const size_t rawHandle = (size_t)key;
        const unsigned int scaled = (unsigned int) (rawHandle / 4);
        return scaled % size;
    }

    // The default scheduler lock protects access to both the default scheduler as well as the
    // default scheduler policy.  Both pointers start out NULL.
    SchedulerBase *SchedulerBase::s_pDefaultScheduler = NULL;
    SchedulerPolicy *SchedulerBase::s_pDefaultSchedulerPolicy = NULL;

    // s_initializedCount is incremented by CheckStaticConstruction on every scheduler creation; the
    // transition to 1 triggers static construction.
    LONG SchedulerBase::s_initializedCount = 0;
    // Tracks whether OneShotStaticConstruction has run (ONESHOT_INITIALIZED_FLAG is OR'ed in afterwards).
    LONG SchedulerBase::s_oneShotInitializationState = ONESHOT_NOT_INITIALIZED;
    // Process-wide source of work queue identifiers (see GetNewWorkQueueId).
    volatile LONG SchedulerBase::s_workQueueIdCounter = 0;
    // TLS slot index allocated in OneShotStaticConstruction; no explicit initializer here.
    DWORD SchedulerBase::t_dwContextIndex;

    // Number of suballocators for use by external contexts that are active in the process.
    volatile LONG SchedulerBase::s_numExternalAllocators = 0;

    // The max number of external contexts that could have suballocators at any given time.
    const int SchedulerBase::s_maxExternalAllocators = 32;

    // The maximum depth of the free pool of allocators.
    const int SchedulerBase::s_allocatorFreePoolLimit = 16;

    /// <summary>
    ///     Constructor for SchedulerBase.
    /// </summary>
    // Copies the policy, zero-initializes the bookkeeping counters, caches a few policy values and
    // allocates a per-scheduler TLS slot.  The id counters start at -1 so that the first
    // InterlockedIncrement performed by GetNewContextId / GetNewScheduleGroupId yields 0.
    // Throws scheduler_resource_allocation_error if no TLS slot can be allocated.
    SchedulerBase::SchedulerBase(__in const ::Concurrency::SchedulerPolicy& policy) :
        m_policy(policy),
        m_refCount(0),
        m_attachCount(0),
        m_id(-1),
        m_boundContextCount(0),
        m_internalContextCountPlusOne(1),
        m_nextSchedulingRingIndex(0),
        m_contextIdCounter(-1),
        m_scheduleGroupIdCounter(-1),
        m_vprocShutdownGate(0),
        m_activeVProcCount(0),
        m_initialReference(0),
        m_enqueuedTaskCounter(0),
        m_dequeuedTaskCounter(0),
        m_enqueuedTaskCheckpoint(0),
        m_dequeuedTaskCheckpoint(0),
        m_pResourceManager(NULL),
        m_pSchedulerResourceManagement(NULL),
        m_externalThreadStatistics(NULL, 256, ListArray<ExternalStatistics>::DeletionThresholdInfinite),
        m_safePointDataVersion(0),
        m_safePointCommitVersion(0),
        m_safePointPendingVersion(0)
    {
        // Cache the policy values that are consulted directly through members.
        m_schedulerKind             = (::Concurrency::SchedulerType) policy.GetPolicyValue(::Concurrency::SchedulerKind);
        m_localContextCacheSize     = (unsigned short) policy.GetPolicyValue(::Concurrency::LocalContextCacheSize);
        m_schedulingProtocol        = (::Concurrency::SchedulingProtocolType) policy.GetPolicyValue(::Concurrency::SchedulingProtocol);

        //
        // This is a count before which we will **NOT** perform any throttling.  In the event of repeated latent blocking, we will reach
        // this number of threads rapidly.  By default, we choose this number to be 4x the number of cores.  If a client has specified a
        // MinConcurrency value that implies a greater number of vprocs than this, we will adjust the throttling limit upwards to
        // MinConcurrency.  This may result in poorer overall throttling performance; however -- one would expect that most clients aren't
        // requesting > 4x oversubscription.
        //
        m_threadsBeforeThrottling   = max(::Concurrency::GetProcessorCount() * 4, policy.GetPolicyValue(::Concurrency::MinConcurrency));

        // Allocate a TLS slot to track statistics for threads alien to this scheduler
        m_dwExternalStatisticsIndex = TlsAlloc();
        if (m_dwExternalStatisticsIndex == TLS_OUT_OF_INDEXES)
        {
            // No slot available: surface the Win32 error as an HRESULT in the exception.
            throw scheduler_resource_allocation_error(HRESULT_FROM_WIN32(GetLastError()));
        }

    }

    /// <summary>
    ///     Destructor for SchedulerBase. All teardown work lives in Cleanup.
    /// </summary>
    SchedulerBase::~SchedulerBase()
    {
        this->Cleanup();
    }

    /// <summary>
    ///     Tears down the scheduler: deletes every scheduling node, then every scheduling ring, frees both
    ///     arrays, returns the external-statistics TLS slot, releases the resource manager and finally
    ///     runs static destruction.
    /// </summary>
    void SchedulerBase::Cleanup()
    {
        // All nodes are destroyed before any ring is touched; keep the two passes separate.
        int nodeIdx = 0;
        while (nodeIdx < m_nodeCount)
        {
            delete m_nodes[nodeIdx];
            ++nodeIdx;
        }

        // The ring array is walked with the same bound as the node array.
        int ringIdx = 0;
        while (ringIdx < m_nodeCount)
        {
            delete m_rings[ringIdx];
            ++ringIdx;
        }

        delete [] m_nodes;
        delete [] m_rings;

        // Give the TLS slot back so it can be reused, and clear the stored index.
        TlsFree(m_dwExternalStatisticsIndex);
        m_dwExternalStatisticsIndex = 0;

        m_pResourceManager->Release();
        SchedulerBase::StaticDestruction();
    }

    // Returns the ring at the current round-robin cursor and advances the cursor to the next
    // non-NULL ring slot.
    // race is fine -- only for inputting work
    // (the cursor is read and written without synchronization, so concurrent callers may observe or
    // store the same index).
    SchedulingRing *SchedulerBase::GetNextSchedulingRing()
    {
        SchedulingRing *pRing = m_rings[m_nextSchedulingRingIndex];
        ASSERT(pRing != NULL);
        // Advance past any empty slots, wrapping around at m_nodeCount.
        m_nextSchedulingRingIndex = GetNextValidSchedulingRingIndex(m_nextSchedulingRingIndex);
        return pRing;
    }

    /// <summary>
    ///     Returns idx when a scheduling ring exists in that slot; otherwise returns the index of the next
    ///     populated slot, searching forward with wraparound.
    /// </summary>
    /// <param name="idx">
    ///     [in] The ring slot to test. Must lie in [0, m_nodeCount).
    /// </param>
    /// <returns>
    ///     The index of a slot in m_rings that holds a non-NULL ring.
    /// </returns>
    int SchedulerBase::GetValidSchedulingRingIndex(int idx)
    {
        // idx is used directly as a subscript into m_rings, which Cleanup treats as having exactly
        // m_nodeCount entries, so the upper bound has to be exclusive: idx == m_nodeCount would read
        // one element past the end of the array.
        ASSERT(idx >= 0 && idx < m_nodeCount);
        if (m_rings[idx] == NULL)
            return GetNextValidSchedulingRingIndex(idx);
        return idx;
    }

    /// <summary>
    ///     Finds the first populated ring slot strictly after idx, wrapping around at m_nodeCount.
    ///     The search does not terminate until a non-NULL entry of m_rings is found.
    /// </summary>
    int SchedulerBase::GetNextValidSchedulingRingIndex(int idx)
    {
        ASSERT(idx >= 0 && idx <= m_nodeCount);

        int candidate = idx;
        for (;;)
        {
            // Step first, then test: the starting slot itself is only revisited after a full lap.
            candidate = (candidate + 1) % m_nodeCount;
            if (m_rings[candidate] != NULL)
            {
                return candidate;
            }
        }
    }

    /// <summary>
    ///     Returns the populated ring that follows pCurrentRing, or NULL once the walk has come back
    ///     around to pOwningRing.
    /// </summary>
    SchedulingRing *SchedulerBase::GetNextSchedulingRing(const SchedulingRing *pOwningRing, SchedulingRing *pCurrentRing)
    {
        ASSERT(pCurrentRing != NULL && pOwningRing != NULL);

        SchedulingRing *pSuccessor = m_rings[GetNextValidSchedulingRingIndex(pCurrentRing->Id())];
        ASSERT(pSuccessor != NULL);

        // Reaching the owning ring again means the traversal is complete.
        if (pSuccessor == pOwningRing)
        {
            return NULL;
        }
        return pSuccessor;
    }

    /// <summary>
    ///     Creates a scheduler instance
    /// </summary>
    /// <param name="policy">
    ///     [in] A const reference to the scheduler policy.
    /// </param>
    /// <returns>
    ///     A pointer to the new scheduler. An exception is thrown if an error occurs.
    /// </returns>
    __ecount(1) SchedulerBase* SchedulerBase::Create(__in const SchedulerPolicy& policy)
    {
        // Two phases: first construct the concrete scheduler, then bring it up.
        SchedulerBase *const pNewScheduler = CreateWithoutInitializing(policy);

        // Obtain hardware threads, initialize virtual processors, etc.
        pNewScheduler->Initialize();
        return pNewScheduler;
    }

    /// <summary>
    ///     Creates a scheduler instance without initializing it
    /// </summary>
    /// <param name="policy">
    ///     [in] A const reference to the scheduler policy.
    /// </param>
    /// <returns>
    ///     A pointer to the new scheduler. An exception is thrown if an error occurs.
    /// </returns>
    __ecount(1) SchedulerBase* SchedulerBase::CreateWithoutInitializing(__in const SchedulerPolicy& policy)
    {
        // Validate the policy first, then make sure process-wide static state is set up.
        policy._ValidateConcRTPolicy();
        CheckStaticConstruction();

        const ::Concurrency::SchedulerType requestedKind =
            (::Concurrency::SchedulerType) policy.GetPolicyValue(::Concurrency::SchedulerKind);

        SchedulerBase *pCreated = NULL;
        if (requestedKind != ::Concurrency::ThreadScheduler)
        {
            // Anything other than the thread scheduler is expected to be the UMS default kind.
            ASSERT(requestedKind == ::Concurrency::UmsThreadDefault);
            pCreated = UMSThreadScheduler::Create(policy);
        }
        else
        {
            pCreated = ThreadScheduler::Create(policy);
        }

        ASSERT(pCreated != NULL);
        return pCreated;
    }

    /// <summary>
    ///     Generates a unique identifier for a context.
    /// </summary>
    unsigned int SchedulerBase::GetNewContextId()
    {
        return (unsigned int) InterlockedIncrement(&m_contextIdCounter);
    }

    /// <summary>
    ///     Generates a unique identifier for a schedule group.
    /// </summary>
    unsigned int SchedulerBase::GetNewScheduleGroupId()
    {
        return (unsigned int) InterlockedIncrement(&m_scheduleGroupIdCounter);
    }

    /// <summary>
    ///     Generates a unique identifier for a work queue (across scheduler instances in the process).
    /// </summary>
    unsigned int SchedulerBase::GetNewWorkQueueId()
    {
        return (unsigned int) InterlockedIncrement(&s_workQueueIdCounter);
    }

    /// <summary>
    ///     Anything which requires a one shot pattern of initialization with no destruction until termination goes here.
    /// </summary>
    void SchedulerBase::OneShotStaticConstruction()
    {
        _SpinCount::_Initialize();

        //
        // The TLS indicies must be one-shot as they are used outside the domain of guaranteed scheduler presence.  We cannot free them
        // until process-exit/CRT-unload or we'll have races with scheduler teardown/creation and outside APIs which require the TLS indicies.
        //

        t_dwContextIndex = TlsAlloc();
        if (t_dwContextIndex == TLS_OUT_OF_INDEXES)
        {
            throw scheduler_resource_allocation_error(HRESULT_FROM_WIN32(GetLastError()));
        }

        UMSThreadScheduler::OneShotStaticConstruction();
    }

    /// <summary>
    ///     Anything which requires a pattern of demand initialization upon first scheduler creation and destruction upon last
    ///     scheduler destruction goes here.
    /// </summary>
    void SchedulerBase::StaticConstruction()
    {
        // Intentionally empty: nothing currently needs per-first-scheduler initialization.
        // CheckStaticConstruction still calls this hook when the initialized count reaches 1.
    }

    /// <summary>
    ///     Called to ensure static construction is performed upon creation of a scheduler.
    /// </summary>
    void SchedulerBase::CheckStaticConstruction()
    {
        // The lock is held for the whole function, including the early returns below.
        _StaticLock::_Scoped_lock lockHolder(s_schedulerLock);

        // Only the creation that moves the count from 0 to 1 performs static initialization.
        // NOTE(review): the count stays incremented if the construction calls below throw -- confirm
        // that callers compensate for this.
        if (InterlockedIncrement(&s_initializedCount) != 1)
        {
            return;
        }

        //
        // all static initialization here
        //
        StaticConstruction();

        // One-shot state survives scheduler teardown; skip it if it has already been set up.
        if ((s_oneShotInitializationState & ONESHOT_INITIALIZED_FLAG) != 0)
        {
            return;
        }

        OneShotStaticConstruction();

        //
        // This both guarantees a full fence and protects against simultaneous manipulation of the reference count stored within the lower
        // 31 bits of s_oneShotInitializationState.
        //
        InterlockedOr(&s_oneShotInitializationState, ONESHOT_INITIALIZED_FLAG);
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -