📄 schedulerproxy.cpp
字号:
pExecutionResource = NULL;
}
else
{
pExecutionResource->IncrementUseCounts();
}
pThreadProxy->ExitCriticalRegion();
}
}
if (pExecutionResource != NULL)
{
return GetResourceForNewSubscription(pExecutionResource);
}
return pExecutionResource;
}
/// <summary>
/// Creates or reuses an execution resource for the thread subscription
/// </summary>
ExecutionResource * SchedulerProxy::GetResourceForNewSubscription(ExecutionResource * pParentExecutionResource)
{
ExecutionResource * pExecutionResource = NULL;
if (pParentExecutionResource->GetSchedulerProxy() != this)
{
pExecutionResource = new ExecutionResource(this, pParentExecutionResource);
pExecutionResource->IncrementUseCounts();
}
else
{
pExecutionResource = pParentExecutionResource;
}
return pExecutionResource;
}
/// <summary>
/// Registers that a call to SubscribeCurrentThread has occured for this core, making this core immovable.
/// </summary>
void SchedulerProxy::IncrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
{
SchedulerCore * pCore = &m_pAllocatedNodes[nodeId].m_pCores[coreIndex];
if (pCore->m_numFixedThreads++ == 0)
{
SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
pNode->m_numFixedCores++;
m_numFixedCores++;
if (pCore->IsBorrowed())
{
// When a core becomes fixed, we temporarily remove the borrowed flag on it, and restore it when it
// becomes movable again.
pCore->m_fPreviouslyBorrowed = true;
ToggleBorrowedState(pNode, coreIndex);
}
}
// Increment the external thread count on the core, which helps account for all the resources running on that core.
if (isExternalThread)
{
m_numExternalThreads++;
pCore->m_numExternalThreads++;
}
}
/// <summary>
/// Registers that a call to IExecutionResource::Release has occured, potentially freeing this core.
/// </summary>
void SchedulerProxy::DecrementFixedCoreCount(unsigned int nodeId, unsigned int coreIndex, bool isExternalThread)
{
SchedulerCore * pCore = &m_pAllocatedNodes[nodeId].m_pCores[coreIndex];
// Decrement external thread count on the core which helps account for all the resources running on that core.
if (isExternalThread)
{
ASSERT(pCore->m_numExternalThreads > 0);
pCore->m_numExternalThreads--;
m_numExternalThreads--;
}
ASSERT(pCore->m_numFixedThreads > 0);
if (--pCore->m_numFixedThreads == 0)
{
SchedulerNode * pNode = &m_pAllocatedNodes[nodeId];
ASSERT(pCore->m_numExternalThreads == 0);
m_numFixedCores--;
pNode->m_numFixedCores--;
if (pCore->m_fPreviouslyBorrowed)
{
// If this was a borrowed core convereted to fixed due to a subscription request, we restore the state
// back to borrowed, here.
ASSERT(!pCore->IsBorrowed());
ToggleBorrowedState(pNode, coreIndex);
pCore->m_fPreviouslyBorrowed = false;
}
// If this core was owned only due to an external thread being on it, then there is
// no more reason for it to be marked as such.
if (isExternalThread && m_pAllocatedNodes[nodeId].m_pCores[coreIndex].m_numAssignedThreads == 0)
{
m_numExternalThreadCores--;
}
}
}
/// <summary>
/// This API registers the current thread with the resource manager associating it with this scheduler proxy,
/// and returns an instance of IExecutionResource back to the scheduler for bookkeeping and maintenance.
/// </summary>
/// <returns>
/// The IExecutionResource instance representing current thread in the runtime.
/// </returns>
IExecutionResource * SchedulerProxy::SubscribeCurrentThread()
{
return m_pResourceManager->SubscribeCurrentThread(this);
}
/// <summary>
/// Creates a new execution resource for the external thread and registers it with the scheduler proxy.
/// </summary>
ExecutionResource * SchedulerProxy::CreateExternalThreadResource(SchedulerNode * pNode, unsigned int coreIndex)
{
ExecutionResource * pExecutionResource = new ExecutionResource(this, pNode, coreIndex);
pExecutionResource->IncrementUseCounts();
return pExecutionResource;
}
/// <summary>
/// Adds the execution resource to the list of subscribed threads
/// </summary>
void SchedulerProxy::AddThreadSubscription(ExecutionResource * pExecutionResource)
{
m_threadSubscriptions.AddTail(pExecutionResource);
}
/// <summary>
/// Removes the execution resource from the list of subscribed threads
/// </summary>
void SchedulerProxy::RemoveThreadSubscription(ExecutionResource * pExecutionResource)
{
m_threadSubscriptions.Remove(pExecutionResource);
delete pExecutionResource;
}
/// <summary>
/// Finds the core allocated by the RM on which a single subscribed external thread should run, OR
/// if doOversubscribeCore is true, find the core with the smallest use count to oversubscribe.
/// </summary>
ExecutionResource * SchedulerProxy::GrantExternalThreadAllocation(bool doOversubscribeCore)
{
unsigned int lowestUseCount = (unsigned int) -1;
unsigned int lowestUseCoreIndex = (unsigned int) -1;
SchedulerNode * pLowestUseNode = NULL;
unsigned int currentNodeIndex = (unsigned int) -1;
if (doOversubscribeCore)
{
currentNodeIndex = m_pResourceManager->GetCurrentNodeAndCore(NULL);
}
for (unsigned int nodeIndex = 0; nodeIndex < m_nodeCount; ++nodeIndex)
{
SchedulerNode * pNode = &m_pAllocatedNodes[nodeIndex];
if (pNode->m_allocatedCores > 0)
{
for(unsigned int coreIndex = 0; coreIndex < pNode->m_coreCount; ++coreIndex)
{
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
if (pCore->m_coreState == ProcessorCore::Allocated)
{
unsigned int totalUseCount = pCore->m_numAssignedThreads + pCore->m_numExternalThreads;
if (doOversubscribeCore)
{
ASSERT(totalUseCount > 0);
// If this core has the lowest use count, remember it
// If this core has the same use count as the lowest and is on the affinitized node, remember it
if (totalUseCount < lowestUseCount || (totalUseCount == lowestUseCount && nodeIndex == currentNodeIndex))
{
lowestUseCount = totalUseCount;
pLowestUseNode = pNode;
lowestUseCoreIndex = coreIndex;
}
}
else if (totalUseCount == 0)
{
m_numExternalThreadCores++;
m_numAllocatedCores++;
return CreateExternalThreadResource(pNode, coreIndex);
}
}
}
}
}
ASSERT(doOversubscribeCore);
ASSERT(pLowestUseNode != NULL);
return CreateExternalThreadResource(pLowestUseNode, lowestUseCoreIndex);
}
/// <summary>
/// Called by the RM when it is done allocating cores for the scheduler proxy. Gives the proxy
/// an array of nodes and cores.
/// </summary>
ExecutionResource * SchedulerProxy::GrantAllocation(SchedulerNode * pAllocatedNodes, unsigned int nodeCount, unsigned int numberAllocated, bool doExternalThreadAllocation)
{
ASSERT(m_pAllocatedNodes == NULL);
ASSERT(m_numAllocatedCores == 0);
ASSERT(m_numExternalThreads == 0);
m_nodeCount = nodeCount;
// The RM provides the scheduler proxy with an array of nodes and cores, with 'numberAllocated' of those cores marked
// as ProcessorCore::Allocated. These are the cores that the RM has deemed sufficient to satisfy the request of this
// scheduler proxy based on its policy values and the availability of resources.
m_pAllocatedNodes = pAllocatedNodes;
m_numAllocatedCores = numberAllocated;
m_pSortedNodeOrder = new unsigned int[m_nodeCount];
for (unsigned int i = 0; i < m_nodeCount; ++i)
{
m_pSortedNodeOrder[i] = i;
}
// Calculate the number of virtual processors we will give this scheduler based on the core allocation
// we received. Each core will be allocated either m_tof vprocs or m_tof - 1 vprocs, based on the
// desired hardware threads and the value for max concurrency.
unsigned int vprocCores = m_numAllocatedCores;
bool hasExternalThreadCore = false;
// If we have an external thread we may need to adjust the number of vprocs cores we have. Note that we use m_minimumHardwareThreads
// and not MinHWThreads() (the latter includes the external thread count).
if (doExternalThreadAllocation && vprocCores > m_minimumHardwareThreads)
{
hasExternalThreadCore = true;
vprocCores--;
}
unsigned int vprocCount = 0;
ASSERT(m_numFullySubscribedCores > 0 && m_numFullySubscribedCores <= m_desiredHardwareThreads);
if (vprocCores <= m_numFullySubscribedCores)
{
vprocCount = vprocCores * m_targetOversubscriptionFactor;
}
else
{
vprocCount = (m_numFullySubscribedCores * m_targetOversubscriptionFactor) +
((vprocCores - m_numFullySubscribedCores) * (m_targetOversubscriptionFactor - 1));
}
ASSERT(vprocCount >= m_minConcurrency && vprocCount <= m_maxConcurrency);
m_numAssignedThreads = vprocCount;
ExecutionResource * pExecutionResource = NULL;
IVirtualProcessorRoot** vprocArray = new IVirtualProcessorRoot *[vprocCount];
unsigned int vprocIndex = 0;
unsigned int coresAssigned = 0;
bool externalThreadAllocated = !doExternalThreadAllocation;
for (unsigned int nodeIndex = 0; coresAssigned < m_numAllocatedCores && nodeIndex < m_nodeCount; ++nodeIndex)
{
SchedulerNode * pNode = &m_pAllocatedNodes[nodeIndex];
if (pNode->m_allocatedCores > 0)
{
for(unsigned int coreIndex = 0; coresAssigned < m_numAllocatedCores && coreIndex < pNode->m_coreCount; ++coreIndex)
{
SchedulerCore * pCore = &pNode->m_pCores[coreIndex];
if (pCore->m_coreState == ProcessorCore::Allocated)
{
ASSERT(pCore->m_numAssignedThreads == 0 && pCore->m_numFixedThreads == 0);
++coresAssigned;
// If the external thread also needs a core, first try to put it in a node whose affinity is a superset of the hardware thread
// it is currently running on. If not, reaffinitize it.
if (!externalThreadAllocated && ((coresAssigned == m_numAllocatedCores) || (m_pResourceManager->GetCurrentNodeAndCore(NULL) == nodeIndex)))
{
// Create an execution resource and affinitize it to this node
pExecutionResource = CreateExternalThreadResource(pNode, coreIndex);
externalThreadAllocated = true;
// If this core was reserved for the external thread, there is no need to allocate any vprocs on it.
if (hasExternalThreadCore)
{
m_numExternalThreadCores++;
continue;
}
}
// Create virtual processor roots in the scheduler proxy, corresponding to the node and core we're currently looking at.
unsigned int numVprocs = 0;
if (m_numFullySubscribedCores > 0)
{
numVprocs = m_targetOversubscriptionFactor;
// As we assign m_tof threads to a core, we decrement this value. This value is also updated in
// AddCore and RemoveCore. After the scheduler proxy has been given its initial allocation
// or resources, this variable keeps track of how many out of the remaining quota of cores the
// scheduler proxy could acquire (desired - allocated) would get tof threads per core if they
// were added to the scheduler during dynamic core migration.
--m_numFullySubscribedCores;
}
else
{
numVprocs = m_targetOversubscriptionFactor - 1;
}
pCore->m_numAssignedThreads += numVprocs;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -