📄 messagequeue.h
字号:
#ifndef UTILS_BASE_MESSAGEQUEUE_H_#define UTILS_BASE_MESSAGEQUEUE_H_#include <vector>#include <queue>#include <algorithm>#include "basictypes.h"#include "criticalsection.h"#include "socketserver.h"#include "time.h"namespace utils_base {struct Message;class MessageQueue;class MessageHandler;// MessageQueueManager does cleanup of of message queuesclass MessageQueueManager {public: static MessageQueueManager* Instance(); void Add(MessageQueue *message_queue); void Remove(MessageQueue *message_queue); void Clear(MessageHandler *handler);private: MessageQueueManager(); ~MessageQueueManager(); static MessageQueueManager* instance_; // This list contains 'active' MessageQueues. std::vector<MessageQueue *> message_queues_; CriticalSection crit_;};// Messages get dispatched to a MessageHandlerclass MessageHandler {public: virtual ~MessageHandler() { MessageQueueManager::Instance()->Clear(this); } virtual void OnMessage(Message *pmsg) = 0;};// Derive from this for specialized data// App manages lifetime, except when messages are purgedclass MessageData {public: MessageData() {} virtual ~MessageData() {}};template <class T>class TypedMessageData : public MessageData {public: TypedMessageData(const T& data) : data_(data) { } const T& data() const { return data_; } T& data() { return data_; }private: T data_;};template<class T>inline MessageData* WrapMessageData(const T& data) { return new TypedMessageData<T>(data);}template<class T>inline const T& UseMessageData(MessageData* data) { return static_cast< TypedMessageData<T>* >(data)->data();}template<class T>class DisposeData : public MessageData {public: DisposeData(T* data) : data_(data) { } virtual ~DisposeData() { delete data_; }private: T* data_;};const uint32 MQID_ANY = static_cast<uint32>(-1);const uint32 MQID_DISPOSE = static_cast<uint32>(-2);// No destructorstruct Message { Message() { memset(this, 0, sizeof(*this)); } MessageHandler *phandler; uint32 message_id; MessageData *pdata; uint32 ts_sensitive;};// DelayedMessage goes into a priority queue, sorted by trigger timeclass DelayedMessage {public: DelayedMessage(int cmsDelay, Message *pmsg) { cmsDelay_ = cmsDelay; msTrigger_ = GetMillisecondCount() + cmsDelay; msg_ = *pmsg; } bool operator< (const DelayedMessage& dmsg) const { return dmsg.msTrigger_ < msTrigger_; } int cmsDelay_; // for debugging uint32 msTrigger_; Message msg_;};class MessageQueue {public: MessageQueue(SocketServer* ss = 0); virtual ~MessageQueue(); SocketServer* socketserver() { return ss_; } void set_socketserver(SocketServer* ss); // Note: The behavior of MessageQueue has changed. When a MQ is stopped, // futher Posts and Sends will fail. However, any pending Sends and *ready* // Posts (as opposed to unexpired delayed Posts) will be delivered before // Get (or Peek) returns false. By guaranteeing delivery of those messages, // we eliminate the race condition when an MessageHandler and MessageQueue // may be destroyed independently of each other. virtual void Stop(); virtual bool IsStopping(); virtual void Restart(); // Get() will process I/O until: // 1) A message is available (returns true) // 2) cmsWait seconds have elapsed (returns false) // 3) Stop() is called (returns false) virtual bool Get(Message *pmsg, int cmsWait = kForever); virtual bool Peek(Message *pmsg, int cmsWait = 0); virtual void Post(MessageHandler *phandler, uint32 id = 0, MessageData *pdata = NULL, bool time_sensitive = false); virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, uint32 id = 0, MessageData *pdata = NULL); virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY); virtual void Dispatch(Message *pmsg); virtual void ReceiveSends(); virtual int GetDelay(); // Internally posts a message which causes the doomed object to be deleted template<class T> void Dispose(T* doomed) { if (doomed) { Post(NULL, MQID_DISPOSE, new utils_base::DisposeData<T>(doomed)); } }protected: void EnsureActive(); SocketServer* ss_; bool new_ss; bool fStop_; bool fPeekKeep_; Message msgPeek_; // A message queue is active if it has ever had a message posted to it. // This also corresponds to being in MessageQueueManager's global list. bool active_; std::queue<Message> msgq_; std::priority_queue<DelayedMessage> dmsgq_; CriticalSection crit_;};} // namespace utils_base#endif // UTILS_BASE_MESSAGEQUEUE_H_
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -