// concurrent_queue.h
/***
* ==++==
*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Microsoft would like to acknowledge that this concurrency data structure implementation
* is based on Intel's implementation in its Threading Building Blocks ("Intel Material").
*
* ==--==
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* concurrent_queue.h
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
/*
Intel Material Copyright 2005-2008 Intel Corporation. All Rights Reserved.
*/
#pragma once
#include <crtdefs.h>
#include <memory>
#include <cstddef>
#include <iterator>
#include <crtdbg.h>
#include <concrt.h>
#if !(defined (_M_AMD64) || defined (_M_IX86))
#error ERROR: Concurrency Runtime is supported only on X64 and X86 architectures.
#endif /* !(defined (_M_AMD64) || defined (_M_IX86)) */
#if defined (_M_CEE)
#error ERROR: Concurrency Runtime is not supported when compiling /clr.
#endif /* defined (_M_CEE) */
#pragma pack(push,_CRT_PACKING)
/// <summary>
/// The <c>Concurrency</c> namespace provides classes and functions that give you access to the Concurrency Runtime,
/// a concurrent programming framework for C++. For more information, see <see cref="Concurrency Runtime"/>.
/// </summary>
/**/
namespace Concurrency
{
// Forward declaration of the public queue template. The default allocator argument
// (std::allocator<_Ty>) is supplied here; the later class definition declares _Ax without a default.
template<typename _Ty, class _Ax = std::allocator<_Ty> >
class concurrent_queue;
namespace details
{
// Forward declarations needed by the class declarations that follow.
// Internal representation type pointed to by _Concurrent_queue_base_v4::_My_rep.
class _Concurrent_queue_rep;
// Ticket type; _Concurrent_queue_rep::_Index maps a ticket to a micro-queue array index.
typedef size_t _Ticket;
// Internal representation type pointed to by _Concurrent_queue_iterator_base_v4::_My_rep
// (only declared in this part of the header).
class _Concurrent_queue_iterator_rep;
// Type-independent iterator base.
class _Concurrent_queue_iterator_base_v4;
// Typed iterator; _Value is either the _Ty or const _Ty type of the container.
template<typename _Container, typename _Value> class _Concurrent_queue_iterator;
// Type-independent portion of concurrent_queue.
// Abstract base: item copy/assign and page allocation are pure virtual and are supplied by the
// typed concurrent_queue<_Ty, _Ax> subclass. Members marked _CRTIMP2 are only declared here.
class _Concurrent_queue_base_v4
{
// Internal representation
_Concurrent_queue_rep* _My_rep;
// Helper types that are granted access to the private members of this class.
friend class _Concurrent_queue_rep;
friend struct _Micro_queue;
friend class _Micro_queue_pop_finalizer;
friend class _Concurrent_queue_iterator_rep;
friend class _Concurrent_queue_iterator_base_v4;
protected:
// Prefix on a page
// (concurrent_queue::_Get_ref addresses the items starting at &page + 1, i.e. right after this header).
struct _Page
{
// Pointer to another page (presumably the next page in the micro-queue's list).
_Page* _Next;
// NOTE(review): the meaning of _Mask is not visible in this header section — confirm before relying on it.
size_t _Mask;
};
// Always a power of 2
size_t _Items_per_page;
// Size of an item
size_t _Item_size;
private:
// Copy the item at _Src into slot _Index of page _Dst.
// The concurrent_queue override copy-constructs a _Ty in place in that slot.
virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src ) = 0;
// Presumably assigns slot _Index of page _Src to *_Dst and then destroys the slot's item
// (inferred from the name; the override body is not fully visible here — confirm).
virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index ) = 0;
protected:
// Constructor; takes the size of one item.
_CRTIMP2 _Concurrent_queue_base_v4( size_t _Item_size );
// Virtual destructor.
_CRTIMP2 virtual ~_Concurrent_queue_base_v4();
// Enqueue item at tail of queue
_CRTIMP2 void _Internal_push( const void* _Src );
// Attempt to dequeue item from queue.
/** Returns bool; presumably false if there was no item to dequeue
(the earlier note said "NULL", which does not match the bool return type). */
_CRTIMP2 bool _Internal_pop_if_present( void* _Dst );
// Get size of queue
_CRTIMP2 size_t _Internal_size() const;
// Test instantaneous queue empty
_CRTIMP2 bool _Internal_empty() const;
// custom allocator
virtual _Page *_Allocate_page() = 0;
// custom de-allocator
virtual void _Deallocate_page( _Page *p ) = 0;
// free any remaining pages
_CRTIMP2 void _Internal_finish_clear() ;
// throw an exception
_CRTIMP2 void _Internal_throw_exception() const;
private:
// Deny copy construction (declared private; no definition appears in this header section)
_Concurrent_queue_base_v4( const _Concurrent_queue_base_v4& );
// Deny assignment (declared private; no definition appears in this header section)
void operator=( const _Concurrent_queue_base_v4& );
};
// Version-neutral alias used by the rest of this header.
typedef _Concurrent_queue_base_v4 _Concurrent_queue_base ;
// A queue using simple locking.
/** For efficiency, this class has no constructor.
The caller is expected to zero-initialize it. */
struct _Micro_queue
{
// Nested helper classes; only declared here.
class _Pop_finalizer;
class _Push_finalizer;
// Head page pointer and head ticket counter of this micro-queue.
_Subatomic<_Concurrent_queue_base::_Page*> _Head_page;
_Subatomic<_Ticket> _Head_counter;
// Tail page pointer and tail ticket counter of this micro-queue.
_Subatomic<_Concurrent_queue_base::_Page*> _Tail_page;
_Subatomic<_Ticket> _Tail_counter;
// NOTE(review): presumably the lock flag behind the "simple locking" mentioned above;
// how it is acquired/released is not visible in this header section — confirm.
volatile long _Page_mutex_flag;
// Push entry point; takes a pointer to the source item, a ticket, and the owning queue base.
// Declared only; the definition is not in this header section.
void _Push( const void* _Item, _Ticket _K, _Concurrent_queue_base& _Base );
// Pop entry point; takes a destination pointer, a ticket, and the owning queue base, and returns bool.
// Declared only; the definition is not in this header section.
bool _Pop( void* _Dest, _Ticket _K, _Concurrent_queue_base& _Base );
};
// Warning C4324 ("structure was padded due to __declspec(align())") is silenced for this
// class only: the padding is intentional and required.
#pragma warning(push)
#pragma warning(disable: 4324)
// Internal representation of a ConcurrentQueue: queue-wide head/tail ticket counters
// plus a fixed array of micro-queues.
/** No constructor is provided, for efficiency;
    whoever creates an instance is expected to zero-initialize it. */
class _Concurrent_queue_rep
{
    friend struct _Micro_queue;

    // Ticket multiplier used by _Index (approximately n_queue/golden ratio)
    static const size_t _Phi = 3;

public:
    // Number of micro-queues; must be power of 2
    static const size_t _N_queue = 8;

    // Translate a ticket into a position within _Array
    static size_t _Index( _Ticket _K )
    {
        return (_K * _Phi) % _N_queue;
    }

    // Each of the following members starts on its own 64-byte boundary.
    __declspec(align(64)) _Subatomic<_Ticket> _Head_counter;
    __declspec(align(64)) _Subatomic<_Ticket> _Tail_counter;
    __declspec(align(64)) _Micro_queue _Array[_N_queue];

    // Pick the micro-queue that serves ticket _K.
    // The formula in _Index approximates LRU in a cache-oblivious way.
    _Micro_queue& _Choose( _Ticket _K )
    {
        const size_t _Slot = _Index(_K);
        return _Array[_Slot];
    }
};
#pragma warning(pop)
// Type-independent portion of _Concurrent_queue_iterator.
// Holds an opaque iterator representation and an untyped pointer to the current item;
// the typed subclass casts that pointer to its _Value type.
class _Concurrent_queue_iterator_base_v4 {
// Iterator state for the concurrent_queue over which we are iterating.
/** NULL if one past last element in queue. */
_Concurrent_queue_iterator_rep* _My_rep;
// The equality operators compare _My_item of the two iterators, so they need access to it.
template<typename _C, typename _Ty, typename _U>
friend bool operator==( const _Concurrent_queue_iterator<_C,_Ty>&, const _Concurrent_queue_iterator<_C,_U>& );
template<typename _C, typename _Ty, typename _U>
friend bool operator!=( const _Concurrent_queue_iterator<_C,_Ty>&, const _Concurrent_queue_iterator<_C,_U>& );
protected:
// Pointer to current item
mutable void* _My_item;
// Default constructor: no representation and no current item.
_Concurrent_queue_iterator_base_v4()
: _My_rep(NULL), _My_item(NULL)
{
}
// Copy constructor: starts from the empty state, then delegates to _Assign.
_Concurrent_queue_iterator_base_v4( const _Concurrent_queue_iterator_base_v4& _I )
: _My_rep(NULL), _My_item(NULL)
{
_Assign(_I);
}
// Construct iterator pointing to head of queue.
_CRTIMP2 _Concurrent_queue_iterator_base_v4( const _Concurrent_queue_base& );
// Assignment (declared only; the definition is not in this header section)
_CRTIMP2 void _Assign( const _Concurrent_queue_iterator_base_v4& );
// Advance iterator one step towards tail of queue.
_CRTIMP2 void _Advance();
// Destructor
_CRTIMP2 ~_Concurrent_queue_iterator_base_v4();
};
// Version-neutral alias for the iterator base.
typedef _Concurrent_queue_iterator_base_v4 concurrent_queue_iterator_base;
// Meets requirements of a forward iterator for STL.
/** _Value is either the _Ty or const _Ty type of the container.
    The nested iterator typedefs below make the class usable through std::iterator_traits
    (and therefore with standard algorithms). */
template<typename _Container, typename _Value>
class _Concurrent_queue_iterator: public _Concurrent_queue_iterator_base_v4
{
    template<typename _Ty, class _Ax> friend class ::Concurrency::concurrent_queue;

    // Construct iterator pointing to head of queue.
    // Private: only the befriended concurrent_queue can create such an iterator.
    _Concurrent_queue_iterator( const _Concurrent_queue_base& _Queue )
        : _Concurrent_queue_iterator_base_v4(_Queue)
    {
    }
public:
    // Nested types looked up by std::iterator_traits. They are plain typedefs rather than a
    // std::iterator base class so that the object layout is unchanged. value_type mirrors
    // _Value (including const for the const iterator), as std::iterator<tag, _Value> would.
    typedef std::forward_iterator_tag iterator_category;
    typedef _Value value_type;
    typedef std::ptrdiff_t difference_type;
    typedef _Value* pointer;
    typedef _Value& reference;

    // Default constructor; relies on the base class default constructor.
    _Concurrent_queue_iterator()
    {
    }
    /** If _Value==_Container::value_type, then this routine is the copy constructor.
        If _Value==const _Container::value_type, then this routine is a conversion constructor. */
    _Concurrent_queue_iterator( const _Concurrent_queue_iterator<_Container,typename _Container::value_type>& _Other )
        : _Concurrent_queue_iterator_base_v4(_Other)
    {
    }
    // Iterator assignment; delegates to the base class _Assign and returns *this.
    _Concurrent_queue_iterator& operator=( const _Concurrent_queue_iterator& _Other )
    {
        _Assign(_Other);
        return *this;
    }
    // Reference to current item (the untyped _My_item pointer viewed as _Value)
    _Value& operator*() const
    {
        return *static_cast<_Value*>(_My_item);
    }
    // Pointer to current item
    _Value* operator->() const
    {
        return &operator*();
    }
    // Advance to next item in queue
    _Concurrent_queue_iterator& operator++()
    {
        _Advance();
        return *this;
    }
    // Post increment: returns a copy taken before advancing
    _Concurrent_queue_iterator operator++(int)
    {
        _Concurrent_queue_iterator _Result = *this;
        _Advance();
        return _Result;
    }
}; // _Concurrent_queue_iterator
// Equality: two iterators compare equal exactly when they point at the same current item.
// _Ty and _U may differ so that const and non-const iterators can be compared.
template<typename _C, typename _Ty, typename _U>
bool operator==( const _Concurrent_queue_iterator<_C,_Ty>& _I, const _Concurrent_queue_iterator<_C,_U>& _J )
{
    const void* const _Left = _I._My_item;
    const void* const _Right = _J._My_item;
    return _Left == _Right;
}
// Inequality: true when the two iterators point at different current items.
// _Ty and _U may differ so that const and non-const iterators can be compared.
template<typename _C, typename _Ty, typename _U>
bool operator!=( const _Concurrent_queue_iterator<_C,_Ty>& _I, const _Concurrent_queue_iterator<_C,_U>& _J )
{
    return !( _I._My_item == _J._My_item );
}
} // namespace details;
/// <summary>
/// The <c>concurrent_queue</c> class is a sequence container class that allows first-in,
/// first-out access to its elements. It enables a limited set of concurrency-safe operations, such as
/// <c>push</c> and <c>try_pop</c>, to name a few.
/// </summary>
/// <typeparam name="_Ty">
/// The data type of the elements to be stored in the queue.
/// </typeparam>
/// <typeparam name="_Ax">
/// The type that represents the stored allocator object that encapsulates details about the allocation and
/// deallocation of memory for this concurrent queue. This argument is optional and the default value is
/// <c>allocator&lt;</c><typeparamref name="_Ty"/><c>&gt;</c>.
/// </typeparam>
/// <remarks>
/// For more information, see <see cref="Parallel Containers and Objects"/>.
/// </remarks>
/**/
template<typename _Ty, class _Ax>
class concurrent_queue: public ::Concurrency::details::_Concurrent_queue_base_v4
{
template<typename _Container, typename _Value> friend class ::Concurrency::details::_Concurrent_queue_iterator;
// allocator type
typedef typename _Ax::template rebind<char>::other _Page_allocator_type;
_Page_allocator_type _My_allocator;
// Class used to ensure exception-safety of method "pop"
class _Destroyer
{
private:
_Ty& _My_value;
void operator=(const _Destroyer&); // prevent warning: assign operator can't be generated
public:
_Destroyer( _Ty& _Value )
: _My_value(_Value)
{
}
~_Destroyer()
{
_My_value.~_Ty();
}
};
_Ty& _Get_ref( _Page& _Pg, size_t _Index )
{
_ASSERTE( _Index<_Items_per_page );
return static_cast<_Ty*>(static_cast<void*>(&_Pg+1))[_Index];
}
/*override*/ virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src )
{
new( &_Get_ref(_Dst,_Index) ) _Ty(*static_cast<const _Ty*>(_Src));
}
/*override*/ virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index )
{
_Ty& _From = _Get_ref(_Src,_Index);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -