// resourcemanager.cpp
    return pExecutionResource;
}

/// <summary>
/// Removes an execution resource that was created for an external thread.
/// </summary>
void ResourceManager::RemoveExecutionResource(ExecutionResource * pExecutionResource)
{
    bool signalDRM = false;

    { // begin locked region
        _NonReentrantBlockingLock::_Scoped_lock lock(m_lock);

        SchedulerProxy * pSchedulerProxy = pExecutionResource->GetSchedulerProxy();
        pExecutionResource->DecrementUseCounts();

        // We have to manually redistribute available cores in the case where the DRM thread is not running.
        if ((pSchedulerProxy->GetNumAllocatedCores() < pSchedulerProxy->DesiredHWThreads()) && m_numSchedulers == 1)
        {
            ASSERT(m_dynamicRMWorkerState == Standby);
            if (!DistributeCoresToSurvivingScheduler())
            {
                // Retry from the background thread.
                signalDRM = true;
            }
        }
    } // end locked region

    if (signalDRM)
    {
        WakeupDynamicRMWorker();
    }
}
/// <summary>
/// Called in order to notify the resource manager that the given scheduler is shutting down. This
/// will cause the resource manager to immediately reclaim all resources granted to the scheduler.
/// </summary>
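/// <remarks>
/// Reclamation drops the use count on every global core still allocated to the departing proxy. When this
/// leaves only one scheduler behind, the dynamic RM worker no longer has load balancing to do, so it is
/// placed on standby and woken once to observe the state change.
/// </remarks>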
void ResourceManager::Shutdown(SchedulerProxy *pProxy)
{
    bool signalDRM = false;

    { // begin locked region
        _NonReentrantBlockingLock::_Scoped_lock lock(m_lock);

        m_schedulers.Remove(pProxy);

        SchedulerNode *pAllocatedNodes = pProxy->GetAllocatedNodes();
        for (unsigned int i = 0; i < m_nodeCount; ++i)
        {
            SchedulerNode *pAllocatedNode = &pAllocatedNodes[i];
            if (pAllocatedNode->m_allocatedCores > 0)
            {
                GlobalNode *pGlobalNode = &m_pGlobalNodes[i];
                ASSERT(pAllocatedNode->m_id == pGlobalNode->m_id);
                ASSERT(pAllocatedNode->m_coreCount == pGlobalNode->m_coreCount);
                for (unsigned int j = 0; j < pGlobalNode->m_coreCount; ++j)
                {
                    if (pAllocatedNode->m_pCores[j].m_coreState == ProcessorCore::Allocated)
                    {
                        GlobalCore *pGlobalCore = &pGlobalNode->m_pCores[j];
                        ASSERT(pGlobalCore->m_useCount > 0);
                        --pGlobalCore->m_useCount;
                    }
                }
            }
        }

        if (pProxy->ShouldReceiveNotifications())
        {
            --m_numSchedulersNeedingNotifications;
        }

        if (--m_numSchedulers == 1)
        {
            // Put the dynamic RM worker thread on standby.
            ASSERT(m_dynamicRMWorkerState == LoadBalance);
            m_dynamicRMWorkerState = Standby;
            signalDRM = true;
        }
    } // end locked region

    if (signalDRM)
    {
        // Set the event outside the lock to prevent the high priority thread from having to block immediately upon starting up.
        WakeupDynamicRMWorker();
    }

    pProxy->FinalShutdown();
}
/// <summary>
/// When the number of schedulers in the RM goes from 2 to 1, this routine is invoked to make sure the remaining scheduler
/// has its desired number of cores before the dynamic RM worker is put on standby. It is also called when a lone
/// scheduler removes one of its externally subscribed threads, since freeing that core may allow us to allocate
/// more virtual processors.
/// </summary>
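/// <remarks>
/// For example (illustrative numbers only): if the surviving scheduler desires 8 cores but currently holds 6,
/// two of which are borrowed, this routine attempts to grant the 2 missing cores from the now-uncontended
/// global pool and converts the 2 borrowed cores to owned, since no other scheduler competes for them.
/// </remarks>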
bool ResourceManager::DistributeCoresToSurvivingScheduler()
{
    // NOTE: This routine must be called while m_lock is held.
    ASSERT(m_numSchedulers <= 1);

    if (!m_schedulers.Empty())
    {
        SchedulerProxy * pSchedulerProxy = m_schedulers.First();
        ASSERT(pSchedulerProxy != NULL);
        ASSERT(pSchedulerProxy->GetNumAllocatedCores() <= pSchedulerProxy->DesiredHWThreads());
        ASSERT(pSchedulerProxy->GetNumBorrowedCores() <= (pSchedulerProxy->DesiredHWThreads() - pSchedulerProxy->MinHWThreads()));

        // Since this is the only scheduler in the RM, we should be able to satisfy its MaxConcurrency.
        if (pSchedulerProxy->GetNumAllocatedCores() < pSchedulerProxy->DesiredHWThreads() ||
            pSchedulerProxy->GetNumBorrowedCores() > 0)
        {
            unsigned int suggestedAllocation = pSchedulerProxy->AdjustAllocationIncrease(pSchedulerProxy->DesiredHWThreads());
            unsigned int remainingCores = suggestedAllocation - pSchedulerProxy->GetNumAllocatedCores();

            SchedulerNode * pAllocatedNodes = pSchedulerProxy->GetAllocatedNodes();
            unsigned int * pSortedNodeOrder = pSchedulerProxy->GetSortedNodeOrder();

            // Sort the array of nodes in the proxy by number of allocated cores, largest first, if we're allocating
            // fewer cores than the total available. This is so that we pack nodes as tightly as possible.
            bool sortNodes = pSchedulerProxy->DesiredHWThreads() != s_physicalProcessorCount;
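            // For example (illustrative numbers only): with per-node allocations { 3, 0, 2 } and two cores left
            // to hand out, visiting the nodes in sorted order { 3, 2, 0 } tops up the fullest nodes first, so the
            // scheduler's cores end up packed onto as few nodes as possible.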
            for (unsigned int i = 0; i < m_nodeCount; ++i)
            {
                // No need to sort nodes the next time around, if there are no more cores to add.
                sortNodes &= remainingCores > 0;
                if (sortNodes)
                {
                    unsigned int maxAllocationIndex = i;
                    SchedulerNode *pMaxNode = &pAllocatedNodes[pSortedNodeOrder[maxAllocationIndex]];
                    for (unsigned int j = i + 1; j < m_nodeCount; ++j)
                    {
                        SchedulerNode * pNode = &pAllocatedNodes[pSortedNodeOrder[j]];
                        if (pNode->m_allocatedCores > pMaxNode->m_allocatedCores)
                        {
                            maxAllocationIndex = j;
                            pMaxNode = pNode;
                        }
                    }

                    if (i != maxAllocationIndex)
                    {
                        // Swap the index at 'maxAllocationIndex' with the index at 'i'. The next iteration will traverse
                        // nodes starting at pSortedNodeOrder[i + 1].
                        unsigned int tempIndex = pSortedNodeOrder[i];
                        pSortedNodeOrder[i] = pSortedNodeOrder[maxAllocationIndex];
                        pSortedNodeOrder[maxAllocationIndex] = tempIndex;
                    }
                }

                // Assign cores until the desired number of cores is reached. In addition, check if there are
                // any borrowed cores and switch them to owned.
                SchedulerNode * pCurrentNode = &pAllocatedNodes[pSortedNodeOrder[i]];
                for (unsigned int coreIndex = 0; coreIndex < pCurrentNode->m_coreCount; ++coreIndex)
                {
                    SchedulerCore * pCore = &pCurrentNode->m_pCores[coreIndex];
                    GlobalCore * pGlobalCore = &(m_pGlobalNodes[pSortedNodeOrder[i]].m_pCores[coreIndex]);

                    if (pCore->m_coreState == ProcessorCore::Available)
                    {
                        if (remainingCores > 0)
                        {
                            // Claim an idle core for the surviving scheduler.
                            ASSERT(pGlobalCore->m_useCount == 0);
                            ++pGlobalCore->m_useCount;
                            pSchedulerProxy->AddCore(pCurrentNode, coreIndex, false);
                            --remainingCores;
                        }
                    }
                    else
                    {
                        ASSERT(pCore->m_coreState == ProcessorCore::Allocated);
                        if (pCore->IsBorrowed())
                        {
                            // With no other schedulers left, a borrowed core is no longer contended and can be owned.
                            ASSERT(pGlobalCore->m_useCount == 1);
                            pSchedulerProxy->ToggleBorrowedState(pCurrentNode, coreIndex);
                        }
                    }
                }
            }
        }

        if (pSchedulerProxy->ShouldReceiveNotifications())
        {
            SendResourceNotifications();
        }

#if defined(CONCRT_TRACING)
        if (pSchedulerProxy->GetNumAllocatedCores() != pSchedulerProxy->DesiredHWThreads())
        {
            TRACE(CONCRT_TRACE_DYNAMIC_RM, L"Surviving Scheduler %d: Allocated: %d, Desired: %d",
                pSchedulerProxy->GetId(), pSchedulerProxy->GetNumAllocatedCores(), pSchedulerProxy->DesiredHWThreads());
        }
#endif
        return (pSchedulerProxy->GetNumAllocatedCores() == pSchedulerProxy->DesiredHWThreads());
    }

    return true;
}
/// <summary>
/// Denote the doubles in the input array AllocationData[*].m_scaledAllocation by r[0],..., r[n-1].
/// Split r[j] into b[j] and fract[j], where b[j] is the integral floor of r[j] and fract[j] is the fraction truncated.
/// Sort the set { r[j] | j = 0,...,n-1 } from largest fract[j] to smallest.
/// For each j = 0, 1, 2,..., if fract[j] > 0, then set b[j] += 1 and pay for the cost of 1-fract[j] by rounding
/// fract[j0] -> 0 from the end (j0 = n-1, n-2,...) -- stop before j > j0. b[j] is stored in AllocationData[*].m_allocation.
/// totalAllocated is the sum of all AllocationData[*].m_scaledAllocation upon entry, and after the function returns it will
/// necessarily be equal to the sum of all AllocationData[*].m_allocation.
/// </summary>
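/// <remarks>
/// For example (illustrative numbers only): with scaled allocations r = { 1.5, 2.25, 1.25 } and totalAllocated = 5,
/// the floors are b = { 1, 2, 1 } and the truncated fractions are { 0.5, 0.25, 0.25 }. The entry with the largest
/// fraction, r = 1.5 (fract = 0.5), is rounded up to 2, and its cost of 1 - 0.5 = 0.5 is paid by zeroing the two
/// 0.25 fractions at the tail, so the final allocations are { 2, 2, 1 }, which again sum to 5.
/// </remarks>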
void ResourceManager::RoundUpScaledAllocations(AllocationData **ppData, unsigned int count, unsigned int totalAllocated)
{
    ASSERT(count > 1 && ppData != NULL);

    double epsilon = 1e-07; // epsilon allows forgiveness of reasonable round-off errors.

#if defined(_DEBUG)
    double sumScaledAllocation = 0.0;
    for (unsigned int i = 0; i < count; ++i)
    {
        sumScaledAllocation += ppData[i]->m_scaledAllocation;
    }
    ASSERT(sumScaledAllocation <= totalAllocated + epsilon && sumScaledAllocation >= totalAllocated - epsilon);
#endif
    double fraction = 0.0;

    // Split each scaled allocation into its integral floor and the truncated fraction.
    for (unsigned int i = 0; i < count; ++i)
    {
        ppData[i]->m_allocation = (unsigned int) ppData[i]->m_scaledAllocation;
        ppData[i]->m_scaledAllocation -= ppData[i]->m_allocation;
    }

    // Sort by scaled allocation, highest first (selection sort).
    for (unsigned int i = 0; i < count; ++i)
    {
        unsigned int maxIndex = i;
        for (unsigned int j = i + 1; j < count; ++j)
        {
            if (ppData[j]->m_scaledAllocation > ppData[maxIndex]->m_scaledAllocation + epsilon)
            {
                maxIndex = j;
            }
        }

        if (i != maxIndex)
        {
            AllocationData * pTemp = ppData[i];
            ppData[i] = ppData[maxIndex];
            ppData[maxIndex] = pTemp;
        }
    }
    // Round up those with the largest truncation, stealing the fraction from those with the least.
    for (unsigned int i = 0, j = count - 1; i < count; ++i)
    {
        while (fraction > epsilon)
        {
            if (ppData[j]->m_scaledAllocation > epsilon)
            {
                do
                {
                    ASSERT(j >= 0 && j < count);
                    fraction -= ppData[j]->m_scaledAllocation;
                    ppData[j]->m_scaledAllocation = 0.0;
                    --j;
                }
                while (fraction > epsilon);

                ASSERT(i <= j+1);
            }
            else
            {
                --j;
                ASSERT(i <= j && j < count);
            }
        }

        if (i <= j)
        {
            ASSERT(j < count);
            if (ppData[i]->m_scaledAllocation > epsilon)
            {
                fraction += (1.0 - ppData[i]->m_scaledAllocation);
                ppData[i]->m_scaledAllocation = 0.0;
                ppData[i]->m_allocation += 1;
            }
        }
        else
        {
            break;
        }
    }
    ASSERT(fraction <= epsilon && fraction >= -epsilon);

#if defined(_DEBUG)
    unsigned int sumAllocation = 0;
    for (unsigned int i = 0; i < count; ++i)
    {
        sumAllocation += ppData[i]->m_allocation;
    }
    ASSERT(sumAllocation == totalAllocated);
#endif
}