📄 event.cpp
字号:
} while (spinWait._SpinOnce());
//
// Give up and block, first putting our context on a stack-based
// list of waiting contexts for this event.
//
SingleWaitBlock block;
EventWaitNode node(&block);
bool fSatisfied = false;
{
critical_section::scoped_lock lockGuard(_M_lock);
if (_M_pWaitChain == EVENT_SIGNALED)
fSatisfied = true;
else
{
node.m_pNext = Sweep(reinterpret_cast <EventWaitNode *> (_M_pWaitChain));
_M_pWaitChain = &node;
}
}
if (!fSatisfied )
{
bool bSkip = block.m_smSkip_BlockUnblock == WaitBlock::SKIP // Avoid unneccessary InterlockedCompareExchange for optimizing.
|| InterlockedCompareExchange(&block.m_smSkip_BlockUnblock, WaitBlock::DONT_SKIP, WaitBlock::UNDECIDED) == WaitBlock::SKIP;
if(!bSkip)
Context::Block();
}
return 0;
}
/// <summary>
/// Resets the specified event.
/// </summary>
void event::reset()
{
critical_section::scoped_lock lockGuard(_M_lock);
if (_M_pWaitChain == EVENT_SIGNALED)
{
EventWaitNode *pRoot = NULL;
EventWaitNode *pNext = NULL;
EventWaitNode *pNode = reinterpret_cast <EventWaitNode *> (_M_pResetChain);
_M_pResetChain = NULL;
for (; pNode != NULL; pNode = pNext)
{
pNext = pNode->m_pNext;
if (pNode->Reset())
{
//
// We need to shift this back to the wait list. The wait hasn't been satisfied and
// this reset impacts the block.
//
pNode->m_pNext = pRoot;
pRoot = pNode;
}
}
_M_pWaitChain = pRoot;
}
}
/// <summary>
///     Sets the specified event: atomically publishes the signaled state, then unblocks
///     every context that was chained on the wait list.
/// </summary>
void event::set()
{
    // Contexts to unblock are collected under the lock and unblocked only after it
    // is released -- see the comment inside about not touching scheduler lists while
    // holding a hot lock.
    Context **pContexts = NULL;
    ULONG nodeCount = 0;
    {
        critical_section::scoped_lock lockGuard(_M_lock);
        //
        // Although it's not technically necessary to interlock this, it allows an optimization for light-weight events
        // in that they are able to spin for a period before blocking. Without the fence here, they would not.
        //
        EventWaitNode *pOldChain;
        pOldChain = reinterpret_cast <EventWaitNode *> (
            InterlockedExchangePointer (reinterpret_cast <volatile PVOID *> (&_M_pWaitChain), EVENT_SIGNALED)
            );
        // Anything numerically above EVENT_SIGNALED is a real wait-node pointer, i.e.
        // the event had waiters chained at the moment it was signaled.
        if (pOldChain > EVENT_SIGNALED)
        {
            ASSERT(_M_pResetChain == NULL);
            EventWaitNode *pNext;
            //
            // Note that the lock grabbed above is within the event, so it's entirely possible that the moment we unblock
            // the context, the lock is gone. We also don't want to diddle in the scheduler lists while under a hot
            // lock, so build the list of contexts to unblock, release the lock, and then diddle in the scheduler.
            //
            // First pass: count the waiters so a single _malloca covers the unblock list.
            nodeCount = 0;
            for (EventWaitNode *pNode = pOldChain; pNode != NULL; pNode = pNode->m_pNext)
                nodeCount++;
            pContexts = reinterpret_cast <Context **> (_malloca(sizeof (Context *) * nodeCount));
            // Second pass: satisfy each node and record the contexts to wake.
            nodeCount = 0;
            for (EventWaitNode *pNode = pOldChain; pNode != NULL; pNode = pNext)
            {
                //
                // Need to cache the next pointer, since as soon as we unblock,
                // the stack-based EventWaitNode may be deallocated.
                //
                pNext = pNode->m_pNext;
                Context *pContext;
                if (pNode->Satisfy(&pContext))
                {
                    //
                    // If Satisfy returned true, we need to track the node as it's part of
                    // a wait-for-all and a reset on this event could impact it.
                    //
                    pNode->m_pNext = reinterpret_cast <EventWaitNode *> (_M_pResetChain);
                    //
                    // Guarded via the spinlock.
                    //
                    _M_pResetChain = pNode;
                }
                // NOTE(review): pContext is assumed to be written by Satisfy on every
                // path -- confirm against EventWaitNode::Satisfy.
                if (pContext != NULL)
                    pContexts[nodeCount++] = pContext;
            }
        }
    }
    //
    // Unblock contexts outside the given dispatch lock.
    //
    while(nodeCount-- > 0)
    {
        pContexts[nodeCount]->Unblock();
    }
    // _malloca storage must be released with the matching _freea.
    if (pContexts != NULL)
        _freea(pContexts);
}
#pragma warning(disable:26010)
/// <summary>
///     Waits for multiple events to become signaled.
/// </summary>
/// <param name="pEvents">
///     An array of events to wait upon
/// </param>
/// <param name="count">
///     A count of events within the array
/// </param>
/// <param name="fWaitAll">
///     An indication of whether to wait for all events or just a single one
/// </param>
/// <param name="timeout">
///     Milliseconds to wait; 0 polls without blocking, COOPERATIVE_TIMEOUT_INFINITE
///     waits forever.
/// </param>
/// <returns>
///     The index of the satisfying event, or COOPERATIVE_WAIT_TIMEOUT if the wait
///     timed out before being satisfied.
/// </returns>
size_t event::wait_for_multiple(event** pEvents, size_t count, bool fWaitAll, unsigned int timeout)
{
    //
    // Handle some trivial cases up front
    //
    if (pEvents == NULL)
    {
        throw std::invalid_argument("pEvents");
    }
    //
    // Nothing to wait on.
    //
    if (count == 0)
        return 0;
    //
    // Optimize for any caller which decides to call this to wait on a single event. All waits with timeouts
    // flow through here as we need the heavier weight mechanism.
    //
    if (count == 1 && (timeout == 0 || timeout == COOPERATIVE_TIMEOUT_INFINITE))
    {
        if (pEvents[0] == NULL)
        {
            throw std::invalid_argument("pEvents");
        }
        return pEvents[0]->wait(timeout);
    }
    // Validate the whole array before chaining to any event, so a bad entry can't
    // leave us half-chained.
    for (size_t i = 0; i < count; i++)
    {
        if (pEvents[i] == NULL)
        {
            throw std::invalid_argument("pEvents");
        }
    }
    // One allocation holds the wait block plus one wait node per event; the holder
    // tracks references that must be released if setup is interrupted.
    MultiWaitBlockHolder waitBlock(fWaitAll, count, timeout != COOPERATIVE_TIMEOUT_INFINITE, (timeout != 0 && timeout != COOPERATIVE_TIMEOUT_INFINITE));
    MultiWaitBlock *pWaitBlock = waitBlock.GetWaitBlock();
    //
    // Chain to each event, carefully checking signal state for each as we go. Note that a wait
    // any can be satisfied immediately if any fail due to an event already being signaled. In
    // that case, we must carefully dispose the rest of the nodes and make sure the counters are
    // appropriate for wait block disposal as the chained ones get dechained later on other event
    // set/reset/destruction.
    //
    bool fSatisfied = false;
    for (size_t i = 0; i < count; i++)
    {
        event *pEvent = pEvents[i];
        Context *pSatisfiedContext;
        critical_section::scoped_lock lockGuard(pEvent->_M_lock);
        EventWaitNode *pWaitNode = waitBlock.GetWaitNode(i);
        // NOTE(review): Release() appears to hand one holder-owned reference over to
        // the node just handled -- confirm against MultiWaitBlockHolder::Release.
        waitBlock.Release();
        EventWaitNode *pOldChain = reinterpret_cast <EventWaitNode *> (pEvent->_M_pWaitChain);
        if (pOldChain == EVENT_SIGNALED)
        {
            //
            // Event was signaled before we could add ourself to the wait list... We must be
            // very careful here. For a "wait any", we are satisfied but need to take care
            // to ensure that the heap blocks get appropriately freed and dechained. For a wait
            // all, we need to chain to the reset list as it's possible that the event is reset
            // before some other event that would satisfy the wait is signaled.
            //
            if (fWaitAll)
            {
                if (pWaitNode->Satisfy(&pSatisfiedContext))
                {
                    pWaitNode->m_pNext = Sweep(reinterpret_cast <EventWaitNode *> (pEvent->_M_pResetChain));
                    pEvent->_M_pResetChain = pWaitNode;
                }
                if (pSatisfiedContext != NULL)
                {
                    // A wait-all can only complete on its final event.
                    ASSERT(i == count - 1);
                    fSatisfied = true;
                }
            }
            else
            {
                //
                // The wait is satisfied.
                //
                pWaitNode->Satisfy(&pSatisfiedContext);
                if(pSatisfiedContext != NULL)
                    fSatisfied = true;
                // Dispose the remaining (never-chained) nodes so the block's
                // reference counting still balances.
                for (size_t j = i + 1; j < count; j++)
                {
                    pWaitNode = waitBlock.GetWaitNode(j);
                    waitBlock.Release();
                    pWaitNode->Satisfy(&pSatisfiedContext);
                    ASSERT(pSatisfiedContext == NULL);
                }
                break;
            }
        }
        else
        {
            // Event not signaled: chain our node onto its wait list.
            pWaitNode->m_pNext = Sweep(pOldChain);
            pEvent->_M_pWaitChain = pWaitNode;
        }
    }
    if (!fSatisfied )
    {
        //
        // For explanation of skipping Block/Unblock please see the comments in MultiWaitBlock::SingleSatisfy() method.
        //
        bool bSkip = pWaitBlock->m_smSkip_BlockUnblock == WaitBlock::SKIP // Avoid unneccessary InterlockedCompareExchange for optimizing.
            || InterlockedCompareExchange(&pWaitBlock->m_smSkip_BlockUnblock, WaitBlock::DONT_SKIP, WaitBlock::UNDECIDED) == WaitBlock::SKIP;
        if( !bSkip )
        {
            //
            // Handle timeouts of zero specially. We don't want to block the thread.
            //
            if (timeout == 0)
            {
                // Winning the race on m_finalTrigger means no event satisfied the wait
                // first: the zero-timeout poll expires immediately.
                if (InterlockedIncrement(&pWaitBlock->m_finalTrigger) == 1)
                {
                    pWaitBlock->m_pSatisfiedBy = NULL;
                    fSatisfied = true;
                }
                else
                {
                    // An event beat us to the trigger; it will (or did) unblock us.
                    Context::Block();
                }
            }
            else
            {
                if (timeout != COOPERATIVE_TIMEOUT_INFINITE)
                {
                    // Arm the timeout timer only if nothing has fired the trigger yet.
                    if (pWaitBlock->m_finalTrigger == 0)
                    {
                        if (!CreateTimerQueueTimer(&pWaitBlock->m_hTimer,
                                                   GetSharedTimerQueue(),
                                                   MultiWaitBlock::DispatchEventTimer,
                                                   pWaitBlock,
                                                   timeout,
                                                   0,
                                                   WT_EXECUTEINTIMERTHREAD))
                        {
                            //
                            // Note that the thread is left in a state unexplicable by the scheduler here. It's quite possible someone ::Unblocks this context in
                            // the future. With this error, we make no attempt to unwind that.
                            //
                            throw std::bad_alloc();
                        }
                        // Release the holder reference reserved for the timer.
                        waitBlock.Release();
                    }
                }
                Context::Block();
            }
        }
    }
    // A NULL m_pSatisfiedBy means the timer (or zero-timeout poll) fired first.
    return (pWaitBlock->m_pSatisfiedBy == NULL) ? COOPERATIVE_WAIT_TIMEOUT : waitBlock.GetIndexOfNode(pWaitBlock->m_pSatisfiedBy);
}
#pragma warning(default:26010)
namespace details
{
/// <summary>
///     Constructs a holder for a single allocation wait block which gets split into a wait block and a series of wait nodes,
///     one per wait object.
/// </summary>
MultiWaitBlockHolder::MultiWaitBlockHolder(bool fWaitAll, size_t count, bool timeout, bool timer) : m_count(0)
{
    //
    // A single allocation backs the wait block and every wait node. Each piece is
    // padded out to P2_ALIGN so the placement-new'd objects land on aligned offsets.
    //
    size_t rawBlockSize;
    if (fWaitAll)
        rawBlockSize = sizeof(WaitAllBlock);
    else
        rawBlockSize = sizeof(WaitAnyBlock);

    m_blockSize = ALIGNED_SIZE(rawBlockSize, P2_ALIGN);
    m_nodeSize = ALIGNED_SIZE(sizeof(EventWaitNode), P2_ALIGN);
    m_totalBlockSize = m_blockSize + m_nodeSize * count;
    m_pMemBlock = new BYTE[m_totalBlockSize];
    m_pWaitBlock = reinterpret_cast<MultiWaitBlock *> (m_pMemBlock);

    //
    // The wait block itself occupies the front of the allocation.
    //
    if (fWaitAll)
        new(m_pMemBlock) WaitAllBlock(count, timeout, timer);
    else
        new(m_pMemBlock) WaitAnyBlock(count, timeout, timer);

    //
    // The wait nodes follow, one per wait object, each referring back to the block.
    //
    for (size_t idx = 0; idx < count; ++idx)
        new(m_pMemBlock + m_blockSize + idx * m_nodeSize) EventWaitNode(m_pWaitBlock);

    //
    // The number of references on the block is the number of wait objects plus the timer plus one for
    // the stack frame of the WaitForMultiple which initialized us. The block gets freed when NotifyCompletedNode
    // is called m_refs number of times. This object is responsible, in normal cases, for releasing the single
    // stack frame reference. It's also responsible for cleaning up and releasing any references that won't come
    // from wait objects / timers due to exceptions thrown in the midst of setting up the wait.
    //
    m_refs = count + 1;
    if (timer)
        m_refs++;
}
/// <summary>
///     Destructor for the wait block holder. Releases any references on the block which will not come as the result
///     of a release.
/// </summary>
MultiWaitBlockHolder::~MultiWaitBlockHolder()
{
    // Drain every reference that no wait object or timer will release on our behalf.
    for (; m_count < m_refs; ++m_count)
        m_pWaitBlock->NotifyCompletedNode();
}
/// <summary>
/// Called in order to satisfy the wait. This handles a single wait/timer combination. Any multi-wait semantic
/// must override this and call the base class in order to present a single-wait semantic.
/// </summary>
void MultiWaitBlock::SingleSatisfy(Context **pContextOut, EventWaitNode *pNode)
{
//
// If there is a timeout, the timer may already have unblocked the context.
//
Context *pContext = m_pContext;
bool fSatisfied = true;
if (m_fTimeout)
{
if (InterlockedIncrement(&m_finalTrigger) != 1)
fSatisfied = false;
}
if (fSatisfied)
{
m_pSatisfiedBy = pNode;
if (m_hTimer)
{
for(;;)
{
if (!DeleteTimerQueueTimer(GetSharedTimerQueue(), m_hTimer, INVALID_HANDLE_VALUE))
{
if (GetLastError() == ERROR_IO_PENDING)
break;
}
else
break;
}
//
// Now, we need to answer the question of whether the timer fired and incremented the
// trigger or not. That will answer the question of when we delete the wait block.
//
if (m_finalTrigger == 1)
NotifyCompletedNode();
}
//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -