⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workstealingqueue.h

📁 C语言库函数的原型,有用的拿去
💻 H
📖 第 1 页 / 共 2 页
字号:
        /// </summary>
        int Push(T* elem)
        {
            // Fast, lock-free push used by the bound thread.  Returns the cookie for the stored
            // element (its tail index plus m_cookieBase); when the fast path cannot be taken the
            // work -- and the returned cookie -- are delegated to SyncPush().
            const int tailIndex = m_tail;

            //
            // A Steal may run concurrently with this check.  That is tolerated because the test is
            // conservative: it requires tailIndex < m_head + size - 1, i.e. one slot is always kept
            // spare, so a Steal that has just advanced m_head cannot have its old head entry
            // overwritten.  With the initial m_mask of 0 (size 2^0 == 1, task array still null) the
            // test always fails, which routes the very first push through SyncPush().
            //
            if (tailIndex >= m_head + m_mask)  // == !(tailIndex < m_head + size - 1)
            {
                //
                // Not enough room: the array must be resized or re-indexed.
                //
                return SyncPush(elem);
            }

            m_pTasks[tailIndex & m_mask] = elem;
            m_tail = tailIndex + 1;   // publish the new tail only after the slot has been written.
            return tailIndex + m_cookieBase;
        }

        /// <summary>
        ///     Only called from the bound thread, this sweeps the work stealing queue under the steal lock for any chores matching the
        ///     specified predicate.
        /// </summary>
        void Sweep(SweepPredicate pPredicate, void *pData, SweepFunction pSweepFn)
        {
            // Walks the queue from the newest entry (m_tail - 1) down to m_head while holding the
            // lock.  Each non-NULL entry is offered to pPredicate; on a match it is handed to
            // pSweepFn, and if that also returns true the entry is removed from the queue.
            // pData is passed through unchanged to both callbacks.
            LOCK::_Scoped_lock lockHolder(*m_pLock);

            int newTail = m_tail;

            for (int idx = m_tail - 1; idx >= m_head; --idx)
            {
                T* pChore = m_pTasks[idx & m_mask];

                // Slots that were already ripped out of order hold NULL -- nothing to examine.
                if (pChore == NULL)
                    continue;

                if (!pPredicate(pChore, pData))
                    continue;

                if (!pSweepFn(pChore, pData))
                    continue;

                //
                // An entry sitting directly below the (new) tail is removed by pulling the tail
                // down over it; any other entry is NULLed in place to mark an out-of-order rip.
                //
                if (idx + 1 == newTail)
                    --newTail;
                else
                    m_pTasks[idx & m_mask] = NULL;
            }

            // Publish the possibly lowered tail.
            InterlockedExchange((volatile LONG *)&m_tail, newTail);
        }

        /// <summary>
        ///     Marks the work stealing queue as detached.  The current tail pointer marks the end point of detachment.  Note
        ///     that this should only be called when there is a guarantee of no concurrent pushes or pops from the owning thread.
        /// </summary>
        void MarkDetachment()
        {
            // Both fields are updated under the lock, the same lock taken by Sweep, SyncPush and SyncPop.
            LOCK::_Scoped_lock stealGuard(*m_pLock);

            // Remember the tail as it is right now as the detachment boundary, then raise the flag.
            m_detachmentTail = m_tail;
            m_fMarkedForDetachment = true;
        }

        /// <summary>
        ///     Destroys a work stealing queue.
        /// </summary>
        ~WorkStealingQueue()
        {
            // Releases the task pointer array only.  Neither the tasks it points to nor m_pLock
            // are deleted by this destructor.
            delete[] m_pTasks;
        }

    private:

        // The m_head and m_tail are volatile as they can be updated from different OS threads.
        // The "m_head" is only updated by foreign threads as they Steal a task from
        // this queue. By putting a lock in Steal, there is at most one foreign thread
        // changing m_head at a time. The m_tail is only updated by the bound thread.
        //
        // invariants:
        //   tasks.length is a power of 2
        //   m_mask == tasks.length-1
        //   m_head is only written to by foreign threads
        //   m_tail is only written to by the bound thread
        //   At most one foreign thread can do a Steal
        //   All methods except Steal are executed from a single bound thread
        //   m_tail points to the first unused location
        //

        // NOTE(review): presumably the initial length of m_pTasks; the constructor is not visible here -- confirm.
        static const int s_initialSize = 64;  // must be a power of 2

        // Low end of the live index range [m_head, m_tail).  Advanced by Steal (per the invariants
        // above); also rewritten under the lock by SyncPush (re-indexing) and SyncPop (reset to 0).
        volatile int m_head;
        // First unused index.  Written by Push, and under the lock by Sweep, SyncPush and SyncPop.
        volatile int m_tail;
        
        // Task array length minus one; an index is mapped to a slot with (index & m_mask).
        int m_mask;
        // Added to a raw index to form the cookie returned by Push/SyncPush.  It is increased
        // whenever m_head/m_tail are rebased so that the cookie of a queued entry does not change.
        int m_cookieBase;

        // Lock taken by Sweep, MarkDetachment, SyncPush and SyncPop; guards stealing from push/pops.
        LOCK *m_pLock;

        // True while the queue is marked for detachment (set by MarkDetachment, cleared by SyncPop
        // when the queue empties).
        bool m_fMarkedForDetachment;
        // Tail index captured by MarkDetachment.  When the head crosses this, the mark ends.
        // Rebased by SyncPush and reset to 0 by SyncPop when the queue empties.
        int m_detachmentTail;

        // Array of task pointers: replaced by a larger array in SyncPush, freed with delete[] in
        // the destructor.  A NULL slot inside the live range marks an entry removed out of order.
        T**  m_pTasks;

        /// <summary>
        ///     Pushes an element onto the work stealing queue under the queue lock.  This guarantees that no steal
        ///     interleaves and guarantees the ability to reallocate the physical store.  The returned value is a cookie
        ///     per Push().
        /// </summary>
        int SyncPush(T* elem)
        {
            //
            // Slow path of Push(): doubles the task array, re-indexes the queue so that m_head becomes 0,
            // stores elem and returns its cookie (index + m_cookieBase), exactly as Push() would.
            //
            // Because WorkStealingQueue is used for LRC and LRC needs to be searched in a SFW from a UMS primary, the lock here is a hyper
            // lock and no memory allocations can happen inside its scope.  Preallocate everything up front!
            //
            // Keep in mind that the only thing that's going to happen without the lock held is a steal.  No one else will try to resize,
            // pop, push, etc...
            //
            // We get here when Push() observed (count >= size - 1).
            //
            int oldsize = m_mask + 1;

            //
            // NOTE(review): there is no explicit overflow check on this doubling; the original code only notes that an
            // overflow is highly unlikely and is expected to surface as an out-of-memory failure from the allocation below.
            //
            int newsize = 2 * oldsize;

            //
            // Yes -- it's entirely possible that we allocate and DON'T need to in rare circumstances - steal just opened up a slot.  In that particular
            // case, we will just do the resizing since it's almost full.
            //
            T** pNewTasks = new T*[newsize];

            //
            // Again, for reasons of UMS, we cannot delete the old array until after we release the hyper lock.  Stash it away
            // and defer the deletion.
            //
            T** pOldTasks = m_pTasks;

            {
                //
                // ensure that no Steal interleaves here
                //
                LOCK::_Scoped_lock lockHolder(*m_pLock);

                //
                // cache m_head, and calculate number of tasks.  This must be done under the lock: a Steal may have
                // moved m_head right up until the lock was acquired.
                //
                int h = m_head;
                int count = m_tail - h;

                //
                // normalize indices: slide [m_head, m_tail) down so that m_head lies inside [0, oldsize).
                // normalizationShift is the distance everything moved (original head - normalized head).
                //
                h = h & m_mask;           // normalize m_head
                int normalizationShift = m_tail - (h + count);
                m_cookieBase += normalizationShift;
                m_head = h;
                m_tail = h + count;

                //
                // Suppress 26010 for the copy loop only.  push/pop restores whatever warning state the including
                // translation unit had, rather than forcing the warning back to its default.
                //
#pragma warning(push)
#pragma warning(disable:26010)

                // we get here the first time we've overflowed,
                // so as long as m_mask >= 1, which is asserted in the ctor, there's plenty of room
                CORE_ASSERT(count < newsize);
                CORE_ASSERT(pNewTasks != NULL);
                for (int i = 0; i < count; ++i)
                    pNewTasks[i] = m_pTasks[(h + i) & m_mask];

                m_pTasks = pNewTasks;

#pragma warning(pop)

                //
                // Rebase the cookie index.  We can't hand out duplicate cookies due to this.
                //
                m_cookieBase += m_head;

                //
                // Rebase the detachment point if necessary.  It lives in the same index space as m_head/m_tail, so it
                // has to follow BOTH moves: the normalization shift applied above and the move of m_head to 0 below.
                // Rebasing by the normalized m_head alone would leave the mark too high by normalizationShift whenever
                // the head had advanced past the old array size.
                //
                if (m_fMarkedForDetachment)
                {
                    m_detachmentTail -= normalizationShift;
                    CORE_ASSERT(m_detachmentTail >= m_head);
                    m_detachmentTail -= m_head;
                }

                m_mask = newsize - 1;
                m_head = 0;
                m_tail = count;

                CORE_ASSERT(count < m_mask);

                //
                // push the element
                //
                int t = m_tail;
                m_pTasks[t & m_mask] = elem;
                m_tail = t + 1;

            }

            // The lock has been released; it is now safe to free the old storage.
            delete[] pOldTasks;
            return m_tail - 1 + m_cookieBase;
        }

        /// <summary>
        ///     Synchronously pops an element from the work stealing queue.  Note that this is called in the case where
        ///     a Pop() call and a Steal() call interleave.
        /// </summary>
        T* SyncPop()
        {
            //
            // Pop under the lock so that no Steal can interleave.  Returns the entry at the top of
            // the queue, or NULL when there is nothing to pop (or the top slot itself holds NULL).
            //
            LOCK::_Scoped_lock lockHolder(*m_pLock);

            const int newTail = m_tail - 1;
            T* pPopped = NULL;

            // Tentatively claim the top slot.
            m_tail = newTail;

            if (newTail < m_head)
            {
                // Nothing was there to claim: put m_tail back where it was.
                m_tail = newTail + 1;
            }
            else
            {
                //
                // == (m_head <= m_tail)
                //
                // This path is a single-element / interleave-with-steal situation, so nothing lies
                // below this slot if it turns out to be NULL.  No explicit skip (as in Pop()) is
                // required.
                //
                pPopped = m_pTasks[newTail & m_mask];
            }

            if (newTail <= m_head)
            {
                //
                // The queue is empty now.  Fold the old head into the cookie base first so that
                // cookies already handed out remain valid until they are trypop()'d ...
                //
                m_cookieBase += m_head;

                //
                // ... then reset the indices and drop any detachment mark.
                //
                m_head = 0;
                m_tail = 0;
                m_detachmentTail = 0;
                m_fMarkedForDetachment = false;
            }

            return pPopped;
        }

    };
} // namespace details
} // namespace Concurrency

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -