📄 agents.h
字号:
// Internal lock used for synchronization
_LockType _M_lock;
// Count to indicate that an iterator is active
volatile long _M_iteratorCount;
// A vector of all pending link remove operations
::Concurrency::details::_Dynamic_array<_EType> _M_pendingRemove;
// Underlying link registry
_LinkRegistry _M_links;
// Target block holding this source link manager
ITarget<typename _Block::source_type> * volatile _M_pLinkedTarget;
};
/// <summary>
/// The valid responses for an offer of a <c>message</c> object to a block.
/// </summary>
/**/
enum message_status
{
/// <summary>
/// The target accepted the message.
/// </summary>
/**/
accepted,
/// <summary>
/// The target did not accept the message.
/// </summary>
/**/
declined,
/// <summary>
/// The target postponed the message.
/// </summary>
/**/
postponed,
/// <summary>
/// The target tried to accept the message, but it was no longer available.
/// </summary>
/**/
missed
};
/// <summary>
/// The basic message envelope containing the data payload being passed between
/// messaging blocks.
/// </summary>
/// <typeparam name="_Type">
/// The data type of the payload within the message.
/// </typeparam>
/// <remarks>
/// For more information, see <see cref="Asynchronous Message Blocks"/>.
/// </remarks>
/**/
template<class _Type>
class message : public ::Concurrency::details::_Runtime_object
{
friend class ::Concurrency::details::_Queue<message<_Type>>;
friend class ::Concurrency::details::_Async_send_queue<message<_Type>>;
public:
/// <summary>
/// Constructs a <c>message</c> object.
/// </summary>
/// <param name="_P">
/// The payload of this message.
/// </param>
/// <remarks>
/// This method throws an <see cref="invalid_argument Class">invalid_argument</see> exception
/// if the parameter <paramref name="_Msg"/> is <c>NULL</c>.
/// </remarks>
/**/
message(_Type const &_P) : payload(_P), _M_pNext(NULL), _M_refCount(0) { }
/// <summary>
/// Constructs a <c>message</c> object.
/// </summary>
/// <param name="_Msg">
/// A reference or pointer to a <c>message</c> object.
/// </param>
/// <remarks>
/// This method throws an <see cref="invalid_argument Class">invalid_argument</see> exception
/// if the parameter <paramref name="_Msg"/> is <c>NULL</c>.
/// </remarks>
/**/
message(message const & _Msg) : payload(_Msg.payload), _M_pNext(NULL), _M_refCount(0) { }
/// <summary>
/// Constructs a <c>message</c> object.
/// </summary>
/// <param name="_Msg">
/// A reference or pointer to a <c>message</c> object.
/// </param>
/// <remarks>
/// This method throws an <see cref="invalid_argument Class">invalid_argument</see> exception
/// if the parameter <paramref name="_Msg"/> is <c>NULL</c>.
/// </remarks>
/**/
message(message const * _Msg) : payload((_Msg == NULL) ? NULL : _Msg->payload), _M_pNext(NULL), _M_refCount(0)
{
if (_Msg == NULL)
{
throw std::invalid_argument("_Msg");
}
}
/// <summary>
/// Destroys the <c>message</c> object.
/// </summary>
/**/
virtual ~message() { }
/// <summary>
/// Returns the ID of the <c>message</c> object.
/// </summary>
/// <returns>
/// The <c>runtime_object_identity</c> of the <c>message</c> object.
/// </returns>
/**/
runtime_object_identity msg_id() const
{
return _M_id;
}
/// <summary>
/// The payload of the <c>message</c> object.
/// </summary>
/**/
_Type const payload;
/// <summary>
/// Adds to the reference count for the <c>message</c> object. Used for message blocks that
/// need reference counting to determine message lifetimes.
/// </summary>
/// <returns>
/// The new value of the reference count.
/// </returns>
/**/
long add_ref()
{
return _InterlockedIncrement(&_M_refCount);
}
/// <summary>
/// Subtracts from the reference count for the <c>message</c> object. Used for message blocks that
/// need reference counting to determine message lifetimes.
/// </summary>
/// <returns>
/// The new value of the reference count.
/// </returns>
/**/
long remove_ref()
{
return _InterlockedDecrement(&_M_refCount);
}
/// <summary>
/// A type alias for <typeparamref name="_Type"/>.
/// </summary>
/**/
typedef typename _Type type;
private:
// The intrusive next pointer used by blocks that need
// to chain messages it's holding together
message * _M_pNext;
// Avoid warnings about not generating assignment operators.
message<_Type> const &operator =(message<_Type> const &);
// A reference count for the message
volatile long _M_refCount;
};
//**************************************************************************
// Message processor:
//**************************************************************************
/// <summary>
/// The <c>message_processor</c> class is the abstract base class for processing of
/// <c>message</c> objects. There is no guarantee on the ordering of the messages.
/// </summary>
/// <typeparam name="_Type">
/// The data type of the payload within messages handled by this <c>message_processor</c> object.
/// </typeparam>
/// <seealso cref="ordered_message_processor Class"/>
/**/
template<class _Type>
class message_processor
{
public:
/// <summary>
/// A type alias for <typeparamref name="_Type"/>.
/// </summary>
/**/
typedef typename _Type type;
/// <summary>
/// When overridden in a derived class, places messages into the block asynchronously.
/// </summary>
/// <param name="_Msg">
/// A <c>message</c> object to send asynchronously.
/// </param>
/// <remarks>
/// Processor implementations should override this method.
/// </remarks>
/**/
virtual void async_send(message<_Type> * _Msg) = 0;
/// <summary>
/// When overridden in a derived class, places messages into the block synchronously.
/// </summary>
/// <param name="_Msg">
/// A <c>message</c> object to send synchronously.
/// </param>
/// <remarks>
/// Processor implementations should override this method.
/// </remarks>
/**/
virtual void sync_send(message<_Type> * _Msg) = 0;
/// <summary>
/// When overridden in a derived class, waits for all asynchronous operations to complete.
/// </summary>
/// <remarks>
/// Processor implementations should override this method.
/// </remarks>
/**/
virtual void wait() = 0;
protected:
/// <summary>
/// When overridden in a derived class, performs the forward processing of
/// messages into the block. Called once every time a new message is added and
/// the queue is found to be empty.
/// </summary>
/// <remarks>
/// Message block implementations should override this method.
/// </remarks>
/**/
virtual void process_incoming_message() = 0;
/// <summary>
/// Wrapper for <c>process_incoming_message</c> suitable for use as a argument to
/// <c>CreateThread</c> and other similar methods.
/// </summary>
/// <param name="_Data">
/// A pointer to a message processor passed as a void pointer.
/// </param>
/**/
static void __cdecl _Process_incoming_message_wrapper(void * _Data)
{
message_processor<_Type> * _PMessageProcessor = (message_processor<_Type> *) _Data;
_PMessageProcessor->process_incoming_message();
}
};
/// <summary>
/// An <c>ordered_message_processor</c> is a <c>message_processor</c> that allows message blocks
/// to process messages in the order they were received.
/// </summary>
/// <typeparam name="_Type">
/// The payload type of messages handled by the processor.
/// </typeparam>
/**/
template<class _Type>
class ordered_message_processor : public message_processor<_Type>
{
public:
/// <summary>
/// The signature of the callback method invoked while processing messages.
/// </summary>
/**/
typedef std::tr1::function<void(message<_Type> *)> _Handler_method;
/// <summary>
/// A type alias for <typeparamref name="_Type"/>.
/// </summary>
/**/
typedef _Type type;
/// <summary>
/// Constructs an <c>ordered_message_processor</c> object.
/// </summary>
/// <remarks>
/// This <c>ordered_message_processor</c> will not schedule asynchronous or synchronous
/// handlers until the <c>initialize</c> function is called.
/// </remarks>
/**/
ordered_message_processor() :
_M_queuedDataCount(0),
_M_stopProcessing(1),
_M_lwtCount(0),
_M_pScheduler(NULL),
_M_pScheduleGroup(NULL),
_M_handler(NULL)
{
}
/// <summary>
/// Destroys the <c>ordered_message_processor</c> object.
/// </summary>
/// <remarks>
/// Waits for all outstanding asynchronous operations before destroying the processor.
/// </remarks>
/**/
virtual ~ordered_message_processor()
{
wait();
}
/// <summary>
/// Initializes the <c>ordered_message_processor</c> object with the appropriate
/// callback function, scheduler and schedule group.
/// </summary>
/// <param name="_PScheduler">
/// A pointer to the scheduler to be used for scheduling light-weight tasks.
/// </param>
/// <param name="_PScheduleGroup">
/// A pointer to the schedule group to be used for scheduling light-weight tasks.
/// </param>
/// <param name="_Handler">
/// The handler functor invoked during callback.
/// </param>
/// <seealso cref="Scheduler Class"/>
/// <seealso cref="ScheduleGroup Class"/>
/**/
void initialize(Scheduler * _PScheduler, ScheduleGroup * _PScheduleGroup, _Handler_method const& _Handler)
{
_M_pScheduler = _PScheduler;
_M_pScheduleGroup = _PScheduleGroup;
_M_handler = _Handler;
_M_stopProcessing = 0;
}
/// <summary>
/// Synchronously queues up messages and starts a processing task, if this has not been done
/// already.
/// </summary>
/// <param name="_Msg">
/// A pointer to a message.
/// </param>
/**/
virtual void sync_send(message<_Type> * _Msg)
{
if (_M_handler == NULL)
{
throw invalid_operation("sync_send called without registering a callback");
}
// Indicate that an LWT is in progress. This will cause the
// destructor to block.
_InterlockedIncrement(&_M_lwtCount);
// Message block destructors sets the _M_stopProcessing flag to stop
// processing any more messages. This is required to guarantee
// that the destructor's wait_for_async_sends will complete
if (_M_stopProcessing != 0)
{
// Destructor is running. Do not process the message
// Delete the msg, if any.
if (_Msg != NULL)
{
delete _Msg;
}
}
else
{
if (_Msg != NULL)
{
_M_queuedMessages._Enqueue(_Msg);
}
_InterlockedIncrement(&_M_queuedDataCount);
_Process_message_helper();
}
// If we get there then no task was scheduled. Decrement LWT count to reflect this fact
_InterlockedDecrement(&_M_lwtCount);
}
/// <summary>
/// Asynchronously queues up messages and starts a processing task, if this has not been done
/// already.
/// </summary>
/// <param name="_Msg">
/// A pointer to a message.
/// </param>
/**/
virtual void async_send(message<_Type> * _Msg)
{
if (_M_handler == NULL)
{
throw invalid_operation("async_send called without registering a callback");
}
// Indicate that an LWT is in progress. This will cause the
// destructor to block.
_InterlockedIncrement(&_M_lwtCount);
// Message block destructors sets the _M_stopProcessing flag to stop
// processing any more messages. This is required to guarantee
// that the destructor's wait_for_async_Sends will complete
if (_M_stopProcessing != 0)
{
// Destructor is running. Do not scheduler an LWT.
// Delete the msg, if any.
if (_Msg != NULL)
{
delete _Msg;
}
}
else
{
//
// If there is a message to send, enqueue it in the processing queue.
// async_send can be sent a NULL message if the block wishes to reprocess
// the messages that are in its queue. For example, an unbounded_buffer
// that has its head node released after reservation.
//
if (_Msg != NULL)
{
_M_queuedMessages._Enqueue(_Msg);
}
if (_InterlockedIncrement(&_M_queuedDataCount) == 1)
{
_ASSERTE(_M_lwtCount > 0);
TaskProc _Proc = &::Concurrency::ordered_message_processor<_Type>::_Process_incoming_message_wrapper;
if (_M_pScheduleGroup != NULL)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -