schedulerproxy.cpp
while (numVprocs-- > 0)
{
vprocArray[vprocIndex++] = CreateVirtualProcessorRoot(pNode, coreIndex);
}
ASSERT(vprocIndex <= vprocCount);
}
else
{
ASSERT(pCore->m_coreState == ProcessorCore::Available);
}
}
}
}
ASSERT(vprocIndex == vprocCount);
if (vprocCount > 0)
{
AddVirtualProcessorRoots(vprocArray, vprocCount);
}
#if defined(CONCRT_TRACING)
m_numTotalCores = m_nodeCount * m_pAllocatedNodes[0].m_coreCount;
m_drmInitialState = new SchedulerCoreData[m_numTotalCores];
memset(m_drmInitialState, 0, sizeof(SchedulerCoreData) * m_numTotalCores);
#endif
delete [] vprocArray;
return pExecutionResource;
}
/// <summary>
/// Causes the resource manager to create a new virtual processor root running atop the same hardware thread as this
/// execution resource. Typically, this is used when a scheduler wishes to oversubscribe a particular hardware thread
/// for a limited amount of time.
/// </summary>
/// <param name="pExecutionResource">
/// The execution resource abstraction on which to oversubscribe.
/// </param>
/// <returns>
/// A new virtual processor root running atop the same hardware thread as this execution resource.
/// </returns>
IVirtualProcessorRoot * SchedulerProxy::CreateOversubscriber(IExecutionResource * pExecutionResource)
{
// The scheduler proxy on the virtual processor root has to match 'this'
VirtualProcessorRoot * pOversubscribedRoot = NULL;
ExecutionResource * pResource = dynamic_cast<ExecutionResource *>(pExecutionResource);
bool isVprocRoot = false;
// If the dynamic cast failed, we must have a virtual processor root.
if (pResource == NULL)
{
pResource = static_cast<VirtualProcessorRoot *>(pExecutionResource)->GetExecutionResource();
isVprocRoot = true;
}
// Cannot verify the scheduler proxy for external threads because they can "live" on
// multiple schedulers at the same time (nested).
if (isVprocRoot && pResource->GetSchedulerProxy() != this)
{
throw std::invalid_argument("pExecutionResource");
}
// Synchronize with other concurrent calls that are adding/removing virtual processor roots.
{
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
// Use the scheduler proxy to clone this virtual processor root.
SchedulerNode * pNode = &m_pAllocatedNodes[pResource->GetNodeId()];
unsigned int coreIndex = pResource->GetCoreIndex();
pOversubscribedRoot = CreateVirtualProcessorRoot(pNode, coreIndex);
// We mark these vproc roots as oversubscribed to indicate that they do not contribute
// towards concurrency levels bounded by the policy.
pOversubscribedRoot->MarkAsOversubscribed();
pNode->m_pCores[coreIndex].m_resources.AddTail(pOversubscribedRoot->GetExecutionResource());
}
return pOversubscribedRoot;
}
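// In the public Concurrency Runtime API, oversubscription surfaces as
// Concurrency::Context::Oversubscribe, which is the kind of request that typically
// reaches CreateOversubscriber above. A minimal usage sketch (illustrative, not part
// of this file): a context brackets a long blocking call so the hardware thread is
// not left idle for its duration.
#include <concrt.h>

static void OversubscribeDuringBlockingCall()
{
    Concurrency::Context::Oversubscribe(true);   // inject a temporary vproc root on this core
    // ... perform a long blocking operation (file I/O, a kernel wait, etc.) ...
    Concurrency::Context::Oversubscribe(false);  // retire the oversubscribed root
}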
/// <summary>
/// Creates a virtual processor root and adds it to the scheduler proxy's list of roots.
/// </summary>
VirtualProcessorRoot * SchedulerProxy::CreateVirtualProcessorRoot(SchedulerNode * pNode, unsigned int coreIndex)
{
return new FreeVirtualProcessorRoot(this, pNode, coreIndex);
}
/// <summary>
/// Notifies the scheduler associated with this proxy to add the virtual processor roots provided.
/// Called by the RM during initial allocation and dynamic core migration.
/// </summary>
void SchedulerProxy::AddVirtualProcessorRoots(IVirtualProcessorRoot ** vprocRoots, unsigned int count)
{
// Note that we are holding the global RM allocation lock when this API is called.
{
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
for (unsigned int i = 0; i < count; ++i)
{
VirtualProcessorRoot * pRoot = static_cast<VirtualProcessorRoot *>(vprocRoots[i]);
// Add the resources associated with the roots to the corresponding lists in the scheduler proxy.
unsigned int nodeId = pRoot->GetNodeId();
unsigned int coreIndex = pRoot->GetCoreIndex();
m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_resources.AddTail(pRoot->GetExecutionResource());
}
m_pScheduler->AddVirtualProcessors(vprocRoots, count);
m_currentConcurrency += count;
}
}
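// Two locks are live inside AddVirtualProcessorRoots: the RM's global allocation lock
// (already held by the caller) and the proxy-local m_lock. In this file the order is
// always RM lock first, proxy lock second. A minimal sketch of that ordering
// discipline with standard primitives (names are illustrative):
#include <mutex>

static std::mutex s_rmAllocationLock;   // outer: RM-wide allocation state
static std::mutex s_proxyListLock;      // inner: per-proxy resource lists

static void AddRootsUnderBothLocks()
{
    std::lock_guard<std::mutex> rmLock(s_rmAllocationLock);      // acquired first, RM-wide
    {
        std::lock_guard<std::mutex> proxyLock(s_proxyListLock);  // nested, per-proxy
        // ... mutate the per-proxy lists, then notify the scheduler ...
    }
}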
/// <summary>
/// Adds an execution resource to the list of resources that run on a particular core.
/// </summary>
void SchedulerProxy::AddExecutionResource(ExecutionResource * pExecutionResource)
{
{
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
// Add the resource to the corresponding list in the scheduler proxy.
unsigned int nodeId = pExecutionResource->GetNodeId();
unsigned int coreIndex = pExecutionResource->GetCoreIndex();
m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_resources.AddTail(pExecutionResource);
}
}
/// <summary>
/// Toggles the state on a core from borrowed to owned (and vice versa), and updates necessary counts.
/// </summary>
void SchedulerProxy::ToggleBorrowedState(SchedulerNode * pNode, unsigned int coreIndex)
{
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
if (pCore->m_fBorrowed)
{
--m_numBorrowedCores;
--pNode->m_numBorrowedCores;
pCore->m_fBorrowed = false;
}
else
{
++m_numBorrowedCores;
++pNode->m_numBorrowedCores;
pCore->m_fBorrowed = true;
}
}
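// "Borrowed" cores are ones temporarily lent to this scheduler because their owner is
// not currently using them; the per-core flag and the node- and proxy-level counters
// must flip in lock-step, which is all this toggle does. A sketch with illustrative types:
struct ExampleCore  { bool borrowed = false; };
struct ExampleNode  { unsigned borrowedCores = 0; };
struct ExampleProxy { unsigned borrowedCores = 0; };

static void ToggleBorrowed(ExampleProxy & proxy, ExampleNode & node, ExampleCore & core)
{
    // Mirror of ToggleBorrowedState above: all three pieces of state change together.
    if (core.borrowed) { --proxy.borrowedCores; --node.borrowedCores; }
    else               { ++proxy.borrowedCores; ++node.borrowedCores; }
    core.borrowed = !core.borrowed;
}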
/// <summary>
/// Adds an appropriate number of virtual processor roots to the scheduler associated with this proxy.
/// Called by the RM during core migration when the RM decides to give this scheduler an additional
/// core.
/// </summary>
void SchedulerProxy::AddCore(SchedulerNode * pNode, unsigned int coreIndex, bool fBorrowed)
{
// Note that we are holding the global RM allocation lock when this API is called.
// Decide how many virtual processors to give the scheduler on this core. Note that this value is required
// to be either m_tof or m_tof - 1.
unsigned int numThreads = 0;
if (m_numFullySubscribedCores > 0)
{
numThreads = m_targetOversubscriptionFactor;
--m_numFullySubscribedCores;
}
else
{
numThreads = m_targetOversubscriptionFactor - 1;
}
ASSERT(numThreads > 0 && numThreads <= INT_MAX);
ASSERT(pNode->m_allocatedCores < pNode->m_coreCount);
++pNode->m_allocatedCores;
ASSERT(m_numAllocatedCores < DesiredHWThreads());
++m_numAllocatedCores;
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
ASSERT(pCore->m_coreState == ProcessorCore::Available);
pCore->m_coreState = ProcessorCore::Allocated;
ASSERT(pCore->m_numAssignedThreads == 0);
pCore->m_numAssignedThreads = numThreads;
m_numAssignedThreads += pCore->m_numAssignedThreads;
ASSERT(m_numAssignedThreads <= m_maxConcurrency);
if (fBorrowed)
{
ASSERT(!pCore->IsBorrowed());
ToggleBorrowedState(pNode, coreIndex);
}
// Special case for when there is 1 vproc per core - this is likely to be the common case.
IVirtualProcessorRoot * pRoot;
IVirtualProcessorRoot ** pRootArray = (numThreads == 1) ? &pRoot : new IVirtualProcessorRoot *[numThreads];
for (unsigned int i = 0; i < numThreads; ++i)
{
pRootArray[i] = CreateVirtualProcessorRoot(pNode, coreIndex);
}
AddVirtualProcessorRoots(pRootArray, numThreads);
if (pRootArray != &pRoot)
{
delete [] pRootArray;
}
}
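// The m_tof / m_tof - 1 split follows from the policy arithmetic: with MaxConcurrency M
// and target oversubscription factor T, the proxy wants ceil(M / T) cores, of which
// M - cores * (T - 1) are "fully subscribed" (receive T vprocs) and the rest receive
// T - 1. A standalone check of that bookkeeping (formula inferred from the invariants
// asserted above, and assuming the policy keeps M >= cores * (T - 1)):
#include <cassert>

static void CheckVprocDistribution(unsigned M /* max concurrency */, unsigned T /* tof */)
{
    unsigned cores = (M + T - 1) / T;                // ceil(M / T)
    assert(M >= cores * (T - 1));                    // assumed policy invariant
    unsigned fullySubscribed = M - cores * (T - 1);  // cores that get T vprocs
    unsigned total = fullySubscribed * T + (cores - fullySubscribed) * (T - 1);
    assert(total == M);  // e.g. M = 7, T = 2: 4 cores, 3 of them fully subscribed
}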
/// <summary>
/// Notifies the scheduler associated with this proxy to remove the virtual processor roots associated
/// with the core provided. Called by the RM during core migration.
/// </summary>
void SchedulerProxy::RemoveCore(SchedulerNode * pNode, unsigned int coreIndex)
{
// Note that we are holding the global RM allocation lock when this API is called.
ASSERT(pNode->m_allocatedCores > 0 && pNode->m_allocatedCores <= pNode->m_coreCount);
--pNode->m_allocatedCores;
ASSERT(m_numAllocatedCores > MinVprocHWThreads());
--m_numAllocatedCores;
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
ASSERT(pCore->m_coreState == ProcessorCore::Allocated || pCore->m_coreState == ProcessorCore::Stolen);
pCore->m_coreState = ProcessorCore::Available;
ASSERT(pCore->m_numAssignedThreads == m_targetOversubscriptionFactor ||
pCore->m_numAssignedThreads == m_targetOversubscriptionFactor - 1);
if (pCore->m_numAssignedThreads == m_targetOversubscriptionFactor)
{
++m_numFullySubscribedCores;
}
m_numAssignedThreads -= pCore->m_numAssignedThreads;
ASSERT(m_numAssignedThreads >= m_minConcurrency && m_numAssignedThreads < m_maxConcurrency);
pCore->m_numAssignedThreads = 0;
if (pCore->m_fBorrowed)
{
ToggleBorrowedState(pNode, coreIndex);
}
pCore->m_fIdleDuringDRM = false;
// A lock is required around the iteration of the core's resources and the call to RemoveVirtualProcessors
// to synchronize with concurrent calls to DestroyVirtualProcessorRoot, which removes roots from the list and deletes them.
{ // begin locked region
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
ExecutionResource * pExecutionResource = pCore->m_resources.First();
while (pExecutionResource != NULL)
{
// Remember the next root beforehand, since an IVirtualProcessorRoot::Remove call could happen inline
// for the root we're removing, and by the time we get back, that root could be deleted.
ExecutionResource * pNextExecutionResource = pCore->m_resources.Next(pExecutionResource);
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
if (pVPRoot != NULL && !pVPRoot->IsRootRemoved())
{
pVPRoot->MarkRootRemoved();
IVirtualProcessorRoot * pIRoot = pVPRoot;
m_pScheduler->RemoveVirtualProcessors(&pIRoot, 1);
}
pExecutionResource = pNextExecutionResource;
}
} // end locked region
}
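// The removal loop above hoists the successor pointer before touching the current
// root, because RemoveVirtualProcessors may call back into the proxy and delete that
// root inline. The same idiom with a standard container (illustrative; the RM uses
// its own intrusive list type):
#include <iterator>
#include <list>

static void EraseWhileIterating(std::list<int> & items)
{
    for (auto it = items.begin(); it != items.end(); )
    {
        auto next = std::next(it);  // remember the successor first...
        if (*it % 2 == 0)
        {
            items.erase(it);        // ...so removing the current node is safe
        }
        it = next;
    }
}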
/// <summary>
/// Called by the RM to instruct this scheduler proxy to notify its scheduler that this core is now
/// externally busy or externally idle.
/// </summary>
void SchedulerProxy::SendCoreNotification(SchedulerCore * pCore, bool isBusyNotification)
{
// Avoid a memory allocation under two locks if we have at most 8 roots per core - this is expected to be
// the common case.
IVirtualProcessorRoot * pRootArray[8];
IVirtualProcessorRoot ** pRoots = NULL;
// Note that we are holding the global RM allocation lock when this API is called.
{ // begin locked region
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
unsigned int numThreadsIndex = 0;
if (pCore->m_resources.Count() > 8)
{
pRoots = new IVirtualProcessorRoot * [pCore->m_resources.Count()];
}
else
{
pRoots = pRootArray;
}
ExecutionResource * pExecutionResource = pCore->m_resources.First();
while (pExecutionResource != NULL)
{
ExecutionResource * pNextExecutionResource = pCore->m_resources.Next(pExecutionResource);
VirtualProcessorRoot * pVPRoot = pExecutionResource->GetVirtualProcessorRoot();
if (pVPRoot != NULL && !pVPRoot->IsRootRemoved())
{
pRoots[numThreadsIndex++] = pVPRoot;
}
pExecutionResource = pNextExecutionResource;
}
ASSERT(numThreadsIndex <= (unsigned int) pCore->m_resources.Count());
// Now that the array is populated, send notifications for this core
if (isBusyNotification)
{
m_pScheduler->NotifyResourcesExternallyBusy(pRoots, numThreadsIndex);
}
else
{
m_pScheduler->NotifyResourcesExternallyIdle(pRoots, numThreadsIndex);
}
} // end locked region
if (pRoots != pRootArray)
{
delete [] pRoots;
}
}
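// SendCoreNotification prefers an 8-element stack buffer to avoid a heap allocation
// while two locks are held. The same small-buffer pattern in isolation (sizes and
// names are illustrative):
static void NotifyWithSmallBuffer(unsigned count)
{
    void * stackBuffer[8];
    void ** buffer = (count > 8) ? new void *[count] : stackBuffer;

    // ... fill 'buffer' with up to 'count' pointers and send the notifications ...

    if (buffer != stackBuffer)
    {
        delete [] buffer;
    }
}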