message_queue.hpp

来自「Boost provides free peer-reviewed portab」· HPP 代码 · 共 632 行 · 第 1/2 页

HPP
632
字号
         msg_hdr  = reinterpret_cast<detail::msg_hdr_t*>                        (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);      }   }   public:   //Pointer to the index   offset_ptr<msg_hdr_ptr_t>  mp_index;   //Maximum number of messages of the queue   const std::size_t          m_max_num_msg;   //Maximum size of messages of the queue   const std::size_t          m_max_msg_size;   //Current number of messages   std::size_t                m_cur_num_msg;   //Mutex to protect data structures   interprocess_mutex         m_mutex;   //Condition block receivers when there are no messages   interprocess_condition     m_cond_recv;   //Condition block senders when the queue is full   interprocess_condition     m_cond_send;};//!This is the atomic functor to be executed when creating or opening //!shared memory. Never throwsclass initialization_func_t{   public:   initialization_func_t(std::size_t maxmsg = 0,                          std::size_t maxmsgsize = 0)      : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}   bool operator()(void *address, std::size_t, bool created)   {      char      *mptr;      if(created){         mptr     = reinterpret_cast<char*>(address);         //Construct the message queue header at the beginning         BOOST_TRY{            new (mptr) mq_hdr_t(m_maxmsg, m_maxmsgsize);         }         BOOST_CATCH(...){            return false;            }         BOOST_CATCH_END      }      return true;   }   const std::size_t m_maxmsg;   const std::size_t m_maxmsgsize;};}  //namespace detail {inline message_queue::~message_queue(){}inline std::size_t message_queue::get_mem_size   (std::size_t max_msg_size, std::size_t max_num_msg){  return detail::mq_hdr_t::get_mem_size(max_msg_size, max_num_msg);   }inline message_queue::message_queue(create_only_t create_only,                                    const char *name,                                     std::size_t max_num_msg,                                     std::size_t max_msg_size)      //Create shared memory and execute functor atomically   :  m_shmem(create_only,               name,               get_mem_size(max_msg_size, max_num_msg),              read_write,              static_cast<void*>(0),              //Prepare initialization functor              detail::initialization_func_t (max_num_msg, max_msg_size)){}inline message_queue::message_queue(open_or_create_t open_or_create,                                    const char *name,                                     std::size_t max_num_msg,                                     std::size_t max_msg_size)      //Create shared memory and execute functor atomically   :  m_shmem(open_or_create,               name,               get_mem_size(max_msg_size, max_num_msg),              read_write,              static_cast<void*>(0),              //Prepare initialization functor              detail::initialization_func_t (max_num_msg, max_msg_size)){}inline message_queue::message_queue(open_only_t open_only,                                    const char *name)   //Create shared memory and execute functor atomically   :  m_shmem(open_only,               name,              read_write,              static_cast<void*>(0),              //Prepare initialization functor              detail::initialization_func_t ()){}inline void message_queue::send   (const void *buffer, std::size_t buffer_size, unsigned int priority){  this->do_send(blocking, buffer, buffer_size, priority, ptime()); }inline bool message_queue::try_send   (const void *buffer, std::size_t buffer_size, unsigned int priority){  return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }inline bool message_queue::timed_send   (const void *buffer, std::size_t buffer_size   ,unsigned int priority, const boost::posix_time::ptime &abs_time){   if(abs_time == boost::posix_time::pos_infin){      this->send(buffer, buffer_size, priority);      return true;   }   return this->do_send(timed, buffer, buffer_size, priority, abs_time);}inline bool message_queue::do_send(block_t block,                                const void *buffer,      std::size_t buffer_size,                                 unsigned int priority,   const boost::posix_time::ptime &abs_time){   detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address());   //Check if buffer is smaller than maximum allowed   if (buffer_size > p_hdr->m_max_msg_size) {      throw interprocess_exception(size_error);   }   //---------------------------------------------   scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);   //---------------------------------------------   {      //If the queue is full execute blocking logic      if (p_hdr->is_full()) {         switch(block){            case non_blocking :               return false;            break;            case blocking :               do{                  p_hdr->m_cond_send.wait(lock);               }               while (p_hdr->is_full());            break;            case timed :               do{                  if(!p_hdr->m_cond_send.timed_wait(lock, abs_time))                     return !p_hdr->is_full();               }               while (p_hdr->is_full());            break;            default:               throw interprocess_exception();         }      }            //Get the first free message from free message queue      detail::msg_hdr_t *free_msg = p_hdr->free_msg();      if (free_msg == 0) {         throw interprocess_exception();      }      //Copy control data to the free message      free_msg->priority = priority;      free_msg->len      = buffer_size;      //Copy user buffer to the message      std::memcpy(free_msg->data(), buffer, buffer_size);//      bool was_empty = p_hdr->is_empty();      //Insert the first free message in the priority queue      p_hdr->queue_free_msg();            //If this message changes the queue empty state, notify it to receivers//      if (was_empty){         p_hdr->m_cond_recv.notify_one();//      }   }  // Lock end   return true;}inline void message_queue::receive(void *buffer,              std::size_t buffer_size,                                    std::size_t &recvd_size,   unsigned int &priority){  this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }inline bool   message_queue::try_receive(void *buffer,              std::size_t buffer_size,                               std::size_t &recvd_size,   unsigned int &priority){  return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }inline bool   message_queue::timed_receive(void *buffer,              std::size_t buffer_size,                                 std::size_t &recvd_size,   unsigned int &priority,                                const boost::posix_time::ptime &abs_time){   if(abs_time == boost::posix_time::pos_infin){      this->receive(buffer, buffer_size, recvd_size, priority);      return true;   }   return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);}inline bool   message_queue::do_receive(block_t block,                          void *buffer,              std::size_t buffer_size,                           std::size_t &recvd_size,   unsigned int &priority,                          const boost::posix_time::ptime &abs_time){   detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address());   //Check if buffer is big enough for any message   if (buffer_size < p_hdr->m_max_msg_size) {      throw interprocess_exception(size_error);   }   //---------------------------------------------   scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);   //---------------------------------------------   {      //If there are no messages execute blocking logic      if (p_hdr->is_empty()) {         switch(block){            case non_blocking :               return false;            break;            case blocking :               do{                  p_hdr->m_cond_recv.wait(lock);               }               while (p_hdr->is_empty());            break;            case timed :               do{                  if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time))                     return !p_hdr->is_empty();               }               while (p_hdr->is_empty());            break;            //Paranoia check            default:               throw interprocess_exception();         }      }      //Thre is at least message ready to pick, get the top one      detail::msg_hdr_t *top_msg = p_hdr->top_msg();      //Paranoia check      if (top_msg == 0) {         throw interprocess_exception();      }      //Get data from the message      recvd_size     = top_msg->len;      priority       = top_msg->priority;      //Copy data to receiver's bufers      std::memcpy(buffer, top_msg->data(), recvd_size);//      bool was_full = p_hdr->is_full();      //Free top message and put it in the free message list      p_hdr->free_top_msg();      //If this reception changes the queue full state, notify senders//      if (was_full){         p_hdr->m_cond_send.notify_one();//      }   }  //Lock end   return true;}inline std::size_t message_queue::get_max_msg() const{     detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address());   return p_hdr ? p_hdr->m_max_num_msg : 0;  }inline std::size_t message_queue::get_max_msg_size() const{     detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address());   return p_hdr ? p_hdr->m_max_msg_size : 0;  }inline std::size_t message_queue::get_num_msg(){     detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address());   if(p_hdr){      //---------------------------------------------      scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);      //---------------------------------------------      return p_hdr->m_cur_num_msg;   }   return 0;  }inline bool message_queue::remove(const char *name){  return shared_memory_object::remove(name);  }/// @endcond}} //namespace boost{  namespace interprocess{#include <boost/interprocess/detail/config_end.hpp>#endif   //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?