// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// WorkStealingQueue.h
//
// Header file containing the core implementation of the work stealing data structures and algorithms.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#pragma once
namespace Concurrency
{
namespace details
{
/// <summary>
/// A WorkStealingQueue is a wait-free, lock-free structure associated with a single
/// thread that can Push and Pop elements. Other threads can do Steal operations
/// on the other end of the WorkStealingQueue with little contention.
/// </summary>
template <typename T, typename LOCK=_NonReentrantLock>
class WorkStealingQueue
{
// A 'WorkStealingQueue' always runs its code in a single OS thread. We call this the
// 'bound' thread. Only the code in the Steal operation can be executed by
// other 'foreign' threads that try to steal work.
//
// The queue is implemented as an array. The m_head and m_tail index this
// array. To avoid copying elements, the m_head and m_tail index the array modulo
// the size of the array. By making this a power of two, we can use a cheap
// bit-and operation to take the modulus. The "m_mask" is always equal to the
// size of the task array minus one (where the size is a power of two).
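// For example, with a task array of size 8 (m_mask == 7), the logical
// index 11 maps to physical slot (11 & 7) == 3, the same result as
// 11 % 8 but computed without a division instruction.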
//
// The m_head and m_tail are volatile as they can be updated from different OS threads.
// The "m_head" is only updated by foreign threads as they Steal a task from
// this queue. By putting a lock in Steal, there is at most one foreign thread
// changing m_head at a time. The m_tail is only updated by the bound thread.
//
// invariants:
// tasks.length is a power of 2
// m_mask == tasks.length-1
// m_head is only written to by foreign threads
// m_tail is only written to by the bound thread
// At most one foreign thread can do a Steal
// All methods except Steal are executed from a single bound thread
// m_tail points to the first unused location
//
// This work stealing implementation also supports the notion of out-of-order waiting
// and out-of-order removal from the bound thread given that it is initialized to do so.
// There is additional cost to performing this.
//
public:
/// <summary>
/// The callback for a sweep of the work stealing queue. This will be called under the stealing lock on the owning thread
/// for every chore matching a predefined predicate. If true is returned, the item is pulled from the WSQ. If false is returned,
/// the item stays in the WSQ.
/// </summary>
typedef bool (*SweepFunction)(T *pObject, void *pData);
/// <summary>
/// A predicate for a WSQ sweep.
/// </summary>
typedef bool (*SweepPredicate)(T *pObject, void *pData);
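//
// For example (illustrative only): a sweep that pulls every chore belonging
// to a canceled group out of the queue could pass a SweepPredicate that
// compares a group field of *pObject against pData, paired with a
// SweepFunction that returns true for each matching chore.
//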
/// <summary>
/// Constructs a new work stealing queue
/// </summary>
WorkStealingQueue(LOCK *pLock)
: m_pLock(pLock)
{
ASSERT(m_pLock != NULL);
ASSERT(s_initialSize > 1);
Reinitialize();
m_mask = s_initialSize - 1;
m_pTasks = new T*[s_initialSize];
}
/// <summary>
/// Reinitializes a work queue to the state just after construction. This is used when recycling a work
/// queue from its ListArray.
/// </summary>
void Reinitialize()
{
m_head = 0;
m_tail = 0;
m_detachmentTail = 0;
m_fMarkedForDetachment = false;
m_cookieBase = 0;
}
/// <summary>
/// Unlocked count
/// </summary>
int Count() const
{
return (m_tail - m_head);
}
/// <summary>
/// Unlocked check if empty
/// </summary>
bool Empty() const
{
return (m_tail <= m_head);
}
/// <summary>
/// Checks whether the queue is marked for detachment, in which case a stealer should skip it
/// </summary>
bool MarkedForDetachment() const
{
return m_fMarkedForDetachment;
}
//
// Push/Pop and Steal can be executed interleaved. In particular:
// 1) A steal and a pop must be careful when there is just one element
// in the queue. Each guards against this by first incrementing m_head (steal)
// or decrementing m_tail (pop), and then checking whether it interleaved
// with the other (m_head > m_tail).
// 2) A push and a steal can interleave in the sense that a push can overwrite the
// value that was just stolen. To account for this, the push conservatively
// assumes that the size is one less than it actually is.
//
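// For example: with a single element (m_head == 0, m_tail == 1), a Steal
// that sets m_head = 1 can race with a Pop that sets m_tail = 0. Both then
// observe m_head > m_tail and back off; the Pop resolves the tie under the
// lock in SyncPop().
//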
/// <summary>
/// Attempts to steal the oldest element in the queue. This handles potential interleaving with both
/// a Pop and TryPop operation.
/// </summary>
T* UnlockedSteal(bool fOnlyDetached = false)
{
T* pResult = NULL;
for (;;)
{
//
// increment the m_head. Save in local h for efficiency
//
int h = m_head;
InterlockedExchange((volatile LONG*)&m_head, h + 1);
//
// a memory fence is required here (the InterlockedExchange above provides it) --
// memory may not be sequentially consistent
//
if (h < m_tail)
{
//
// If the queue is detached and we've crossed the point of detachment, end the detachment marker.
//
if (m_fMarkedForDetachment && m_head >= m_detachmentTail)
m_fMarkedForDetachment = false;
//
// == (h+1 <= m_tail) == (m_head <= m_tail)
//
// When we allow out-of-order waits, it's entirely possible that a TryPop
// executing on the bound thread will grab this out from underneath us. Not
// only do we need guards against interleave with ordered pop, but we also
// need a guard against an out-of-order trypop.
//
pResult = reinterpret_cast<T*> (InterlockedExchangePointer(
reinterpret_cast<PVOID volatile *>( &(m_pTasks[h & m_mask])),
(PVOID) NULL
));
if (pResult != NULL)
break;
}
else
{
//
// failure: either empty or single element interleaving with pop
//
m_head = h; // restore the m_head
break;
}
}
return pResult;
}
// only used in a test
T* Steal()
{
LOCK::_Scoped_lock lockHolder(*m_pLock);
return UnlockedSteal();
}
/// <summary>
/// Attempts to pop the newest element on the work stealing queue. It may return NULL if there is no such
/// item (either an unbalanced push/pop, or the chore was stolen)
/// </summary>
T* Pop()
{
for(;;)
{
//
// decrement the m_tail. Use local t for efficiency.
//
int t = m_tail - 1;
InterlockedExchange((volatile LONG*)&m_tail, t);
//
// insert a memory fence here (InterlockedExchange does the job) --
// memory may not be sequentially consistent
//
if (m_head <= t)
{
//
// == (m_head <= m_tail)
//
T* pResult = m_pTasks[t & m_mask];
//
// Out of order TryPops on the bound thread will set this without
// the need for a fence.
//
if (pResult == NULL) continue;
return pResult;
}
else
{
//
// failure: either empty or single element interleaving with steal
//
m_tail = t + 1; // restore the m_tail
return SyncPop(); // do a single-threaded pop
}
}
}
/// <summary>
/// Tries to pop a previously pushed element from the work stealing queue. Note that this executes
/// a potentially out-of-order wait.
/// </summary>
/// <param name="cookie">
/// The value returned from a Push() call for the work in question
/// </param>
T* TryPop(int cookie)
{
cookie = (cookie - m_cookieBase);
//
// TryPop() has Pop() semantics if we try the topmost element. We only need to do something
// "special" in the out of order case.
//
if (cookie == m_tail - 1) return Pop();
if (cookie >= m_tail || cookie < m_head) return NULL;
T* pResult = reinterpret_cast<T*> (InterlockedExchangePointer(
reinterpret_cast<PVOID volatile *>( &(m_pTasks[cookie & m_mask])),
(PVOID) NULL
));
return pResult;
}
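//
// Usage sketch (illustrative): the cookie handed out by Push() below can be
// fed straight into TryPop() to attempt an out-of-order removal:
//
//     int cookie = pQueue->Push(pChore);
//     ...
//     T *pRemoved = pQueue->TryPop(cookie); // NULL if already popped or stolen
//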
/// <summary>
/// Pushes an element onto the work stealing queue. The returned cookie can be utilized to identify
/// the work item for a future TryPop() call. Note that the returned cookie is only valid until a Pop()
/// or TryPop() call removes the work in question.
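/// </summary>
//
// NOTE: this copy of the file is truncated at the Push() declaration, so the
// body below is a hedged sketch rather than the original implementation. It
// follows the behavior the comments in this file describe: store at m_tail
// (only the bound thread writes m_tail), check conservatively against a size
// one smaller than the real one so an interleaving Steal is never overwritten,
// and fall back to a locked slow path. SyncPush is a hypothetical helper,
// named by analogy with SyncPop, that would take m_pLock and grow the array.
//
int Push(T *pObject)
{
    int t = m_tail;
    //
    // Conservative size check: requiring t < m_head + m_mask leaves one
    // slot unused, so a Steal that has just incremented m_head cannot have
    // its not-yet-read element overwritten by this Push.
    //
    if (t < m_head + m_mask)
    {
        m_pTasks[t & m_mask] = pObject;
        m_tail = t + 1; // publish only after the slot is initialized
        return t + m_cookieBase;
    }
    //
    // Slow path: take the stealing lock, grow/compact the array, and retry.
    //
    return SyncPush(pObject);
}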