message_queue.hpp
来自「Boost provides free peer-reviewed portab」· HPP 代码 · 共 632 行 · 第 1/2 页
HPP
632 行
msg_hdr = reinterpret_cast<detail::msg_hdr_t*> (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size); } } public: //Pointer to the index offset_ptr<msg_hdr_ptr_t> mp_index; //Maximum number of messages of the queue const std::size_t m_max_num_msg; //Maximum size of messages of the queue const std::size_t m_max_msg_size; //Current number of messages std::size_t m_cur_num_msg; //Mutex to protect data structures interprocess_mutex m_mutex; //Condition block receivers when there are no messages interprocess_condition m_cond_recv; //Condition block senders when the queue is full interprocess_condition m_cond_send;};//!This is the atomic functor to be executed when creating or opening //!shared memory. Never throwsclass initialization_func_t{ public: initialization_func_t(std::size_t maxmsg = 0, std::size_t maxmsgsize = 0) : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} bool operator()(void *address, std::size_t, bool created) { char *mptr; if(created){ mptr = reinterpret_cast<char*>(address); //Construct the message queue header at the beginning BOOST_TRY{ new (mptr) mq_hdr_t(m_maxmsg, m_maxmsgsize); } BOOST_CATCH(...){ return false; } BOOST_CATCH_END } return true; } const std::size_t m_maxmsg; const std::size_t m_maxmsgsize;};} //namespace detail {inline message_queue::~message_queue(){}inline std::size_t message_queue::get_mem_size (std::size_t max_msg_size, std::size_t max_num_msg){ return detail::mq_hdr_t::get_mem_size(max_msg_size, max_num_msg); }inline message_queue::message_queue(create_only_t create_only, const char *name, std::size_t max_num_msg, std::size_t max_msg_size) //Create shared memory and execute functor atomically : m_shmem(create_only, name, get_mem_size(max_msg_size, max_num_msg), read_write, static_cast<void*>(0), //Prepare initialization functor detail::initialization_func_t (max_num_msg, max_msg_size)){}inline message_queue::message_queue(open_or_create_t open_or_create, const char *name, std::size_t max_num_msg, std::size_t max_msg_size) //Create shared memory and execute functor atomically : m_shmem(open_or_create, name, get_mem_size(max_msg_size, max_num_msg), read_write, static_cast<void*>(0), //Prepare initialization functor detail::initialization_func_t (max_num_msg, max_msg_size)){}inline message_queue::message_queue(open_only_t open_only, const char *name) //Create shared memory and execute functor atomically : m_shmem(open_only, name, read_write, static_cast<void*>(0), //Prepare initialization functor detail::initialization_func_t ()){}inline void message_queue::send (const void *buffer, std::size_t buffer_size, unsigned int priority){ this->do_send(blocking, buffer, buffer_size, priority, ptime()); }inline bool message_queue::try_send (const void *buffer, std::size_t buffer_size, unsigned int priority){ return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }inline bool message_queue::timed_send (const void *buffer, std::size_t buffer_size ,unsigned int priority, const boost::posix_time::ptime &abs_time){ if(abs_time == boost::posix_time::pos_infin){ this->send(buffer, buffer_size, priority); return true; } return this->do_send(timed, buffer, buffer_size, priority, abs_time);}inline bool message_queue::do_send(block_t block, const void *buffer, std::size_t buffer_size, unsigned int priority, const boost::posix_time::ptime &abs_time){ detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address()); //Check if buffer is smaller than maximum allowed if (buffer_size > p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); } //--------------------------------------------- scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); //--------------------------------------------- { //If the queue is full execute blocking logic if (p_hdr->is_full()) { switch(block){ case non_blocking : return false; break; case blocking : do{ p_hdr->m_cond_send.wait(lock); } while (p_hdr->is_full()); break; case timed : do{ if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)) return !p_hdr->is_full(); } while (p_hdr->is_full()); break; default: throw interprocess_exception(); } } //Get the first free message from free message queue detail::msg_hdr_t *free_msg = p_hdr->free_msg(); if (free_msg == 0) { throw interprocess_exception(); } //Copy control data to the free message free_msg->priority = priority; free_msg->len = buffer_size; //Copy user buffer to the message std::memcpy(free_msg->data(), buffer, buffer_size);// bool was_empty = p_hdr->is_empty(); //Insert the first free message in the priority queue p_hdr->queue_free_msg(); //If this message changes the queue empty state, notify it to receivers// if (was_empty){ p_hdr->m_cond_recv.notify_one();// } } // Lock end return true;}inline void message_queue::receive(void *buffer, std::size_t buffer_size, std::size_t &recvd_size, unsigned int &priority){ this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }inline bool message_queue::try_receive(void *buffer, std::size_t buffer_size, std::size_t &recvd_size, unsigned int &priority){ return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }inline bool message_queue::timed_receive(void *buffer, std::size_t buffer_size, std::size_t &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time){ if(abs_time == boost::posix_time::pos_infin){ this->receive(buffer, buffer_size, recvd_size, priority); return true; } return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);}inline bool message_queue::do_receive(block_t block, void *buffer, std::size_t buffer_size, std::size_t &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time){ detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address()); //Check if buffer is big enough for any message if (buffer_size < p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); } //--------------------------------------------- scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); //--------------------------------------------- { //If there are no messages execute blocking logic if (p_hdr->is_empty()) { switch(block){ case non_blocking : return false; break; case blocking : do{ p_hdr->m_cond_recv.wait(lock); } while (p_hdr->is_empty()); break; case timed : do{ if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)) return !p_hdr->is_empty(); } while (p_hdr->is_empty()); break; //Paranoia check default: throw interprocess_exception(); } } //Thre is at least message ready to pick, get the top one detail::msg_hdr_t *top_msg = p_hdr->top_msg(); //Paranoia check if (top_msg == 0) { throw interprocess_exception(); } //Get data from the message recvd_size = top_msg->len; priority = top_msg->priority; //Copy data to receiver's bufers std::memcpy(buffer, top_msg->data(), recvd_size);// bool was_full = p_hdr->is_full(); //Free top message and put it in the free message list p_hdr->free_top_msg(); //If this reception changes the queue full state, notify senders// if (was_full){ p_hdr->m_cond_send.notify_one();// } } //Lock end return true;}inline std::size_t message_queue::get_max_msg() const{ detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address()); return p_hdr ? p_hdr->m_max_num_msg : 0; }inline std::size_t message_queue::get_max_msg_size() const{ detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address()); return p_hdr ? p_hdr->m_max_msg_size : 0; }inline std::size_t message_queue::get_num_msg(){ detail::mq_hdr_t *p_hdr = static_cast<detail::mq_hdr_t*>(m_shmem.get_user_address()); if(p_hdr){ //--------------------------------------------- scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); //--------------------------------------------- return p_hdr->m_cur_num_msg; } return 0; }inline bool message_queue::remove(const char *name){ return shared_memory_object::remove(name); }/// @endcond}} //namespace boost{ namespace interprocess{#include <boost/interprocess/detail/config_end.hpp>#endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?