📄 agents.h
字号:
/***
* ==++==
*
* Copyright (c) Microsoft Corporation. All rights reserved.
*
* ==--==
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* agents.h
*
* Main public header file for ConcRT's asynchronous agents layer. This is the only header file a
* C++ program should have to include in order to avail itself of asynchronous agents.
*
* The core runtime, PPL, and resource manager are in separate headers.
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#pragma once
#include <crtdefs.h>
#include <concrt.h>
#include <stdexcept>
#include <functional>
#include <tuple>
#include <type_traits>
#include <vector>
#pragma pack(push,_CRT_PACKING)
// Forward declarations
/// <summary>
/// The <c>Concurrency</c> namespace provides classes and functions that give you access to the Concurrency Runtime,
/// a concurrent programming framework for C++. For more information, see <see cref="Concurrency Runtime"/>.
/// </summary>
/**/
namespace Concurrency
{
/// <summary>
/// Each message instance has an identity that follows it as it is
/// cloned and passed between messaging components. This cannot be the
/// address of the message object.
/// </summary>
/**/
typedef __int32 runtime_object_identity;
/// <summary>
/// A lock holder that acquires a non-reentrant lock on instantiation and releases
/// it on destruction.
/// </summary>
/**/
typedef ::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock;
/// <summary>
/// A lock holder that acquires a reentrant lock on instantiation and releases
/// it on destruction
/// </summary>
/**/
typedef ::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock;
//***************************************************************************
// Internal namespace:
//
// Concurrency::details contains definitions to support routines in the public namespaces and macros.
// Clients should not directly interact with this namespace.
//***************************************************************************
namespace details
{
//**************************************************************************
// Core Messaging Support:
//**************************************************************************
//
// A base class to derive from that keeps unique ids on its derived classes
//
class _Runtime_object : public _AllocBase
{
public:
// Creates a new runtime object.
_CRTIMP2 _Runtime_object();
// Creates a runtime object from an identity.
_CRTIMP2 _Runtime_object(::Concurrency::runtime_object_identity _Id);
// Gets the runtime object identity.
virtual ::Concurrency::runtime_object_identity _GetId() const
{
return _M_id;
}
protected:
// The runtime object identity.
::Concurrency::runtime_object_identity _M_id;
};
// A queue used to hold the messages for the messaging blocks
template<class _Message>
class _Queue : public _AllocBase
{
protected:
// A pointer to the head of the queue.
_Message * _M_pHead;
// A pointer to a pointer to the tail of the queue.
_Message ** _M_ppTail;
// The number of elements presently stored in the queue.
size_t _M_count;
public:
typedef typename _Message type;
// Create a Queue
_Queue() : _M_pHead(NULL), _M_ppTail(&_M_pHead), _M_count(0)
{
}
// Destroy the queue
~_Queue()
{
// Delete any messages that may be remaining in the queue
_Message * _Msg = _Dequeue();
while (_Msg != NULL)
{
delete _Msg;
_Msg = _Dequeue();
}
}
// Returns the count of items in the queue
size_t _Count() const
{
return _M_count;
}
// Add an item to the tail of the queue
//
// Returns a Boolean indicating whether the operation succeeded.
bool _Enqueue(_Message *_Element)
{
_ASSERTE(_Element->_M_pNext == NULL);
_ASSERTE(*_M_ppTail == NULL);
*_M_ppTail = _Element;
_Element->_M_pNext = NULL;
_M_ppTail = &(_Element->_M_pNext);
_M_count++;
return true;
}
// Remove the specified element from the queue
//
// Returns a Boolean indicating whether the operation succeeded, i.e. that the message was found in the queue.
bool _Remove(_Message * _OldElement)
{
bool _Result = false;
_ASSERTE(_OldElement != NULL);
if (_M_pHead == _OldElement)
{
_M_pHead = _OldElement->_M_pNext;
if (_M_pHead == NULL)
{
_M_ppTail = &_M_pHead;
}
_OldElement->_M_pNext = NULL;
_M_count--;
_Result = true;
}
else
{
_Message * _Next = NULL;
for (_Message * _Node = _M_pHead; _Node != NULL; _Node = _Next)
{
_Next = _Node->_M_pNext;
if (_Node->_M_pNext == _OldElement)
{
_Node->_M_pNext = _OldElement->_M_pNext;
// if this is the last element of the _Queue
if (_Node->_M_pNext == NULL && _M_count == 1)
{
_M_ppTail = &_M_pHead;
}
_OldElement->_M_pNext = NULL;
_M_count--;
_Result = true;
break;
}
}
}
return _Result;
}
// Dequeue an item from the head of queue
//
// Returns a pointer to the message found at the head of the queue.
_Message * _Dequeue()
{
if (_M_pHead == NULL)
{
return NULL;
}
_Message * _Result = _M_pHead;
_M_pHead = _Result->_M_pNext;
if (_M_pHead == NULL)
{
_M_ppTail = &_M_pHead;
}
_Result->_M_pNext = NULL;
_M_count--;
return _Result;
}
// Return the item at the head of the queue, without dequeuing
//
// Returns a pointer to the message found at the head of the queue.
_Message * _Peek()
{
return _M_pHead;
}
// Return true if the id matches the message at the head of the queue
bool _Is_head(runtime_object_identity _MsgId)
{
// Peek at the next message in the message buffer. Use it to
// check if the ids match
_Message * _Msg = _M_pHead;
if (_Msg == NULL || _Msg->msg_id() != _MsgId)
{
return false;
}
return true;
}
};
//
// The async_send_queue class puts a lock around the Enqueue/Dequeue operations on
// a Queue. This class is here mainly to support the message propagation queue
// It holds incoming messages and will be written to on async_send(). Messages in
// this queue will be moved to the internal storage of the block within an LWT.
//
template<class _Message>
class _Async_send_queue
{
public:
// Create an async send queue
_Async_send_queue()
{
}
// Return the count of items in the async send queue
size_t _Count() const
{
return _M_queue._Count();
}
// Add an item to the back of the async send queue
//
// Returns a Boolean indicating whether the operation succeeded.
bool _Enqueue(_Message * _Element)
{
_NR_lock _Lock(_M_lock);
return (_M_queue._Enqueue(_Element));
}
// Remove an item from the async send queue
//
// Returns a Boolean indicating whether the operation succeeded, i.e. that the message was found in the async send queue.
bool _Remove(_Message * _OldElement)
{
_NR_lock _Lock(_M_lock);
return (_M_queue._Remove(_OldElement));
}
// Remove an item from the head of the async send queue
//
// Returns a pointer to the message found at the head of the async send queue.
_Message * _Dequeue()
{
_NR_lock _Lock(_M_lock);
return (_M_queue._Dequeue());
}
// Return the item at the head of the async send queue, without dequeuing
//
// Returns a pointer to the message found at the head of the async send queue.
_Message * _Peek()
{
return _M_queue._Peek();
}
private:
// A lock to guard the queue
::Concurrency::details::_NonReentrantPPLLock _M_lock;
// Underlying queue
_Queue<_Message> _M_queue;
};
//
// _Dynamic_array implements a container very similar to std::vector.
// However, it exposes a reduced subset of functionality that is
// geared towards use in network_link_registry. The array acess is not
// thread-safe.
//
template<class _Type>
class _Dynamic_array
{
public:
typedef _Dynamic_array<_Type> _Myt;
typedef _Type& reference;
typedef _Type const& const_reference;
//
// Construct a dynamic array
//
_Dynamic_array()
{
_Init();
}
//
// Release any resources used by dynamic array
//
~_Dynamic_array()
{
_Clear();
}
//
// Assignment operator. Copy the contents of _Right
//
_Myt& operator=(const _Myt& _Right)
{
if (this != &_Right)
{
// Remove all the elements
_Clear();
// Allocate space for the new elements
size_t _Size = _Right._Size();
_Grow(_Size);
// Copy over the new elements
for (size_t _I=0; _I < _Size; _I++)
{
_Push_back(_Right[_I]);
}
}
return *this;
}
//
// Clear all the elements in the array
//
void _Clear()
{
if (_M_array != NULL)
{
delete [] _M_array;
_Init();
}
}
//
// Add an element to the end of the array
//
void _Push_back(_Type const& _Element)
{
if (_M_index >= _M_size)
{
// Not enough space. Grow the array
size_t _NewSize = (_M_index + 1) * _S_growthFactor;
_Grow(_NewSize);
}
_ASSERTE(_M_index < _M_size);
_M_array[_M_index] = _Element;
_M_index++;
}
//
// Index operation. Retrieve an element at the specified index. No bounds check is done.
//
reference operator[](size_t _Pos)
{
_ASSERTE(_Pos < _M_size);
return _M_array[_Pos];
}
//
// Index operation. Retrieve an element at the specified index. No bounds check is done.
//
const_reference operator[](size_t _Pos) const
{
_ASSERTE(_Pos < _M_size);
return _M_array[_Pos];
}
//
// Returns the count of elements in the array
//
size_t _Size() const
{
return _M_index;
}
//
// Swap the contents of this array with _Right
//
void _Swap(_Myt& _Right)
{
if (this != &_Right)
{
// Swap the details.
_Type * _Array = _M_array;
size_t _Index = _M_index;
size_t _Size = _M_size;
_M_array = _Right._M_array;
_M_index = _Right._M_index;
_M_size = _Right._M_size;
_Right._M_array = _Array;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -