📄 schedulerbase.cpp
字号:
pVirtualProcessor->GetId(), (int)pVirtualProcessor->IsAvailable());
pVirtualProcessor->StartupWorkerContext(pGroup);
}
else
{
// If the scheduler has already shutdown, it is unable to activate new virtual processors.
// The shutdown path synchronizes with VirtualProcessorActive, and fails the call if this is the case.
// Restore the virtual processor availability - this may not be required, but there is no harm in doing
// it here, and this will prevent errors in the future if we assume that all !available virtual processors
// have worker contexts attached to them.
pVirtualProcessor->MakeAvailable();
}
}
/// <summary>
/// Looks for an available virtual processor in the scheduler, and returns it.
/// </summary>
VirtualProcessor* SchedulerBase::FindAvailableVirtualProcessor(VirtualProcessor *pBias)
{
// Direct or indirect callers of this API MUST check that that the available virtual processor count in the scheduler
// is non-zero before calling the API. We avoid putting that check here since it would evaluate to false
// most of the time, and it saves the function call overhead on fast paths (chore push)
VirtualProcessor *pVProc = NULL;
SchedulingNode *pBiasNode = NULL;
//
// Bias first towards the given virtual processor, secondly to its node, and thirdly to everyone else in order.
//
if (pBias != NULL)
{
pBiasNode = pBias->GetOwningNode();
VirtualProcessor *pVProc = pBiasNode->FindAvailableVirtualProcessor(pBias);
if (pVProc != NULL)
return pVProc;
}
for (int idx = 0; idx < m_nodeCount; ++idx)
{
SchedulingNode *pNode = m_nodes[idx];
if (pNode != NULL && pNode != pBiasNode)
{
// Perform a quick check of the processor count to avoid the function call overhead
// on some fast paths (chore push operations) on a system with many nodes.
if (pNode->m_virtualProcessorAvailableCount > 0)
{
pVProc = pNode->FindAvailableVirtualProcessor();
if (pVProc != NULL)
{
break;
}
}
}
}
return pVProc;
}
/// <summary>
/// Steals a local runnable contexts from nodes in the scheduler other than the skip node provided.
/// </summary>
InternalContextBase *SchedulerBase::StealForeignLocalRunnableContext(SchedulingNode *pSkipNode)
{
ASSERT(pSkipNode != NULL);
for (int i = 0; i < m_nodeCount; ++i)
{
if (m_nodes[i] != NULL && m_nodes[i] != pSkipNode)
{
ASSERT(m_nodes[i]->m_id == i);
InternalContextBase *pContext = m_nodes[i]->StealLocalRunnableContext();
if (pContext != NULL)
return pContext;
}
}
return NULL;
}
/// <summary>
/// Determines how long in milliseconds until the next set of threads is allowed to be created.
/// </summary>
ULONG SchedulerBase::ThrottlingTime(ULONG stepWidth)
{
ULONG boundContextCount = GetNumberOfBoundContexts();
if (boundContextCount < m_threadsBeforeThrottling)
return 0;
ULONG overage = (boundContextCount - m_threadsBeforeThrottling);
//
// We can instantly shoot up to m_threadsBeforeThrottling. For all below notes, K is the stair-step width. Note that we are
// hard limited to 8K threads on Win7 UMS currently. This should have hefty slope to reach the thousands especially since this is per-scheduler
// and we can have multiple schedulers in the process!
//
// After that, the next 100 threads will take approximately (1) seconds to create. // 100 threads
// , the next 200 threads will take approximately (8) seconds to create. // 300 threads
// , the next 300 threads will take approximately (20) seconds to create. // 600 threads
// , the next 900 threads will take approximately (6.5) minutes to create. // 1.5K threads (2.5m: 600-1000)
// , the next 1000 threads will take approximately (20) minutes to create. // 2.5K threads
// , the next 1500 threads will take approximately (1.5) hours to create. // 4K threads
// , the next 4000 threads will take approximately (12) hours to create. // 8K threads
// , we run out of resources. Hopefully, we aren't repeatedly waiting on operations with multi-hour latency in a parallel loop.
//
ULONG delay = 0;
if (overage < 100)
delay = 5 + (overage / 10);
else if (overage < 300)
delay = 15 + 0 + (overage / 8);
else if (overage < 600)
delay = 53 + 7 + (overage / 5);
else if (overage < 1500)
delay = 180 + 0 + (overage / 4);
else if (overage < 2500)
delay = 555 + 0 + (overage / 3);
else if (overage < 4000)
delay = 1388 + 1112 + (overage / 3);
else
delay = 3833 + 4367 + (overage / 2);
return (delay * stepWidth);
}
/// <summary>
/// Acquires a new internal context of the appropriate type and returns it. This can come from either
/// a free list within the scheduler, or be one newly allocated from the heap.
/// </summary>
InternalContextBase *SchedulerBase::GetInternalContext()
{
InternalContextBase *pContext = m_internalContextPool.Pop();
if (pContext == NULL)
{
pContext = CreateInternalContext();
AddContext(pContext);
// The internal reference count on the scheduler *must* be incremented by the creator of the context.
// The reference count will be released when the context is canceled. If the context is executing on
// a thread proxy at the time it is canceled, it will decrement its own reference count before
// leaving the dispatch loop for the final time. If it is on the idle pool - the thread/context
// that cancels it is responsible for decrementing the reference count.
IncrementInternalContextCount();
}
//
// IMPORTANT NOTE: It is possible that there is a thread proxy still in the process of executing this
// context's dispatch loop. This is because on going idle, contexts add themselves to the idle pool,
// and proceed to leave their dispatch loops - they could be picked up and re-initialized before they
// have actually left the routine.
//
// We must be careful *not* to re-initialize any variables the context uses after the point at which it
// adds itself to the idle list, until the time it has left the dispatch routine.
//
CORE_ASSERT(pContext != NULL);
//
// Note also that there are other fields which need to be touched until m_blockedState is set. When we reuse a context, we must spin until
// that bit is set. Newly created contexts are started blocked.
//
pContext->SpinUntilBlocked();
// Context shall be marked as idle.
CORE_ASSERT(pContext->IsIdle());
// Bind a thread proxy to the context
pContext->m_pThreadProxy = NULL;
InterlockedIncrement(&m_boundContextCount);
GetSchedulerProxy()->BindContext(pContext);
//__faststorefence();
CMTRACE(MTRACE_EVT_CONTEXT_ACQUIRED, pContext, NULL, NULL);
#if defined(_DEBUG)
pContext->NotifyAcquired();
#endif // _DEBUG
pContext->ClearCriticalRegion();
return pContext;
}
/// <summary>
/// Enqueues a context into m_allContexts
/// </summary>
void SchedulerBase::AddContext(InternalContextBase * pContext)
{
ContextNode *pNode = new ContextNode(pContext);
m_allContexts.Push(pNode);
}
///<summary>
/// Releases an internal context of the appropriate to the scheduler's idle pool.
///</summary>
void SchedulerBase::ReleaseInternalContext(InternalContextBase *pContext)
{
#if defined(_DEBUG)
pContext->m_fEverRecycled = true;
pContext->SetDebugBits(CTX_DEBUGBIT_RELEASED);
#endif // _DEBUG
InterlockedDecrement(&m_boundContextCount);
// Context shall be marked as idle.
CORE_ASSERT(pContext->IsIdle());
// We keep all contexts created by the scheduler. Deleting/canceling these contexts would required us to
// remove it from the list of 'all contexts' (m_allContexts), which we use during finalization to detect
// blocked contexts, and would require an additional level of synchronization there. Since idle contexts
// do not have backing thread proxies, this is not a problem.
m_internalContextPool.Push(pContext);
}
/// <summary>
/// Gets a realized chore from the idle pool, creating a new one if the idle pool is empty.
/// </summary>
RealizedChore * SchedulerBase::GetRealizedChore(TaskProc pFunction, void * pParameters)
{
RealizedChore * pChore = m_realizedChorePool.Pop();
if (pChore == NULL)
{
pChore = new RealizedChore(pFunction, pParameters);
}
else
{
pChore->Initialize(pFunction, pParameters);
}
return pChore;
}
///<summary>
/// Releases an external context of the to the scheduler's idle pool, destroying it if the idle pool is full.
///</summary>
void SchedulerBase::ReleaseRealizedChore(RealizedChore * pChore)
{
// Try to maintain the max size of the free pool somewhere close to num vprocs * 32. It is
// not an exact upper limit.
if (m_realizedChorePool.Count() < (m_virtualProcessorCount << 5))
{
m_realizedChorePool.Push(pChore);
}
else
{
delete pChore;
}
}
/// <summary>
/// References the anonymous schedule group, creating it if it doesn't exists, and returns a pointer to it.
/// </summary>
ScheduleGroupBase* SchedulerBase::GetAnonymousScheduleGroup()
{
ContextBase *pContext = FastCurrentContext();
if (pContext == NULL || pContext->IsExternal() || pContext->GetScheduler() != this)
{
return GetNextSchedulingRing()->GetAnonymousScheduleGroup();
}
else
{
// The current context is an internal context on the 'this' scheduler.
InternalContextBase * pInternalContext = static_cast<InternalContextBase*> (pContext);
return pInternalContext->GetScheduleGroup()->GetSchedulingRing()->GetAnonymousScheduleGroup();
}
}
/// <returns>
/// Returns a copy of the policy this scheduler is using. No error state.
/// </returns>
SchedulerPolicy SchedulerBase::GetPolicy() const
{
return m_policy;
}
/// <summary>
/// Increments a reference count to this scheduler to manage lifetimes over composition.
/// This reference count is known as the scheduler reference count.
/// </summary>
/// <returns>
/// The resulting reference count is returned. No error state.
/// </returns>
unsigned int SchedulerBase::Reference()
{
ASSERT(m_internalContextCountPlusOne > 0);
LONG val = InterlockedIncrement(&m_refCount);
if (val == 1)
{
//
// This could be an initial reference upon the scheduler from a creation path or it could be
// the case that an unblocked context from a scheduler decided to attach the scheduler somewhere.
// In the second case, we need to resurrect the scheduler and halt the shutdown attempt.
//
if (m_initialReference > 0)
{
//
// This should only come from an **INTERNAL** context on this scheduler; otherwise, the client is
// being very bad and racing with shutdown for a nice big crash.
//
ContextBase* pCurrentContext = SchedulerBase::FastCurrentContext();
if ((pCurrentContext == NULL ) || (pCurrentContext->IsExternal()) || (pCurrentContext->GetScheduler() != this))
{
throw improper_scheduler_reference();
}
Resurrect();
}
else
{
InterlockedExchange(&m_initialReference, 1);
}
}
return (unsigned int)val;
}
/// <summary>
/// Decrements this scheduler抯 reference count to manage lifetimes over composition.
/// A scheduler starts the shutdown protocol when the scheduler reference count goes to zero.
/// <summary>
unsigned int SchedulerBase::Release()
{
LONG val = InterlockedDecrement(&m_refCount);
if (val == 0)
{
PhaseOneShutdown();
}
return (unsigned int)val;
}
/// <summary>
/// Causes the OS event object 慹ventObject
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -