umsthreadscheduler.cpp
    {
        pContext->ExitHyperCriticalRegion();
    }

    // Defer to the base class to cancel the remaining contexts.
    SchedulerBase::CancelAllContexts();
}
/// <summary>
/// Create a background thread for UT (UMS worker thread) creation.
/// </summary>
void UMSThreadScheduler::InitializeSchedulerEventHandlers()
{
    // We need to create a background thread to create reserved contexts. The
    // external context exit handler thread could be reused; for now, we have a
    // dedicated background thread for UMS.
    m_numReservedContexts = 0;
    m_pendingRequests = 0;
    m_fCancelContextCreationHandler = false;

    // Auto-reset event.
    m_hCreateContext = CreateEventW(NULL, FALSE, FALSE, NULL);

    // Event handlers take an internal reference on the scheduler which is released when they exit.
    IncrementInternalContextCount();

    HANDLE threadHandle = LoadLibraryAndCreateThread(NULL,
                                                     DEFAULTCONTEXTSTACKSIZE,
                                                     BackgroundContextCreationProc,
                                                     this,
                                                     0,
                                                     NULL);
    CloseHandle(threadHandle);

    // Allow the base class to register any handlers if required.
    SchedulerBase::InitializeSchedulerEventHandlers();
}
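//
// Illustrative sketch (not part of the original source): because the event is
// created auto-reset (second argument to CreateEventW is FALSE), SetEvent
// releases exactly one waiter and the event resets itself, so each request
// drives exactly one pass of the background creation loop. A hypothetical
// requester side would look like:
//
//     InterlockedIncrement(&m_pendingRequests);  // publish the request
//     SetEvent(m_hCreateContext);                // wake the background thread
//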
/// <summary>
/// Main thread procedure for the background threads
/// </summary>
DWORD CALLBACK UMSThreadScheduler::BackgroundContextCreationProc(LPVOID lpParameter)
{
    UMSThreadScheduler *pScheduler = reinterpret_cast<UMSThreadScheduler *>(lpParameter);
    pScheduler->WaitForContextCreationRequest();
    FreeLibraryAndDestroyThread(0);
    return 0;
}
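//
// Note (an assumption based on the helper names, not confirmed by this file):
// LoadLibraryAndCreateThread / FreeLibraryAndDestroyThread appear to wrap the
// standard Win32 idiom for threads that live inside a DLL -- pin the module
// with LoadLibrary before CreateThread, and have the thread exit through
// FreeLibraryAndExitThread so the module cannot be unloaded while the thread
// is still executing its code.
//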
/// <summary>
/// Wait loop for reserved context creation
/// </summary>
void UMSThreadScheduler::WaitForContextCreationRequest()
{
    ULONG lastReplenishTime = 0;

    for (;;)
    {
        WaitForSingleObject(m_hCreateContext, INFINITE);

        if (m_fCancelContextCreationHandler)
        {
            ASSERT(m_pendingRequests == 0);
            ASSERT(m_numReservedContexts == 0);
            break;
        }

        //
        // In order to prevent an explosion of threads in the case where we perform an operation like:
        //
        //     parallel_for*(large_range)
        //     {
        //         latent_blocking_operation();
        //     }
        //
        // we throttle the creation of threads here. Note that this is only a mitigation; eventually the
        // throttling should become pervasive throughout the scheduler. No matter the level of throttling
        // here, if we hit a system cap on the number of threads, this will still throw.
        //
        ULONG delay = ThrottlingTime(4);
        ULONG curTime = GetTickCount();
        ULONG delta = curTime - lastReplenishTime; // okay with wrap

        if (delay > 0 && delta < delay)
        {
            Sleep(delay - delta);
        }

        lastReplenishTime = curTime;
        ReplenishReservedContexts();
    }

    CloseHandle(m_hCreateContext);

    // NOTE: Decrementing the internal context count could finalize the scheduler - it is not safe to touch
    // *this* after this point.
    DecrementInternalContextCount();
}
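//
// Illustrative sketch (hypothetical helper, not part of the original source):
// the "okay with wrap" note above relies on unsigned modulo-2^32 arithmetic.
// GetTickCount() wraps to 0 roughly every 49.7 days, but subtracting two ULONG
// samples still yields the correct elapsed interval across a single wrap, so
// no special-casing is needed:
//
static ULONG ElapsedMilliseconds(ULONG earlierTick, ULONG laterTick)
{
    // ULONG subtraction is performed modulo 2^32, e.g.
    // 0x00000005 - 0xFFFFFFFE == 7, exactly the elapsed time across the wrap.
    return laterTick - earlierTick;
}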
/// <summary>
/// Destroy the UT creation background thread
/// </summary>
void UMSThreadScheduler::DestroySchedulerEventHandlers()
{
    // Cancel the UT creation thread.
    m_fCancelContextCreationHandler = true;
    SetEvent(m_hCreateContext);

    // Allow the base class to destroy any handlers it has.
    SchedulerBase::DestroySchedulerEventHandlers();
}
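//
// Illustrative note (reasoning about the code above, not stated in the original
// source): this is the classic flag-then-signal shutdown handshake. The cancel
// flag is set before SetEvent, so when the background thread wakes in
// WaitForContextCreationRequest it observes m_fCancelContextCreationHandler,
// breaks out of its loop, closes the event handle, and drops its internal
// reference on the scheduler.
//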
/// <summary>
/// Signal threads waiting for reserved contexts to become
/// available.
/// </summary>
void UMSThreadScheduler::SignalReservedContextsAvailable()
{
    bool activated = false;

    // UMS virtual processors attempt to create an SFW context when the
    // executing context is UMS blocked (non-cooperative blocking). If we are
    // unable to create one at that time, the vproc will deactivate. Such
    // vprocs need to be woken up when the background thread manages to create
    // SFW contexts -- all of them, even if we do not have enough reserved
    // contexts. Any requests that fail will wake up the background thread
    // again.

    // Activate vprocs that are waiting for reserved contexts.
    for (int node = 0; node < m_nodeCount; ++node)
    {
        SchedulingNode *pNode = m_nodes[node];
        if (pNode != NULL)
        {
            if (ActivateAllReservedVirtualProcessors(pNode))
            {
                activated = true;
            }
        }
    }

    // At least one vproc needs to be activated, since pending requests block
    // scheduler shutdown. Activate a vproc after clearing the pending request
    // count to restart scheduler shutdown.
    if (!activated)
    {
        StartupIdleVirtualProcessor(GetAnonymousScheduleGroup());
    }
}
/// <summary>
/// Activate all vprocs that were waiting for reserved contexts to become
/// available.
/// </summary>
bool UMSThreadScheduler::ActivateAllReservedVirtualProcessors(SchedulingNode *pNode)
{
    CORE_ASSERT(pNode != NULL);

    bool activated = false;
    int idx;

    UMSThreadVirtualProcessor *pVirtualProcessor = static_cast<UMSThreadVirtualProcessor *>(pNode->GetFirstVirtualProcessor(&idx));
    while (pVirtualProcessor != NULL)
    {
        if (pVirtualProcessor->IsWaitingForReservedContext() && pVirtualProcessor->ClaimExclusiveOwnership())
        {
            pVirtualProcessor->StartupWorkerContext(GetAnonymousScheduleGroup());
            activated = true;
        }
        pVirtualProcessor = static_cast<UMSThreadVirtualProcessor *>(pNode->GetNextVirtualProcessor(&idx));
    }

    return activated;
}
/// <summary>
/// Called in order to move the completion list to the runnables lists.
/// </summary>
/// <param name="pBias">
/// Bias any awakening of virtual processors to the scheduling node that
/// pBias belongs to.
/// </param>
/// <returns>
/// Whether there was anything on the completion list when queried.
/// </returns>
bool UMSThreadScheduler::MoveCompletionListToRunnables(VirtualProcessor *pBias)
{
    bool fHadItems = false;

    //
    // This *ABSOLUTELY* must be in a hyper-critical region. Deadlock can ensue if not. Anything outward from this
    // must follow the set of rules governing a hyper-critical region.
    //
    ContextBase *pCurrentContext = SchedulerBase::FastCurrentContext();
    if (pCurrentContext != NULL)
        pCurrentContext->EnterHyperCriticalRegion();

    IUMSUnblockNotification *pUnblock = m_pCompletionList->GetUnblockNotifications();
    while (pUnblock != NULL)
    {
        fHadItems = true;

        IUMSUnblockNotification *pNext = pUnblock->GetNextUnblockNotification();
        UMSThreadInternalContext *pContext = static_cast<UMSThreadInternalContext *>(pUnblock->GetContext());

        VCMTRACE(MTRACE_EVT_PULLEDFROMCOMPLETION, pContext, (pCurrentContext && !pCurrentContext->IsExternal()) ? static_cast<InternalContextBase *>(pCurrentContext)->m_pVirtualProcessor : NULL, pCurrentContext);
#if defined(_DEBUG)
        pContext->SetDebugBits(CTX_DEBUGBIT_PULLEDFROMCOMPLETIONLIST);
#endif // _DEBUG

        //
        // In order to know what to do with this particular item, we need to know *why* it blocked. If the primary hasn't gotten to telling us that,
        // we must spin.
        //
        UMSThreadInternalContext::BlockingType blockingType = pContext->SpinOnAndReturnBlockingType();
        CORE_ASSERT(blockingType != UMSThreadInternalContext::BlockingNone);

        //
        // Make a determination of where this item goes. There are several cases here:
        //
        // - It might have UMS blocked during a normal critical region (e.g.: the main dispatch loop blocked on the heap lock or some
        //   other similar object). If the context was inside a critical region, we have special determinations to make.
        //
        // - It might just be a runnable.
        //
        switch (blockingType)
        {
        case UMSThreadInternalContext::BlockingCritical:
            //
            // This is the single special context allowed to be "inside a critical region" on the virtual processor. Signal the virtual
            // processor specially.
            //
            VCMTRACE(MTRACE_EVT_CRITICALNOTIFY, pContext, (pCurrentContext && !pCurrentContext->IsExternal()) ? static_cast<InternalContextBase *>(pCurrentContext)->m_pVirtualProcessor : NULL, pCurrentContext);
#if defined(_DEBUG)
            pContext->SetDebugBits(CTX_DEBUGBIT_CRITICALNOTIFY);
#endif // _DEBUG
            pContext->m_pLastVirtualProcessor->CriticalNotify();
            break;

        case UMSThreadInternalContext::BlockingNormal:
            //
            // If it's a normal runnable, it just goes on the runnables list. We pass along the bias to indicate which virtual processor
            // (or owning node) we prefer to awaken due to the addition of runnables.
            //
            pContext->AddToRunnables(pBias);
            break;
        }

        pUnblock = pNext;
    }

    if (pCurrentContext != NULL)
        pCurrentContext->ExitHyperCriticalRegion();

    return fHadItems;
}
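//
// Illustrative note (reasoning inferred from the loop above, not stated in the
// original source): the next pointer is captured *before* the unblock
// notification is acted upon. Once AddToRunnables or CriticalNotify is called,
// the underlying context may immediately be dispatched on another virtual
// processor and the notification's links can be reused, so touching pUnblock
// after that point would race with its reuse. Saving pNext first keeps the
// traversal safe.
//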
/// <summary>
/// Static initialization common to UMS schedulers.
/// </summary>
void UMSThreadScheduler::OneShotStaticConstruction()
{
    t_dwSchedulingContextIndex = TlsAlloc();
    if (t_dwSchedulingContextIndex == TLS_OUT_OF_INDEXES)
    {
        throw scheduler_resource_allocation_error(HRESULT_FROM_WIN32(GetLastError()));
    }
}
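//
// Illustrative sketch (hypothetical usage, not part of the original source):
// a slot obtained from TlsAlloc is used per thread with TlsSetValue and
// TlsGetValue, and each thread sees its own value in the same slot:
//
//     TlsSetValue(t_dwSchedulingContextIndex, pSchedulingContext);
//     void *pCtx = TlsGetValue(t_dwSchedulingContextIndex);
//
// The slot itself is process-wide and is released once, in
// OneShotStaticDestruction below, via TlsFree.
//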
/// <summary>
/// Static destruction common to UMS schedulers.
/// </summary>
void UMSThreadScheduler::OneShotStaticDestruction()
{
    TlsFree(t_dwSchedulingContextIndex);
    t_dwSchedulingContextIndex = 0;
}
} // namespace details
} // namespace Concurrency