// schedulerproxy.cpp
m_pScheduler->NotifyResourcesExternallyBusy(pRoots, numThreadsIndex);
}
else
{
m_pScheduler->NotifyResourcesExternallyIdle(pRoots, numThreadsIndex);
}
} // end locked region
if (pRoots != pRootArray)
{
delete [] pRoots;
}
}
/// <summary>
/// Removes a root from the scheduler proxy and destroys it. This API is called in response to a scheduler
/// informing the RM that it is done with a virtual processor root.
/// </summary>
void SchedulerProxy::DestroyVirtualProcessorRoot(VirtualProcessorRoot * pRoot)
{
// Synchronize with other concurrent calls that are adding/removing virtual processor roots.
{ // begin locked region
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
SchedulerNode * pNode = &m_pAllocatedNodes[pRoot->GetNodeId()];
ASSERT(pNode->m_id == pRoot->GetNodeId());
// NOTE: This API is called in response to a scheduler being done with a virtual processor root.
// The scheduler is expected not to invoke ISchedulerProxy::Shutdown, which destroys
// all remaining roots in the proxy, until all individual calls for removing virtual processor
// roots have completed.
pNode->m_pCores[pRoot->GetCoreIndex()].m_resources.Remove(pRoot->GetExecutionResource());
if (!pRoot->IsOversubscribed())
{
// Oversubscribed vprocs do not contribute towards the concurrency level
ASSERT(m_currentConcurrency > 0);
--m_currentConcurrency;
}
} // end locked region
pRoot->DeleteThis();
}
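// Illustrative ordering sketch (not part of the original source): per the contract noted above,
// every individual root removal must have completed before the scheduler invokes
// ISchedulerProxy::Shutdown, which destroys any roots still remaining in the proxy, e.g.:
//
//     proxy.DestroyVirtualProcessorRoot(pRoot1);   // individual removals first
//     proxy.DestroyVirtualProcessorRoot(pRoot2);
//     proxy.Shutdown();                            // only after all removals have returned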
/// <summary>
/// Removes an execution resource from the scheduler proxy, and destroys it. This API is called in response to a scheduler
/// informing the RM that it is done with an execution resource.
/// </summary>
void SchedulerProxy::DestroyExecutionResource(ExecutionResource * pExecutionResource)
{
// NOTE: This function should be called with the RM lock held.
SchedulerNode * pNode = &m_pAllocatedNodes[pExecutionResource->GetNodeId()];
SchedulerCore * pCore = &pNode->m_pCores[pExecutionResource->GetCoreIndex()];
ASSERT(pNode->m_id == pExecutionResource->GetNodeId());
// If this was the last running resource on this core, mark the core as available to others again
if (pCore->m_numAssignedThreads + pCore->m_numExternalThreads == 0)
{
// If there are no vprocs or external threads, the core cannot be fixed
ASSERT(!pCore->IsFixed());
ASSERT(pNode->m_allocatedCores > 0 && pNode->m_allocatedCores <= pNode->m_coreCount);
pNode->m_allocatedCores--;
ASSERT(m_numAllocatedCores > MinHWThreads());
pCore->m_coreState = ProcessorCore::Available;
m_numAllocatedCores--;
ASSERT(m_numAllocatedCores <= DesiredHWThreads());
m_pResourceManager->DecrementCoreUseCount(pExecutionResource->GetNodeId(), pExecutionResource->GetCoreIndex());
}
// Synchronize with other concurrent calls that are adding/removing execution resources.
{ // begin locked region
_ReentrantBlockingLock::_Scoped_lock lock(m_lock);
pCore->m_resources.Remove(pExecutionResource);
} // end locked region
delete pExecutionResource;
}
/// <summary>
/// Called to assist dynamic resource management in determining whether cores assigned to schedulers
/// are idle. An idle core is one whose subscription level is 0.
/// </summary>
void SchedulerProxy::IncrementCoreSubscription(ExecutionResource * pExecutionResource)
{
unsigned int nodeId = pExecutionResource->GetNodeId();
unsigned int coreIndex = pExecutionResource->GetCoreIndex();
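// Wake dynamic RM only if at least one *other* scheduler is registered for notifications; the
// threshold accounts for this proxy itself when it receives notifications, so more than one
// registered scheduler is required in that case, and any single registered scheduler otherwise.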
if ((InterlockedIncrement(&m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_subscriptionLevel) == 1) &&
(m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL)))
{
// We've incremented the local subscription from 0 to 1, which may warrant notifications.
// Note that the number of schedulers needing notifications may change right after we read it, but any
// missed notifications will be sent at the next dynamic RM poll.
// We simply set the dynamic RM event here. Note that there may not yet be a dynamic RM thread at this
// point: at least two schedulers exist, but the second one may still be in the process of being created.
// In that case, notifications will be sent when dynamic RM starts up (right after the second scheduler
// has finished receiving all its resources). We may even race with shutdown of the penultimate scheduler;
// if the DRM thread wakes up and only one scheduler is left, it goes back to waiting.
m_pResourceManager->WakeupDynamicRMWorker();
}
}
/// <summary>
/// Called to assist dynamic resource management in determining whether cores assigned to schedulers
/// are idle. An idle core is one whose subscription level is 0.
/// </summary>
void SchedulerProxy::DecrementCoreSubscription(ExecutionResource * pExecutionResource)
{
unsigned int nodeId = pExecutionResource->GetNodeId();
unsigned int coreIndex = pExecutionResource->GetCoreIndex();
if ((InterlockedDecrement(&m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_subscriptionLevel) == 0) &&
(m_pResourceManager->GetNumSchedulersForNotifications() > (ShouldReceiveNotifications() ? 1UL : 0UL)))
{
// We've decremented the local subscription from 1 to 0, which may warrant notifications.
// Note that the number of schedulers needing notifications may change right after we read it, but any
// missed notifications will be sent at the next dynamic RM poll.
// We simply set the dynamic RM event here. Note that there may not yet be a dynamic RM thread at this
// point: at least two schedulers exist, but the second one may still be in the process of being created.
// In that case, notifications will be sent when dynamic RM starts up (right after the second scheduler
// has finished receiving all its resources). We may even race with shutdown of the penultimate scheduler;
// if the DRM thread wakes up and only one scheduler is left, it goes back to waiting.
m_pResourceManager->WakeupDynamicRMWorker();
}
}
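// Illustrative usage sketch (not part of the original source): code that activates and deactivates
// a virtual processor root on its core would bracket execution roughly like this, so that the core's
// subscription level reflects whether anything is actually running on it:
//
//     pProxy->IncrementCoreSubscription(pResource);   // core is no longer idle
//     // ... execute work on the underlying hardware thread ...
//     pProxy->DecrementCoreSubscription(pResource);   // core may become idle again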
/// <summary>
/// Called to adjust the suggested allocation such that we do not exceed maxConcurrency.
/// This routine takes into account vprocs that are marked for removal but haven't yet been
/// retired by the scheduler. The suggested allocation would be decreased to account for such
/// vprocs.
/// </summary>
unsigned int SchedulerProxy::AdjustAllocationIncrease(unsigned int suggestedAllocation) const
{
ASSERT(suggestedAllocation >= GetNumAllocatedCores());
ASSERT(suggestedAllocation <= DesiredHWThreads());
// Figure out the max number of new cores we can add
unsigned int newCores = 0;
// Since we may not be holding the scheduler proxy lock, the value of m_currentConcurrency could be
// changing underneath us. This is fine, since a later DRM sweep will migrate the appropriate number of cores.
if (m_maxConcurrency > m_currentConcurrency)
{
unsigned int remainingConcurrency = m_maxConcurrency - m_currentConcurrency;
// Convert remaining concurrency to number of cores
unsigned int fullySubscribedConcurrency = m_numFullySubscribedCores * m_targetOversubscriptionFactor;
if (fullySubscribedConcurrency >= remainingConcurrency)
{
newCores = remainingConcurrency / m_targetOversubscriptionFactor;
}
else
{
ASSERT(m_targetOversubscriptionFactor > 1);
newCores = m_numFullySubscribedCores;
newCores += ((remainingConcurrency - fullySubscribedConcurrency) / (m_targetOversubscriptionFactor - 1));
}
}
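// Worked example of the conversion above (illustrative numbers, not part of the original source):
// with m_targetOversubscriptionFactor == 2 and m_numFullySubscribedCores == 3, the fully subscribed
// cores cover 3 * 2 == 6 units of concurrency.
//   - remainingConcurrency == 5 (<= 6): newCores == 5 / 2 == 2.
//   - remainingConcurrency == 9 (>  6): newCores == 3 + (9 - 6) / (2 - 1) == 6.
// The total below is then clamped to DesiredHWThreads() and to the caller's suggested allocation.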
unsigned int maxAllocation = (GetNumAllocatedCores() + newCores);
// Cores used exclusively by external threads are included in numAllocatedCores. As a result
// maxAllocation could go above desired.
maxAllocation = min(maxAllocation, DesiredHWThreads());
#if defined(CONCRT_TRACING)
if (maxAllocation < suggestedAllocation)
{
TRACE(CONCRT_TRACE_DYNAMIC_RM, L"Scheduler %d: Allocated: %d, Suggested: %d, Adjusted Suggested: %d",
GetId(), GetNumAllocatedCores(), suggestedAllocation, maxAllocation);
}
#endif
return min(maxAllocation, suggestedAllocation);
}
SchedulerProxy::~SchedulerProxy()
{
//
// Clean up anything which might be used during the asynchronous delete.
//
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
m_pAllocatedNodes[i].Cleanup();
}
delete [] m_pAllocatedNodes;
delete [] m_pSortedNodeOrder;
#if defined(CONCRT_TRACING)
delete [] m_drmInitialState;
#endif
//
// Release the reference on the Resource manager
//
m_pResourceManager->Release();
}
/// <summary>
/// Called to shutdown a scheduler proxy. Derived classes can override shutdown behavior based on this.
/// </summary>
void SchedulerProxy::FinalShutdown()
{
Cleanup();
DeleteThis();
}
/// <summary>
/// Cleans up resources associated with the scheduler.
/// </summary>
void SchedulerProxy::Cleanup()
{
//
// Delete vproc roots that exist in the allocated nodes at this time. The deletion here is a notification. It may happen asynchronously
// depending on the type of scheduler proxy. The data structures maintained for the scheduler proxy cannot go away until the deferred
// deletion happens.
//
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
SchedulerNode * pNode = &m_pAllocatedNodes[i];
for (unsigned int j = 0; j < pNode->m_coreCount; ++j)
{
ExecutionResource * pExecutionResource = pNode->m_pCores[j].m_resources.First();
while (pExecutionResource != NULL)
{
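// Advance to the next resource before tearing down the current one: the current execution
// resource goes away along with its root, so it must not be read after DeleteThis below.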
ExecutionResource * pExecutionResourceToDelete = pExecutionResource;
pExecutionResource = pNode->m_pCores[j].m_resources.Next(pExecutionResource);
VirtualProcessorRoot * pVPRoot = pExecutionResourceToDelete->GetVirtualProcessorRoot();
ASSERT(pVPRoot != NULL);
// Since the root is going away, check if it contributes to the subscription count on the core, and
// fix up the count, if so.
pVPRoot->ResetSubscriptionLevel();
pVPRoot->DeleteThis();
}
}
}
delete m_pHillClimbing;
}
#if defined(CONCRT_TRACING)
/// <summary>
/// Records the initial state of each allocated core (allocation, fixed, borrowed and idle flags)
/// for this scheduler proxy at the start of a dynamic RM sweep, for tracing purposes.
/// </summary>
void SchedulerProxy::TraceInitialDRMState()
{
int traceCoreIndex = 0;
for (unsigned int nodeIndex = 0; nodeIndex < m_nodeCount; ++nodeIndex)
{
SchedulerNode * pAllocatedNode = &m_pAllocatedNodes[nodeIndex];
for (unsigned int coreIndex = 0; coreIndex < pAllocatedNode->m_coreCount; ++coreIndex)
{
SchedulerCore * pAllocatedCore = &pAllocatedNode->m_pCores[coreIndex];
SchedulerCoreData * pCoreData = &m_drmInitialState[traceCoreIndex++];
pCoreData->m_nodeIndex = (unsigned char)nodeIndex;
pCoreData->m_coreIndex = (unsigned char)coreIndex;
pCoreData->m_fAllocated = pAllocatedCore->m_coreState == ProcessorCore::Allocated;
pCoreData->m_fFixed = pAllocatedCore->IsFixed();
pCoreData->m_fBorrowed = pAllocatedCore->IsBorrowed();
pCoreData->m_fIdle = pAllocatedCore->IsIdle();
}
}
}
/// <summary>
/// Dumps the core allocation for this scheduler proxy.
/// </summary>
void SchedulerProxy::DumpAllocations()
{
printf("\nProxy(%p) - Scheduler(id=%d,allocated=%d,min=%d/%d,max=%d/%d,mult=%d,pri=%d,stk=%d)\n", this,
m_id, m_numAllocatedCores, m_minimumHardwareThreads, m_minConcurrency, m_desiredHardwareThreads,
m_maxConcurrency, m_targetOversubscriptionFactor, m_contextPriority, m_contextStackSize);
if (m_pAllocatedNodes != NULL)
{
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
SchedulerNode * pNode = &m_pAllocatedNodes[i];
printf("node=%Ix,count=%d/%d,group=%d\n", pNode->m_id, pNode->m_allocatedCores, pNode->m_coreCount, pNode->m_processorGroup);
for (unsigned int j = 0; j < pNode->m_coreCount; ++j)
{
printf("allocated=%d,prcNumber=%d\n", pNode->m_pCores[j].m_coreState, pNode->m_pCores[j].m_processorNumber);
ExecutionResource *pExecutionResource = pNode->m_pCores[j].m_resources.First();
for (; pExecutionResource != NULL; pExecutionResource = pNode->m_pCores[j].m_resources.Next(pExecutionResource))
{
unsigned int nodeId = pExecutionResource->GetNodeId();
unsigned int maskId = pExecutionResource->GetExecutionResourceId();
printf("\tExecution Resource(nodeId=%d, maskId=%d, isVproc=%d)\n", nodeId, maskId, pExecutionResource->GetVirtualProcessorRoot() != NULL);
}
}
}
}
}
#endif
} // namespace details
} // namespace Concurrency