📄 linklistener.hpp
字号:
// file : ACE_TMCast/LinkListener.hpp// author : Boris Kolpackov <boris@dre.vanderbilt.edu>// cvs-id : LinkListener.hpp,v 1.10 2006/03/01 23:48:42 shuston Exp// OS primitives#include <ace/OS_NS_string.h>#include <ace/OS_NS_stdlib.h>#include <ace/Synch.h>#include <ace/SOCK_Dgram_Mcast.h>#include <ace/Refcounted_Auto_Ptr.h>#include "Messaging.hpp"#include "Protocol.hpp"namespace ACE_TMCast{ // // // class LinkFailure : public virtual Message {}; // // // class LinkData : public virtual Message { public: LinkData (Protocol::MessageHeader const* header, void* payload, size_t size) : size_ (size) { ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); ACE_OS::memcpy (payload_, payload, size_); } Protocol::MessageHeader const& header () const { return header_; } void const* payload () const { return payload_; } size_t size () const { return size_; } private: Protocol::MessageHeader header_; char payload_[Protocol::MAX_MESSAGE_SIZE]; size_t size_; }; typedef ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex> LinkDataPtr; // // // class LinkListener { private: class Terminate : public virtual Message {}; public: LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) : sock_(sock), out_ (out) { ACE_thread_t unused; if (ACE_OS::thr_create (&thread_thunk, this, THR_JOINABLE, &unused, &thread_) != 0) ACE_OS::abort (); } ~LinkListener () { { MessageQueueAutoLock lock (control_); control_.push (MessagePtr (new Terminate)); } if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); // cerr << "Link listener is down." << endl; } static ACE_THR_FUNC_RETURN thread_thunk (void* arg) { LinkListener* obj = reinterpret_cast<LinkListener*> (arg); obj->execute (); return 0; } void execute () { char msg[Protocol::MAX_MESSAGE_SIZE]; ssize_t header_size = sizeof (Protocol::MessageHeader); // OS::Time timeout (1000000); // one millisecond ACE_Time_Value timeout (0, 1000); // one millisecond try { while (true) { // Check control message queue { MessageQueueAutoLock lock (control_); if (!control_.empty ()) break; } ACE_INET_Addr junk; ssize_t n = sock_.recv (msg, Protocol::MAX_MESSAGE_SIZE, junk, 0, &timeout); if (n != -1) { if (n < header_size) throw false; Protocol::MessageHeader* header = reinterpret_cast<Protocol::MessageHeader*> (msg); MessageQueueAutoLock lock (out_); out_.push (MessagePtr (new LinkData (header, msg + header_size, n - header_size))); } } } catch (...) { MessageQueueAutoLock lock (out_); out_.push (MessagePtr (new LinkFailure)); } } private: typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; ACE_hthread_t thread_; ACE_SOCK_Dgram_Mcast& sock_; MessageQueue& out_; MessageQueue control_; };}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -