📄 schedulerbase.cpp
字号:
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SchedulerBase.cpp
//
// Implementation file of the metaphor for a concrt scheduler
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#include "concrtinternal.h"
namespace Concurrency
{
/// <summary>
/// Creates a scheduler that only manages internal contexts. Implicitly calls Reference.
/// If Attach is called, the scheduler is no longer anonymous because it is also managing the external
/// context where Attach was called. To destroy an anonymous scheduler, Release needs to be called.
/// </summary>
/// <param name="policy">
/// [in] A const reference to the scheduler policy.
/// </param>
/// <returns>
/// A pointer to the new scheduler.
/// </returns>
Scheduler* Scheduler::Create(__in const SchedulerPolicy& policy)
{
    // Build the concrete scheduler through the internal factory, then take the
    // caller's initial reference before handing the pointer out.
    ::Concurrency::details::SchedulerBase *pNewScheduler = ::Concurrency::details::SchedulerBase::Create(policy);
    pNewScheduler->Reference();
    return pNewScheduler;
}
/// <summary>
/// Allows a user defined policy to be used to create the default scheduler. It is only valid to call this API when no default
/// scheduler exists. Once a default policy is set, it remains in effect until the next valid call to the API.
/// </summary>
/// <param name="_Policy">
/// [in] The policy to be set as the default. The runtime will make a copy of the policy for its use, and the user
/// is responsible for the lifetime of the policy that is passed in.
/// </param>
void Scheduler::SetDefaultSchedulerPolicy(__in const SchedulerPolicy & _Policy)
{
    // Thin forwarding wrapper: all bookkeeping is performed by the internal SchedulerBase.
    ::Concurrency::details::SchedulerBase::SetDefaultSchedulerPolicy(_Policy);
}
/// <summary>
/// Resets the default scheduler policy, and the next time a default scheduler is created, it will use the runtime's default policy settings.
/// </summary>
void Scheduler::ResetDefaultSchedulerPolicy()
{
    // Thin forwarding wrapper: the actual reset is performed by the internal SchedulerBase.
    ::Concurrency::details::SchedulerBase::ResetDefaultSchedulerPolicy();
}
//
// Internal bit mask definitions for the shutdown gate.
//
#define SHUTDOWN_INITIATED_FLAG 0x80000000
#define SUSPEND_GATE_FLAG 0x40000000
#define SHUTDOWN_COMPLETED_FLAG 0x20000000
#define GATE_COUNT_MASK 0x1FFFFFFF
#define GATE_FLAGS_MASK 0xE0000000
namespace details
{
// Template specializations for types used by the scheduler.

/// <summary>
/// Hashes a HANDLE into a bucket index in [0, size).
/// </summary>
template <>
inline unsigned int Hash<HANDLE, ExternalContextBase*>::HashValue(const HANDLE& key, int size)
{
    ASSERT(size > 0);
    // Handle values are a multiple of 4, so shift away the two always-zero low
    // bits before reducing into the table range.
    unsigned int scaled = (unsigned int) (((size_t) key) >> 2);
    return scaled % size;
}
// The default scheduler lock protects access to both the default scheduler as well as the
// default scheduler policy.
SchedulerBase *SchedulerBase::s_pDefaultScheduler = NULL;
SchedulerPolicy *SchedulerBase::s_pDefaultSchedulerPolicy = NULL;

// Count of scheduler initializations; the first increment (see CheckStaticConstruction)
// triggers demand static construction.
LONG SchedulerBase::s_initializedCount = 0;

// One-shot (process-lifetime) initialization state; ONESHOT_INITIALIZED_FLAG is set once
// OneShotStaticConstruction has run (see CheckStaticConstruction).
LONG SchedulerBase::s_oneShotInitializationState = ONESHOT_NOT_INITIALIZED;

// Source of process-wide unique work queue identifiers (see GetNewWorkQueueId).
volatile LONG SchedulerBase::s_workQueueIdCounter = 0;

// TLS index for the current context; allocated once in OneShotStaticConstruction and
// never freed until process exit / CRT unload.
DWORD SchedulerBase::t_dwContextIndex;

// Number of suballocators for use by external contexts that are active in the process.
volatile LONG SchedulerBase::s_numExternalAllocators = 0;

// The max number of external contexts that could have suballocators at any given time.
const int SchedulerBase::s_maxExternalAllocators = 32;

// The maximum depth of the free pool of allocators.
const int SchedulerBase::s_allocatorFreePoolLimit = 16;
/// <summary>
/// Constructor for SchedulerBase. Initializes bookkeeping only; hardware threads and
/// virtual processors are acquired later by Initialize() (see SchedulerBase::Create).
/// </summary>
/// <param name="policy">
/// [in] The policy this scheduler is created with; a copy is stored in m_policy.
/// </param>
SchedulerBase::SchedulerBase(__in const ::Concurrency::SchedulerPolicy& policy) :
    m_policy(policy),
    m_refCount(0),
    m_attachCount(0),
    m_id(-1),
    m_boundContextCount(0),
    m_internalContextCountPlusOne(1),
    m_nextSchedulingRingIndex(0),
    // Id counters start at -1 so the first InterlockedIncrement hands out id 0.
    m_contextIdCounter(-1),
    m_scheduleGroupIdCounter(-1),
    m_vprocShutdownGate(0),
    m_activeVProcCount(0),
    m_initialReference(0),
    m_enqueuedTaskCounter(0),
    m_dequeuedTaskCounter(0),
    m_enqueuedTaskCheckpoint(0),
    m_dequeuedTaskCheckpoint(0),
    m_pResourceManager(NULL),
    m_pSchedulerResourceManagement(NULL),
    m_externalThreadStatistics(NULL, 256, ListArray<ExternalStatistics>::DeletionThresholdInfinite),
    m_safePointDataVersion(0),
    m_safePointCommitVersion(0),
    m_safePointPendingVersion(0)
{
    // Cache frequently consulted policy values as plain members.
    m_schedulerKind = (::Concurrency::SchedulerType) policy.GetPolicyValue(::Concurrency::SchedulerKind);
    m_localContextCacheSize = (unsigned short) policy.GetPolicyValue(::Concurrency::LocalContextCacheSize);
    m_schedulingProtocol = (::Concurrency::SchedulingProtocolType) policy.GetPolicyValue(::Concurrency::SchedulingProtocol);

    //
    // This is a count before which we will **NOT** perform any throttling. In the event of repeated latent blocking, we will reach
    // this number of threads rapidly. By default, we choose this number to be 4x the number of cores. If a client has specified a
    // MinConcurrency value that implies a greater number of vprocs than this, we will adjust the throttling limit upwards to
    // MinConcurrency. This may result in poorer overall throttling performance; however -- one would expect that most clients aren't
    // requesting > 4x oversubscription.
    //
    m_threadsBeforeThrottling = max(::Concurrency::GetProcessorCount() * 4, policy.GetPolicyValue(::Concurrency::MinConcurrency));

    // Allocate a TLS slot to track statistics for threads alien to this scheduler
    m_dwExternalStatisticsIndex = TlsAlloc();
    if (m_dwExternalStatisticsIndex == TLS_OUT_OF_INDEXES)
    {
        // TLS exhaustion is unrecoverable for this scheduler; surface it as a resource error.
        throw scheduler_resource_allocation_error(HRESULT_FROM_WIN32(GetLastError()));
    }
}
/// <summary>
/// Destructor. All teardown is centralized in Cleanup().
/// </summary>
SchedulerBase::~SchedulerBase()
{
    Cleanup();
}
/// <summary>
/// Releases the scheduler's per-node structures, TLS slot, and resource manager
/// reference, then undoes the static construction this scheduler contributed.
/// </summary>
void SchedulerBase::Cleanup()
{
    // Delete the per-node objects, then the parallel arrays that held them.
    // NOTE(review): assumes m_nodes/m_rings and m_nodeCount were set up by
    // initialization (not visible in this file) -- verify the partially-constructed case.
    for (int idx = 0; idx < m_nodeCount; ++idx)
        delete m_nodes[idx];
    for (int idx = 0; idx < m_nodeCount; ++idx)
        delete m_rings[idx];
    delete [] m_nodes;
    delete [] m_rings;

    // Cleanup a TLS slot and allow a reuse
    TlsFree(m_dwExternalStatisticsIndex);
    m_dwExternalStatisticsIndex = 0;

    // NOTE(review): m_pResourceManager is initialized to NULL in the constructor and
    // dereferenced unconditionally here -- presumably Initialize() always acquires it
    // before Cleanup can run; confirm against the initialization path.
    m_pResourceManager->Release();

    // Drop this scheduler's contribution to demand static construction.
    SchedulerBase::StaticDestruction();
}
/// <summary>
/// Returns the scheduling ring at the round-robin cursor and advances the cursor.
/// The unsynchronized cursor update is a benign race -- this is only used to spread
/// incoming work, so concurrent callers occasionally receiving the same ring is fine.
/// </summary>
SchedulingRing *SchedulerBase::GetNextSchedulingRing()
{
    SchedulingRing *pCurrent = m_rings[m_nextSchedulingRingIndex];
    ASSERT(pCurrent != NULL);
    // Move the cursor to the next populated slot for the following caller.
    m_nextSchedulingRingIndex = GetNextValidSchedulingRingIndex(m_nextSchedulingRingIndex);
    return pCurrent;
}
/// <summary>
/// Returns idx if the ring at that index exists; otherwise the index of the next
/// populated ring, searching circularly.
/// </summary>
int SchedulerBase::GetValidSchedulingRingIndex(int idx)
{
    // m_rings[idx] is dereferenced directly below, so idx must be a valid array index.
    // The previous assertion accepted idx == m_nodeCount, which would have allowed a
    // read one element past the end of m_rings before any wrap-around could occur
    // (GetNextValidSchedulingRingIndex, by contrast, wraps idx before accessing).
    ASSERT(idx >= 0 && idx < m_nodeCount);
    if (m_rings[idx] == NULL)
        return GetNextValidSchedulingRingIndex(idx);
    return idx;
}
/// <summary>
/// Returns the index of the next populated ring strictly after idx, wrapping circularly.
/// idx == m_nodeCount is tolerated because the index wraps before any array access.
/// </summary>
int SchedulerBase::GetNextValidSchedulingRingIndex(int idx)
{
    ASSERT(idx >= 0 && idx <= m_nodeCount);
    // Step forward at least once, then keep stepping until a non-NULL slot is found.
    for (idx = (idx + 1) % m_nodeCount; m_rings[idx] == NULL; idx = (idx + 1) % m_nodeCount)
    {
    }
    return idx;
}
/// <summary>
/// Advances circularly from pCurrentRing to the next populated ring. A full cycle back
/// to pOwningRing signals that every ring has been visited, reported as NULL.
/// </summary>
SchedulingRing *SchedulerBase::GetNextSchedulingRing(const SchedulingRing *pOwningRing, SchedulingRing *pCurrentRing)
{
    ASSERT(pCurrentRing != NULL && pOwningRing != NULL);
    SchedulingRing *pNext = m_rings[GetNextValidSchedulingRingIndex(pCurrentRing->Id())];
    ASSERT(pNext != NULL);
    return (pNext == pOwningRing) ? NULL : pNext;
}
/// <summary>
/// Creates a scheduler instance.
/// </summary>
/// <param name="policy">
/// [in] A const reference to the scheduler policy.
/// </param>
/// <returns>
/// A pointer to the new scheduler. An exception is thrown if an error occurs.
/// </returns>
__ecount(1) SchedulerBase* SchedulerBase::Create(__in const SchedulerPolicy& policy)
{
    // Construct the scheduler object first, then bring it fully online
    // (obtain hardware threads, initialize virtual processors, etc.).
    SchedulerBase *pNewScheduler = CreateWithoutInitializing(policy);
    pNewScheduler->Initialize();
    return pNewScheduler;
}
/// <summary>
/// Creates a scheduler instance without initializing it.
/// </summary>
/// <param name="policy">
/// [in] A const reference to the scheduler policy.
/// </param>
/// <returns>
/// A pointer to the new scheduler. An exception is thrown if an error occurs.
/// </returns>
__ecount(1) SchedulerBase* SchedulerBase::CreateWithoutInitializing(__in const SchedulerPolicy& policy)
{
    // Reject malformed policies up front and make sure process-wide statics exist.
    policy._ValidateConcRTPolicy();
    CheckStaticConstruction();

    ::Concurrency::SchedulerType schedulerKind = (::Concurrency::SchedulerType) policy.GetPolicyValue(::Concurrency::SchedulerKind);

    // Dispatch to the concrete factory matching the requested scheduler kind.
    SchedulerBase *pScheduler = NULL;
    if (schedulerKind != ::Concurrency::ThreadScheduler)
    {
        // The only other supported kind is the UMS thread scheduler.
        ASSERT(schedulerKind == ::Concurrency::UmsThreadDefault);
        pScheduler = UMSThreadScheduler::Create(policy);
    }
    else
    {
        pScheduler = ThreadScheduler::Create(policy);
    }
    ASSERT(pScheduler != NULL);
    return pScheduler;
}
/// <summary>
/// Generates a unique identifier for a context.
/// </summary>
unsigned int SchedulerBase::GetNewContextId()
{
    // m_contextIdCounter is seeded to -1 in the constructor, so the first id is 0.
    return (unsigned int) InterlockedIncrement(&m_contextIdCounter);
}
/// <summary>
/// Generates a unique identifier for a schedule group.
/// </summary>
unsigned int SchedulerBase::GetNewScheduleGroupId()
{
    // m_scheduleGroupIdCounter is seeded to -1 in the constructor, so the first id is 0.
    return (unsigned int) InterlockedIncrement(&m_scheduleGroupIdCounter);
}
/// <summary>
/// Generates a unique identifier for a work queue (across scheduler instances in the process).
/// </summary>
unsigned int SchedulerBase::GetNewWorkQueueId()
{
    // Note: draws from the process-wide static counter, not a per-scheduler one.
    return (unsigned int) InterlockedIncrement(&s_workQueueIdCounter);
}
/// <summary>
/// Anything which requires a one shot pattern of initialization with no destruction until termination goes here.
/// </summary>
void SchedulerBase::OneShotStaticConstruction()
{
    // Seed the spin-wait tuning data.
    _SpinCount::_Initialize();

    //
    // The TLS indices must be one-shot as they are used outside the domain of guaranteed scheduler presence. We cannot free them
    // until process-exit/CRT-unload or we'll have races with scheduler teardown/creation and outside APIs which require the TLS indices.
    //
    t_dwContextIndex = TlsAlloc();
    if (t_dwContextIndex == TLS_OUT_OF_INDEXES)
    {
        // No TLS slot available: the runtime cannot function without it.
        throw scheduler_resource_allocation_error(HRESULT_FROM_WIN32(GetLastError()));
    }

    // Give the UMS scheduler variant its own one-shot setup.
    UMSThreadScheduler::OneShotStaticConstruction();
}
/// <summary>
/// Anything which requires a pattern of demand initialization upon first scheduler creation and destruction upon last
/// scheduler destruction goes here.
/// </summary>
void SchedulerBase::StaticConstruction()
{
    // Intentionally empty: nothing currently requires per-first-scheduler construction,
    // but CheckStaticConstruction still calls this hook.
}
/// <summary>
/// Called to ensure static construction is performed upon creation of a scheduler.
/// </summary>
void SchedulerBase::CheckStaticConstruction()
{
    // Serialize all scheduler creations through the static scheduler lock.
    _StaticLock::_Scoped_lock lockHolder(s_schedulerLock);

    // Only the first scheduler to arrive performs demand static initialization.
    if (InterlockedIncrement(&s_initializedCount) == 1)
    {
        //
        // all static initialization here
        //
        StaticConstruction();

        // One-shot initialization survives scheduler destruction; run it only if it
        // has never been performed in this process.
        if ((s_oneShotInitializationState & ONESHOT_INITIALIZED_FLAG) == 0)
        {
            OneShotStaticConstruction();
            //
            // This both guarantees a full fence and protects against simultaneous manipulation of the reference count stored within the lower
            // 31 bits of s_oneShotInitializationState.
            //
            InterlockedOr(&s_oneShotInitializationState, ONESHOT_INITIALIZED_FLAG);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -