📄 concurrent_queue.cpp
字号:
/***
* ==++==
*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Microsoft would like to acknowledge that this concurrency data structure implementation
* is based on Intel抯 implementation in its Threading Building Blocks ("Intel Material").
*
* ==--==
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* concurrent_queue.cpp
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
/*
Intel Material Copyright 2005-2008 Intel Corporation. All Rights Reserved.
*/
#include "concrtinternal.h"
#include "concurrent_queue.h"
#include "cds_cache_aligned_allocator.h"
using namespace std;
#if defined(_MSC_VER) && defined(_Wp64)
// Workaround for compiler warnings in /Wp64 mode
#pragma warning (disable: 4267)
#endif /* _MSC_VER && _Wp64 */
namespace Concurrency
{
namespace details
{
class _Micro_queue::_Push_finalizer
{
private:
_Ticket my_ticket;
_Micro_queue& my_queue;
public:
_Push_finalizer( _Micro_queue& queue, _Ticket k )
: my_ticket(k), my_queue(queue)
{
}
~_Push_finalizer()
{
my_queue._Tail_counter = my_ticket;
}
};
class _Micro_queue::_Pop_finalizer
{
private:
_Ticket my_ticket;
_Micro_queue& my_queue;
_Concurrent_queue_base::_Page* my_page;
_Concurrent_queue_base &base;
public:
_Pop_finalizer( _Micro_queue& queue, _Concurrent_queue_base& b, _Ticket k, _Concurrent_queue_base::_Page* p )
: my_ticket(k), my_queue(queue), my_page(p), base(b)
{
}
~_Pop_finalizer()
{
_Concurrent_queue_base::_Page* p = my_page;
if( p )
{
_SpinLock lock(my_queue._Page_mutex_flag);
_Concurrent_queue_base::_Page* q = p->_Next;
my_queue._Head_page = q;
if( !q )
{
my_queue._Tail_page = NULL;
}
}
my_queue._Head_counter = my_ticket;
if( p )
base._Deallocate_page( p );
}
};
#pragma warning( push )
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( disable: 4146 )
static void* invalid_page;
//------------------------------------------------------------------------
// _Micro_queue
//------------------------------------------------------------------------
void _Micro_queue::_Push( const void* item, _Ticket k, _Concurrent_queue_base& base )
{
static _Concurrent_queue_base::_Page dummy = {static_cast<_Concurrent_queue_base::_Page*>((void*)1), 0};
k &= -_Concurrent_queue_rep::_N_queue;
_Concurrent_queue_base::_Page* p = NULL;
size_t index = (k/_Concurrent_queue_rep::_N_queue & base._Items_per_page-1);
if( !index )
{
try
{
p = base._Allocate_page();
} catch (...)
{
// mark it so that no more pushes are allowed.
invalid_page = &dummy;
_SpinLock lock(_Page_mutex_flag);
_Tail_counter = k+_Concurrent_queue_rep::_N_queue+1;
if( _Concurrent_queue_base::_Page* q = _Tail_page )
q->_Next = static_cast<_Concurrent_queue_base::_Page*>(invalid_page);
else
_Head_page = static_cast<_Concurrent_queue_base::_Page*>(invalid_page);
_Tail_page = static_cast<_Concurrent_queue_base::_Page*>(invalid_page);
throw;
}
p->_Mask = 0;
p->_Next = NULL;
}
{
_Push_finalizer finalizer( *this, k+_Concurrent_queue_rep::_N_queue );
if( _Tail_counter!=k ) {
_SpinWaitBackoffNone spinWait;
do
{
spinWait._SpinOnce();
// no memory. throws an exception
if( _Tail_counter&0x1 )
throw std::bad_alloc();
} while( _Tail_counter!=k ) ;
}
if( p )
{
_SpinLock lock(_Page_mutex_flag);
if( _Concurrent_queue_base::_Page* q = _Tail_page )
q->_Next = p;
else
_Head_page = p;
_Tail_page = p;
}
else
{
p = _Tail_page;
}
base._Copy_item( *p, index, item );
// If no exception was thrown, mark item as present.
p->_Mask |= size_t(1)<<index;
}
}
bool _Micro_queue::_Pop( void* dst, _Ticket k, _Concurrent_queue_base& base )
{
k &= -_Concurrent_queue_rep::_N_queue;
SpinwaitUntilEq( _Head_counter, k );
SpinwaitWhileEq( _Tail_counter, k );
_Concurrent_queue_base::_Page& p = *_Head_page;
_ASSERTE( &p );
size_t index = (k/_Concurrent_queue_rep::_N_queue & base._Items_per_page-1);
bool success = false;
{
_Pop_finalizer finalizer( *this, base, k+_Concurrent_queue_rep::_N_queue, index==base._Items_per_page-1 ? &p : NULL );
if( p._Mask & size_t(1)<<index )
{
success = true;
base._Assign_and_destroy_item( dst, p, index );
}
}
return success;
}
#pragma warning( pop )
//------------------------------------------------------------------------
// _Concurrent_queue_base
//------------------------------------------------------------------------
_CRTIMP2 _Concurrent_queue_base_v4::_Concurrent_queue_base_v4( size_t _Item_size )
{
_Items_per_page = _Item_size<=8 ? 32 :
_Item_size<=16 ? 16 :
_Item_size<=32 ? 8 :
_Item_size<=64 ? 4 :
_Item_size<=128 ? 2 :
1;
_My_rep = cache_aligned_allocator<_Concurrent_queue_rep>().allocate(1);
_ASSERTE( (size_t)_My_rep % NFS_GetLineSize()==0 ); // alignment error
_ASSERTE( (size_t)&_My_rep->_Head_counter % NFS_GetLineSize()==0 ); // alignment error
_ASSERTE( (size_t)&_My_rep->_Tail_counter % NFS_GetLineSize()==0 ); // alignment error
_ASSERTE( (size_t)&_My_rep->_Array % NFS_GetLineSize()==0 ); // alignment error
memset(_My_rep,0,sizeof(_Concurrent_queue_rep));
this->_Item_size = _Item_size;
}
_CRTIMP2 _Concurrent_queue_base_v4::~_Concurrent_queue_base_v4()
{
size_t nq = _My_rep->_N_queue;
for( size_t i=0; i<nq; i++ )
_ASSERTE( _My_rep->_Array[i]._Tail_page==NULL ); // pages were not freed properly
cache_aligned_allocator<_Concurrent_queue_rep>().deallocate(_My_rep,1);
}
_CRTIMP2 void _Concurrent_queue_base_v4::_Internal_push( const void* src )
{
_Concurrent_queue_rep& r = *_My_rep;
_Ticket tail = r._Tail_counter++;
r._Choose( tail )._Push( src, tail, *this );
}
_CRTIMP2 bool _Concurrent_queue_base_v4::_Internal_pop_if_present( void* dst )
{
_Concurrent_queue_rep& r = *_My_rep;
_Ticket head;
do
{
head = r._Head_counter;
for(;;)
{
if( head == r._Tail_counter )
{
// Queue is empty
return false;
}
// Queue had item with ticket k when we looked. Attempt to get that item.
_Ticket oldHead=head;
head = r._Head_counter._CompareAndSwap( oldHead+1, oldHead );
if( head==oldHead )
break;
// Another thread snatched the item, retry.
}
} while( !r._Choose( head )._Pop( dst, head, *this ) );
return true;
}
_CRTIMP2 size_t _Concurrent_queue_base_v4::_Internal_size() const
{
return static_cast<size_t>(_My_rep->_Tail_counter-_My_rep->_Head_counter);
}
_CRTIMP2 bool _Concurrent_queue_base_v4::_Internal_empty() const
{
_Ticket t0 = _My_rep->_Tail_counter;
_Ticket h = _My_rep->_Head_counter;
_Ticket t1 = _My_rep->_Tail_counter; // Load tail again to test consistency
if (t0 == t1) {
// We got a consistent snapshot, so it was empty when we looked and saw the tail and head were equal.
return t0 == h;
}
// t0 != t1, meaning some other thread must have pushed an item -- it was therefore not empty when we looked
return false;
// ... Of course by the time we get here, the result is obsolete.
}
_CRTIMP2 void _Concurrent_queue_base_v4::_Internal_finish_clear()
{
size_t nq = _My_rep->_N_queue;
for( size_t i=0; i<nq; i++ )
{
_Page* tp = _My_rep->_Array[i]._Tail_page;
_ASSERTE( _My_rep->_Array[i]._Head_page==tp ); //at most one page should remain
if( tp!=NULL)
{
if( tp!=invalid_page )
_Deallocate_page( tp );
_My_rep->_Array[i]._Tail_page = NULL;
}
}
}
_CRTIMP2 void _Concurrent_queue_base_v4::_Internal_throw_exception() const
{
throw bad_alloc();
}
//------------------------------------------------------------------------
// _Concurrent_queue_iterator_rep
//------------------------------------------------------------------------
class _Concurrent_queue_iterator_rep
{
public:
_Ticket _Head_counter;
const _Concurrent_queue_base& my_queue;
_Concurrent_queue_base::_Page* array[_Concurrent_queue_rep::_N_queue];
_Concurrent_queue_iterator_rep( const _Concurrent_queue_base& queue )
: _Head_counter(queue._My_rep->_Head_counter),
my_queue(queue)
{
const _Concurrent_queue_rep& rep = *queue._My_rep;
for( size_t k=0; k<_Concurrent_queue_rep::_N_queue; ++k )
array[k] = rep._Array[k]._Head_page;
}
// Get pointer to kth element
void* choose( size_t k )
{
if( k==my_queue._My_rep->_Tail_counter )
{
return NULL;
}
else
{
_Concurrent_queue_base::_Page* p = array[_Concurrent_queue_rep::_Index(k)];
_ASSERTE(p);
size_t i = k/_Concurrent_queue_rep::_N_queue & my_queue._Items_per_page-1;
return static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue._Item_size*i;
}
}
};
//------------------------------------------------------------------------
// concurrent_queue_iterator_base
//------------------------------------------------------------------------
_CRTIMP2 _Concurrent_queue_iterator_base_v4::_Concurrent_queue_iterator_base_v4( const _Concurrent_queue_base& queue )
{
_My_rep = cache_aligned_allocator<_Concurrent_queue_iterator_rep>().allocate(1);
new( _My_rep ) _Concurrent_queue_iterator_rep(queue);
_My_item = _My_rep->choose(_My_rep->_Head_counter);
}
_CRTIMP2 void _Concurrent_queue_iterator_base_v4::_Assign( const concurrent_queue_iterator_base& other )
{
if( _My_rep!=other._My_rep )
{
if( _My_rep )
{
cache_aligned_allocator<_Concurrent_queue_iterator_rep>().deallocate(_My_rep, 1);
_My_rep = NULL;
}
if( other._My_rep )
{
_My_rep = cache_aligned_allocator<_Concurrent_queue_iterator_rep>().allocate(1);
new( _My_rep ) _Concurrent_queue_iterator_rep( *other._My_rep );
}
}
_My_item = other._My_item;
}
_CRTIMP2 void _Concurrent_queue_iterator_base_v4::_Advance()
{
_ASSERTE( _My_item ); // attempt to increment iterator past end of queue
size_t k = _My_rep->_Head_counter;
const _Concurrent_queue_base& queue = _My_rep->my_queue;
_ASSERTE( _My_item==_My_rep->choose(k) );
size_t i = k/_Concurrent_queue_rep::_N_queue & queue._Items_per_page-1;
if( i==queue._Items_per_page-1 )
{
_Concurrent_queue_base::_Page*& root = _My_rep->array[_Concurrent_queue_rep::_Index(k)];
root = root->_Next;
}
_My_rep->_Head_counter = k+1;
_My_item = _My_rep->choose(k+1);
}
_CRTIMP2 _Concurrent_queue_iterator_base_v4::~_Concurrent_queue_iterator_base_v4()
{
cache_aligned_allocator<_Concurrent_queue_iterator_rep>().deallocate(_My_rep, 1);
_My_rep = NULL;
}
} // namespace details
} // namespace Concurrency
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -