📄 schedulerproxy.cpp
字号:
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SchedulerProxy.cpp
//
// RM proxy for a scheduler instance
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#include "concrtinternal.h"
namespace Concurrency
{
namespace details
{
/// <summary>
/// Constructs a scheduler proxy.
/// </summary>
SchedulerProxy::SchedulerProxy(IScheduler * pScheduler, ResourceManager * pResourceManager, const SchedulerPolicy &policy) :
m_pResourceManager(pResourceManager),
m_pThreadProxyFactory(NULL),
m_pAllocatedNodes(NULL),
m_pSortedNodeOrder(NULL),
m_nodeCount(0),
m_numAllocatedCores(0),
m_numBorrowedCores(0),
m_numExternalThreads(0),
m_numExternalThreadCores(0),
m_numFixedCores(0),
m_queueLength(0),
m_pHillClimbing(NULL),
m_fCoresStolen(false),
m_fNeedsExternalThreadAllocation(false),
m_currentConcurrency(0)
{
ASSERT(pScheduler != NULL);
m_pScheduler = pScheduler;
m_maxConcurrency = policy.GetPolicyValue(::Concurrency::MaxConcurrency);
m_minConcurrency = policy.GetPolicyValue(::Concurrency::MinConcurrency);
m_targetOversubscriptionFactor = policy.GetPolicyValue(::Concurrency::TargetOversubscriptionFactor);
m_contextStackSize = policy.GetPolicyValue(::Concurrency::ContextStackSize);
m_contextPriority = policy.GetPolicyValue(::Concurrency::ContextPriority);
m_fDoHillClimbing = policy.GetPolicyValue(::Concurrency::DynamicProgressFeedback) == ::Concurrency::ProgressFeedbackEnabled;
if (m_contextPriority == INHERIT_THREAD_PRIORITY)
{
m_contextPriority = (char) GetThreadPriority(GetCurrentThread());
}
m_id = m_pScheduler->GetId();
ASSERT(m_id != -1);
unsigned int coreCount = m_pResourceManager->GetCoreCount();
m_physicalResourceCount = coreCount;
ASSERT(coreCount > 0 && coreCount <= INT_MAX);
ASSERT(m_minConcurrency >= 0 && m_maxConcurrency > 0 && m_maxConcurrency >= m_minConcurrency);
unsigned int originalTof = m_targetOversubscriptionFactor;
// Find the mininum target oversubscription factor required to satisfy MaxConcurrency with the cores available.
unsigned int minTof = (m_maxConcurrency + coreCount - 1)/coreCount;
if (originalTof < minTof)
{
// Adjust target oversubscription factor to ensure that we can satisfy MaxConcurrency with the cores on the system.
m_targetOversubscriptionFactor = minTof;
// The scheduler needs all the cores on the machine to satisfy max threads. Moreover we will need to oversubscribe
// more than the user indicated.
m_desiredHardwareThreads = coreCount;
}
else
{
m_desiredHardwareThreads = (m_maxConcurrency + originalTof - 1)/originalTof;
}
// Now adjust target oversubscription factor to ensure that MaxConcurrency virtual processors are evenly distributed
// over the desired number of hardware threads (i.e each core gets either m_tof vprocs or m_tof - 1 vprocs). Also
// calculate how many of the assigned cores will get m_tof vprocs.
if ((m_maxConcurrency % m_desiredHardwareThreads) == 0)
{
// This is the common case. We have a simple distribution and every allocated core will get tof vprocs.
m_targetOversubscriptionFactor = m_maxConcurrency/m_desiredHardwareThreads;
m_numFullySubscribedCores = m_desiredHardwareThreads;
m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
}
else
{
// We have an uneven distribution; some cores will get tof vprocs and some will get tof - 1.
ASSERT(m_targetOversubscriptionFactor > 1);
m_targetOversubscriptionFactor = (m_maxConcurrency + m_desiredHardwareThreads - 1)/m_desiredHardwareThreads;
m_numFullySubscribedCores = m_desiredHardwareThreads - ((m_desiredHardwareThreads * m_targetOversubscriptionFactor) - m_maxConcurrency);
// Calculate min hardware threads. We need to make sure that given the way vprocs are distributed to cores
// (where some cores could get tof vprocs and some could get tof - 1 vprocs), the scheduler proxy will never go below
// min concurrency if it is left with just the minimum number of cores (and all of those cores happen to have tof -1
// vprocs assigned to them).
if (((m_desiredHardwareThreads - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1)) >= m_minConcurrency)
{
m_minimumHardwareThreads = (m_minConcurrency + m_targetOversubscriptionFactor - 2)/(m_targetOversubscriptionFactor - 1);
}
else
{
m_minimumHardwareThreads = (m_desiredHardwareThreads - m_numFullySubscribedCores);
unsigned int remainingThreads = (m_minConcurrency - (m_minimumHardwareThreads * (m_targetOversubscriptionFactor - 1)));
ASSERT(remainingThreads < m_minConcurrency);
m_minimumHardwareThreads += (remainingThreads + m_targetOversubscriptionFactor - 1)/m_targetOversubscriptionFactor;
}
}
ASSERT(m_maxConcurrency <= m_targetOversubscriptionFactor * m_desiredHardwareThreads);
ASSERT(m_numFullySubscribedCores <= m_desiredHardwareThreads);
ASSERT(m_targetOversubscriptionFactor > 1 || m_numFullySubscribedCores == m_desiredHardwareThreads);
ASSERT(m_targetOversubscriptionFactor > 0 && m_targetOversubscriptionFactor <= INT_MAX);
ASSERT(m_desiredHardwareThreads > 0 && m_desiredHardwareThreads <= coreCount);
ASSERT(m_minimumHardwareThreads >= 0 && m_desiredHardwareThreads > 0 && m_minimumHardwareThreads <= m_desiredHardwareThreads);
// Hold a reference to the resource manager.
int ref = m_pResourceManager->Reference();
(ref);
CORE_ASSERT(ref > 1);
if (m_fDoHillClimbing)
{
m_pHillClimbing = new HillClimbing(m_id, coreCount, this);
}
#if defined(CONCRT_TRACING)
m_drmInitialState = NULL;
#endif
}
/// <summary>
/// Called by a scheduler in order make an initial request for an allocation of virtual processors. The request
/// is driven by policies within the scheduler queried via the IScheduler::GetPolicy method. If the request
/// can be satisfied via the rules of allocation, it is communicated to the scheduler as a call to
/// IScheduler::AddVirtualProcessors.
/// </summary>
/// <param name="doSubscribeCurrentThread">
/// Whether to subscribe the current thread and account for it during resource allocation.
/// </param>
/// <returns>
/// The IExecutionResource instance representing current thread if doSubscribeCurrentThread was true; NULL otherwise.
/// </returns>
IExecutionResource * SchedulerProxy::RequestInitialVirtualProcessors(bool doSubscribeCurrentThread)
{
return m_pResourceManager->RequestInitialVirtualProcessors(this, doSubscribeCurrentThread);
}
/// <summary>
/// Called in order to notify the resource manager that the given scheduler is shutting down. This
/// will cause the resource manager to immediately reclaim all resources granted to the scheduler.
/// </summary>
void SchedulerProxy::Shutdown()
{
m_pResourceManager->Shutdown(this);
}
/// <summary>
/// Gets a new thread proxy from the factory.
/// </summary>
IThreadProxy * SchedulerProxy::GetNewThreadProxy(IExecutionContext * pContext)
{
if (m_pThreadProxyFactory == NULL)
{
// Populate the cached pointer from the one in the RM
m_pThreadProxyFactory = GetResourceManager()->GetThreadProxyFactoryManager()->GetFreeThreadProxyFactory();
}
FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *>(m_pThreadProxyFactory->RequestProxy(ContextStackSize(), ContextPriority()));
pProxy->AssociateExecutionContext(pContext);
return pProxy;
}
/// <summary>
/// Ensures that a context is bound to a thread proxy. This API should *NOT* be called in the vast majority of circumstances.
/// The IThreadProxy::SwitchTo will perform late binding to thread proxies as necessary. There are, however, circumstances
/// where it is necessary to pre-bind a context to ensure that the SwitchTo operation switches to an already bound context. This
/// is the case on a UMS scheduling context as it cannot call allocation APIs.
/// </summary>
/// <param name="pContext">
/// The context to bind.
/// </param>
void SchedulerProxy::BindContext(IExecutionContext * pContext)
{
if (pContext == NULL)
{
throw std::invalid_argument("pContext");
}
// Find out if this context already has a thread proxy, if not we have to request one from the factory.
if (pContext->GetProxy() == NULL)
{
// Find a thread proxy from the pool that corresponds to the stack size and priority we need.
GetNewThreadProxy(pContext);
}
}
/// <summary>
/// Returns an **unstarted** thread proxy attached to pContext, to the thread proxy factory.
/// Such a thread proxy **must** be unstarted.
/// This API should *NOT* be called in the vast majority of circumstances.
/// </summary>
/// <param name="pContext">
/// The context to unbind.
/// </param>
void SchedulerProxy::UnbindContext(IExecutionContext * pContext)
{
if (pContext == NULL)
{
throw std::invalid_argument("pContext");
}
FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *> (pContext->GetProxy());
ASSERT(pProxy != NULL);
pProxy->ReturnIdleProxy();
}
/// <summary>
/// This function retrieves the execution resource associated with this thread, if one exists
/// </summary>
/// <returns>
/// The ExecutionResource instance representing current thread in the runtime.
/// </returns>
ExecutionResource * SchedulerProxy::GetCurrentThreadExecutionResource()
{
ExecutionResource * pExecutionResource = NULL;
DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
void * tlsPointer = TlsGetValue(tlsSlot);
size_t tlsValue = (size_t) tlsPointer;
if ((tlsPointer != NULL) && ((tlsValue & TlsResourceBitMask) == TlsResourceInResource))
{
pExecutionResource = (ExecutionResource *) tlsValue;
}
return pExecutionResource;
}
/// <summary>
/// This function retrieves the execution resource associated with this thread, if one exists,
/// and updates the reference count on it for better bookkeeping.
/// </summary>
/// <returns>
/// The ExecutionResource instance representing current thread in the runtime.
/// </returns>
ExecutionResource * SchedulerProxy::ReferenceCurrentThreadExecutionResource()
{
ExecutionResource * pExecutionResource = NULL;
DWORD tlsSlot = GetResourceManager()->GetExecutionResourceTls();
void * tlsPointer = TlsGetValue(tlsSlot);
if (tlsPointer != NULL)
{
size_t tlsValue = (size_t) tlsPointer;
if ((tlsValue & TlsResourceBitMask) == TlsResourceInResource)
{
pExecutionResource = (ExecutionResource *) tlsValue;
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
// If this is a nested subscribe call then if there was a virtual processor root,
// it could not have been removed, because it would have been marked as "fixed".
ASSERT(pVPRoot == NULL || !pVPRoot->IsRootRemoved());
pExecutionResource->IncrementUseCounts();
}
else if ((tlsValue & TlsResourceBitMask) == TlsResourceInProxy)
{
FreeThreadProxy * pThreadProxy = (FreeThreadProxy *) (((size_t) tlsValue) & ~TlsResourceInProxy);
pExecutionResource = pThreadProxy->GetVirtualProcessorRoot()->GetExecutionResource();
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
if (pVPRoot != NULL && pVPRoot->IsRootRemoved())
{
// The virtual processor root that this thread is running on has been removed. We have to
// create a new execution resource abstraction for the current thread and perform an external
// thread allocation for this scheduler proxy.
pExecutionResource = NULL;
}
else
{
pExecutionResource->IncrementUseCounts();
}
}
else
{
ASSERT((tlsValue & TlsResourceBitMask) == TlsResourceInUMSProxy);
UMSFreeThreadProxy * pThreadProxy = (UMSFreeThreadProxy *) (((size_t) tlsValue) & ~TlsResourceInUMSProxy);
// For a UMS thread proxy we need to be in a critical region while accessing the virtual processor root, and
// until we increment a count on the execution resource, making the underlying vproc 'fixed', if it is not
// already so.
pThreadProxy->EnterCriticalRegion();
pExecutionResource = pThreadProxy->GetVirtualProcessorRoot()->GetExecutionResource();
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
if (pVPRoot != NULL && pVPRoot->IsRootRemoved())
{
// The virtual processor root that this thread is running on has been removed. We have to
// create a new execution resource abstraction for the current thread and perform an external
// thread allocation for this scheduler proxy.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -