// workstealingqueue.h
        /// <summary>
        /// Pushes an element onto the work stealing queue. The returned value is a cookie identifying the pushed entry.
        /// </summary>
        int Push(T* elem)
        {
            int t = m_tail;

            //
            // Careful here since we might interleave with Steal.
            // This is no problem since we just conservatively check if there is
            // enough space left (t < m_head + size). However, Steal might just have
            // incremented m_head and we could potentially overwrite the old m_head
            // entry, so we always leave at least one extra 'buffer' element and
            // check (m_tail < m_head + size - 1). This also plays nicely with our
            // initial m_mask of 0, where size is 2^0 == 1, but the tasks array is
            // still null.
            //
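            //
            // For example (illustrative numbers): with m_mask == 63 (size 64), m_head == 10 and
            // m_tail == 73, the check 73 < 10 + 63 fails even though slot (73 & 63) == 9 is free;
            // that slot is exactly the old m_head entry a concurrent Steal may still be reading,
            // which is why one 'buffer' element is always left unused.
            //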
            if (t < m_head + m_mask) // == t < m_head + size - 1
            {
                m_pTasks[t & m_mask] = elem;
                m_tail = t + 1; // only increment once we have initialized the task entry.
                return t + m_cookieBase;
            }
            else
            {
                //
                // failure: we need to resize or re-index
                //
                return SyncPush(elem);
            }
        }
        /// <summary>
        /// Only called from the bound thread, this sweeps the work stealing queue under the steal lock for any chores matching the
        /// specified predicate.
        /// </summary>
        void Sweep(SweepPredicate pPredicate, void *pData, SweepFunction pSweepFn)
        {
            LOCK::_Scoped_lock lockHolder(*m_pLock);

            int nt = m_tail;
            int t = m_tail - 1;
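            //
            // Walk from the newest entry (m_tail - 1) down to m_head; nt tracks the tail value to
            // publish once any swept entries at the very top of the queue have been trimmed off.
            //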
            while (t >= m_head)
            {
                T* pResult = m_pTasks[t & m_mask];
                if (pResult != NULL)
                {
                    if (pPredicate(pResult, pData))
                    {
                        if (pSweepFn(pResult, pData))
                        {
                            //
                            // If it's atop the WSQ, just decrement the tail (nt == new tail); otherwise,
                            // make sure to NULL out the entry to indicate an out-of-order rip.
                            //
                            if (t + 1 == nt)
                                nt--;
                            else
                                m_pTasks[t & m_mask] = NULL;
                        }
                    }
                }
                t--;
            }
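            //
            // Publish the (possibly lowered) tail. InterlockedExchange gives this volatile update a
            // full memory barrier, so the entries NULLed out above are visible before the new m_tail.
            //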
            InterlockedExchange((volatile LONG *)&m_tail, nt);
        }
        /// <summary>
        /// Marks the work stealing queue as detached. The current tail pointer marks the end point of detachment. Note
        /// that this should only be called when there is a guarantee of no concurrent pushes or pops from the owning thread.
        /// </summary>
        void MarkDetachment()
        {
            LOCK::_Scoped_lock lockHolder(*m_pLock);

            m_fMarkedForDetachment = true;
            m_detachmentTail = m_tail;
        }
        /// <summary>
        /// Destroys a work stealing queue.
        /// </summary>
        ~WorkStealingQueue()
        {
            delete [] m_pTasks;
        }
    private:
        // The m_head and m_tail are volatile as they can be updated from different OS threads.
        // The "m_head" is only updated by foreign threads as they Steal a task from
        // this queue. By putting a lock in Steal, there is at most one foreign thread
        // changing m_head at a time. The m_tail is only updated by the bound thread.
        //
        // invariants:
        //     tasks.length is a power of 2
        //     m_mask == tasks.length-1
        //     m_head is only written to by foreign threads
        //     m_tail is only written to by the bound thread
        //     At most one foreign thread can do a Steal
        //     All methods except Steal are executed from a single bound thread
        //     m_tail points to the first unused location
        //
        static const int s_initialSize = 64; // must be a power of 2

        volatile int m_head;         // only updated by Steal
        volatile int m_tail;         // only updated by Push and Pop
        int m_mask;                  // the m_mask for taking modulus
        int m_cookieBase;            // the base cookie index
        LOCK *m_pLock;               // the lock that guards stealing from push/pops
        bool m_fMarkedForDetachment; // Indicates whether or not the work stealing queue is marked for detachment
        int m_detachmentTail;        // The tail pointer for detachment. When the head crosses this, the mark ends
        T** m_pTasks;                // the array of tasks
        /// <summary>
        /// Pushes an element onto the work stealing queue under the queue lock. This guarantees that no steal
        /// interleaves and guarantees the ability to reallocate the physical store. The returned value is a cookie
        /// per Push().
        /// </summary>
        int SyncPush(T* elem)
        {
            //
            // Because WorkStealingQueue is used for LRC and LRC needs to be searched in a SFW from a UMS primary, the lock here is a hyper
            // lock and no memory allocations can happen inside its scope. Preallocate everything up front!
            //
            // Keep in mind that the only thing that's going to happen without the lock held is a steal. No one else will try to resize,
            // pop, push, etc...
            //

            //
            // == (count >= size-1)
            //
            int oldsize = m_mask + 1;
            int newsize = 2 * oldsize; // highly unlikely, but throw out-of-memory if this overflows
            int count = m_tail - m_head;
            //
            // Yes -- it's entirely possible that we allocate and DON'T need to in rare circumstances - steal just opened up a slot. In that particular
            // case, we will just do the resizing since it's almost full.
            //
            T** pNewTasks = new T*[newsize];

            //
            // Again, for reasons of UMS, we cannot delete the old array until after we release the hyper lock. Stash it away
            // and defer the deletion.
            //
            T** pOldTasks = m_pTasks;

            {
                //
                // ensure that no Steal interleaves here
                //
                LOCK::_Scoped_lock lockHolder(*m_pLock);
                //
                // cache m_head, and calculate number of tasks
                //
                int h = m_head;
                count = m_tail - h;

                //
                // normalize indices
                //
                h = h & m_mask; // normalize m_head
                m_cookieBase += m_tail - (h + count);
                m_head = h;
                m_tail = h + count;

#pragma warning(disable:26010)
                // we get here the first time we've overflowed,
                // so as long as m_mask >= 1, which is asserted in the ctor, there's plenty of room
                CORE_ASSERT(count < newsize);
                CORE_ASSERT(pNewTasks != NULL);
                for (int i = 0; i < count; ++i)
                    pNewTasks[i] = m_pTasks[(h + i) & m_mask];
                m_pTasks = pNewTasks;
#pragma warning(default:26010)
                //
                // Rebase the cookie index. We can't hand out duplicate cookies due to this.
                //
                m_cookieBase += m_head;
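                //
                // Worked example (illustrative numbers): with m_mask == 63, an old m_head of 100
                // (so h == 36) and count == 63, the two m_cookieBase adjustments above add 64 and
                // then 36 -- exactly the total amount by which each element's index drops during
                // the reindex -- so index + m_cookieBase stays the same for every cookie already
                // handed out.
                //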
                //
                // Rebase the detachment point if necessary.
                //
                if (m_fMarkedForDetachment)
                {
                    CORE_ASSERT(m_detachmentTail >= m_head);
                    m_detachmentTail -= m_head;
                }

                m_mask = newsize - 1;
                m_head = 0;
                m_tail = count;
                CORE_ASSERT(count < m_mask);

                //
                // push the element
                //
                int t = m_tail;
                m_pTasks[t & m_mask] = elem;
                m_tail = t + 1;
            }

            delete[] pOldTasks;

            return m_tail - 1 + m_cookieBase;
        }
        /// <summary>
        /// Synchronously pops an element from the work stealing queue. Note that this is called in the case where
        /// a Pop() call and a Steal() call interleave.
        /// </summary>
        T* SyncPop()
        {
            //
            // ensure that no Steal interleaves with this pop
            //
            LOCK::_Scoped_lock lockHolder(*m_pLock);

            T* pResult = NULL;
            int t = m_tail - 1;
            m_tail = t;
            if (m_head <= t)
            {
                //
                // == (m_head <= m_tail)
                //
                pResult = m_pTasks[t & m_mask];

                //
                // Because this was a single element / interleave with steal, there is nothing
                // below this in the WSQ in the event of a NULL return. Hence, we do not need
                // to perform an explicit skip as in Pop().
                //
            }
            else
            {
                m_tail = t + 1; // restore m_tail
            }
            if (m_head >= t)
            {
                //
                // Rebase the cookie index so we guarantee that currently handed out cookie values are
                // still valid until they are trypop()'d.
                //
                m_cookieBase += m_head;

                //
                // queue is empty: reset m_head and m_tail
                //
                m_head = 0;
                m_tail = 0;
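                //
                // Nothing remains to detach from an empty queue, so clear any pending detachment mark.
                //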
                m_detachmentTail = 0;
                m_fMarkedForDetachment = false;
            }

            return pResult;
        }
    };
} // namespace details
} // namespace Concurrency
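
//
// Illustrative only -- a minimal sketch of the intended usage pattern, not part of the original
// header. It assumes a hypothetical Chore type and lock type for the instantiation, and assumes
// Pop()/Steal() members (referenced by the comments above but not shown in this excerpt) with the
// signatures used here.
//
#if 0
void BoundThreadWork(Concurrency::details::WorkStealingQueue<Chore, SomeLock>& wsq, Chore *pChore)
{
    // The bound thread pushes and pops at the tail (LIFO end); the returned cookie identifies the entry.
    int cookie = wsq.Push(pChore);
    (void)cookie;

    Chore *pPopped = wsq.Pop();
    // ...
}

void ThiefThreadWork(Concurrency::details::WorkStealingQueue<Chore, SomeLock>& wsq)
{
    // Foreign threads steal from the head (FIFO end); Steal calls are serialized by the queue lock.
    Chore *pStolen = wsq.Steal();
    // ...
}
#endif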