message_queue.hpp
来自「Boost provides free peer-reviewed portab」· HPP 代码 · 共 632 行 · 第 1/2 页
HPP
632 行
////////////////////////////////////////////////////////////////////////////////// (C) Copyright Ion Gaztanaga 2005-2008. Distributed under the Boost// Software License, Version 1.0. (See accompanying file// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//// See http://www.boost.org/libs/interprocess for documentation.////////////////////////////////////////////////////////////////////////////////#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP#define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP#include <boost/interprocess/detail/config_begin.hpp>#include <boost/interprocess/detail/workaround.hpp>#include <boost/interprocess/shared_memory_object.hpp>#include <boost/interprocess/detail/managed_open_or_create_impl.hpp>#include <boost/interprocess/sync/interprocess_condition.hpp>#include <boost/interprocess/sync/interprocess_mutex.hpp>#include <boost/interprocess/detail/utilities.hpp>#include <boost/interprocess/offset_ptr.hpp>#include <boost/interprocess/creation_tags.hpp>#include <boost/interprocess/exceptions.hpp>#include <boost/detail/no_exceptions_support.hpp>#include <boost/interprocess/detail/type_traits.hpp>#include <algorithm> //std::lower_bound#include <cstddef> //std::size_t#include <cstring> //memcpy//!\file//!Describes an inter-process message queue. This class allows sending//!messages between processes and allows blocking, non-blocking and timed//!sending and receiving.namespace boost{ namespace interprocess{//!A class that allows sending messages//!between processes.class message_queue{ /// @cond //Blocking modes enum block_t { blocking, timed, non_blocking }; message_queue(); /// @endcond public: //!Creates a process shared message queue with name "name". For this message queue, //!the maximum number of messages will be "max_num_msg" and the maximum message size //!will be "max_msg_size". Throws on error and if the queue was previously created. message_queue(create_only_t create_only, const char *name, std::size_t max_num_msg, std::size_t max_msg_size); //!Opens or creates a process shared message queue with name "name". //!If the queue is created, the maximum number of messages will be "max_num_msg" //!and the maximum message size will be "max_msg_size". If queue was previously //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters //!are ignored. Throws on error. message_queue(open_or_create_t open_or_create, const char *name, std::size_t max_num_msg, std::size_t max_msg_size); //!Opens a previously created process shared message queue with name "name". //!If the was not previously created or there are no free resources, //!throws an error. message_queue(open_only_t open_only, const char *name); //!Destroys *this and indicates that the calling process is finished using //!the resource. All opened message queues are still //!valid after destruction. The destructor function will deallocate //!any system resources allocated by the system for use by this process for //!this resource. The resource can still be opened again calling //!the open constructor overload. To erase the message queue from the system //!use remove(). ~message_queue(); //!Sends a message stored in buffer "buffer" with size "buffer_size" in the //!message queue with priority "priority". If the message queue is full //!the sender is blocked. Throws interprocess_error on error.*/ void send (const void *buffer, std::size_t buffer_size, unsigned int priority); //!Sends a message stored in buffer "buffer" with size "buffer_size" through the //!message queue with priority "priority". If the message queue is full //!the sender is not blocked and returns false, otherwise returns true. //!Throws interprocess_error on error. bool try_send (const void *buffer, std::size_t buffer_size, unsigned int priority); //!Sends a message stored in buffer "buffer" with size "buffer_size" in the //!message queue with priority "priority". If the message queue is full //!the sender retries until time "abs_time" is reached. Returns true if //!the message has been successfully sent. Returns false if timeout is reached. //!Throws interprocess_error on error. bool timed_send (const void *buffer, std::size_t buffer_size, unsigned int priority, const boost::posix_time::ptime& abs_time); //!Receives a message from the message queue. The message is stored in buffer //!"buffer", which has size "buffer_size". The received message has size //!"recvd_size" and priority "priority". If the message queue is empty //!the receiver is blocked. Throws interprocess_error on error. void receive (void *buffer, std::size_t buffer_size, std::size_t &recvd_size,unsigned int &priority); //!Receives a message from the message queue. The message is stored in buffer //!"buffer", which has size "buffer_size". The received message has size //!"recvd_size" and priority "priority". If the message queue is empty //!the receiver is not blocked and returns false, otherwise returns true. //!Throws interprocess_error on error. bool try_receive (void *buffer, std::size_t buffer_size, std::size_t &recvd_size,unsigned int &priority); //!Receives a message from the message queue. The message is stored in buffer //!"buffer", which has size "buffer_size". The received message has size //!"recvd_size" and priority "priority". If the message queue is empty //!the receiver retries until time "abs_time" is reached. Returns true if //!the message has been successfully sent. Returns false if timeout is reached. //!Throws interprocess_error on error. bool timed_receive (void *buffer, std::size_t buffer_size, std::size_t &recvd_size,unsigned int &priority, const boost::posix_time::ptime &abs_time); //!Returns the maximum number of messages allowed by the queue. The message //!queue must be opened or created previously. Otherwise, returns 0. //!Never throws std::size_t get_max_msg() const; //!Returns the maximum size of message allowed by the queue. The message //!queue must be opened or created previously. Otherwise, returns 0. //!Never throws std::size_t get_max_msg_size() const; //!Returns the number of messages currently stored. //!Never throws std::size_t get_num_msg(); //!Removes the message queue from the system. //!Returns false on error. Never throws static bool remove(const char *name); /// @cond private: typedef boost::posix_time::ptime ptime; bool do_receive(block_t block, void *buffer, std::size_t buffer_size, std::size_t &recvd_size, unsigned int &priority, const ptime &abs_time); bool do_send(block_t block, const void *buffer, std::size_t buffer_size, unsigned int priority, const ptime &abs_time); //!Returns the needed memory size for the shared message queue. //!Never throws static std::size_t get_mem_size(std::size_t max_msg_size, std::size_t max_num_msg); detail::managed_open_or_create_impl<shared_memory_object> m_shmem; /// @endcond};/// @condnamespace detail {//!This header is the prefix of each message in the queueclass msg_hdr_t { public: std::size_t len; // Message length unsigned int priority;// Message priority //!Returns the data buffer associated with this this message void * data(){ return this+1; } //};//!This functor is the predicate to order stored messages by priorityclass priority_functor{ public: bool operator()(const offset_ptr<msg_hdr_t> &msg1, const offset_ptr<msg_hdr_t> &msg2) const { return msg1->priority < msg2->priority; }};//!This header is placed in the beginning of the shared memory and contains //!the data to control the queue. This class initializes the shared memory //!in the following way: in ascending memory address with proper alignment//!fillings://!//!-> mq_hdr_t: //! Main control block that controls the rest of the elements//!//!-> offset_ptr<msg_hdr_t> index [max_num_msg]//! An array of pointers with size "max_num_msg" called index. Each pointer //! points to a preallocated message. The elements of this array are //! reordered in runtime in the following way://!//! When the current number of messages is "cur_num_msg", the first //! "cur_num_msg" pointers point to inserted messages and the rest//! point to free messages. The first "cur_num_msg" pointers are//! ordered by the priority of the pointed message and by insertion order //! if two messages have the same priority. So the next message to be //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free//! message ready to be used in a "send" operation is index [cur_num_msg].//! This transforms index in a fixed size priority queue with an embedded free//! message queue.//!//!-> struct message_t//! { //! msg_hdr_t header;//! char[max_msg_size] data;//! } messages [max_num_msg];//!//! An array of buffers of preallocated messages, each one prefixed with the//! msg_hdr_t structure. Each of this message is pointed by one pointer of//! the index structure.class mq_hdr_t : public detail::priority_functor{ typedef offset_ptr<msg_hdr_t> msg_hdr_ptr_t; public: //!Constructor. This object must be constructed in the beginning of the //!shared memory of the size returned by the function "get_mem_size". //!This constructor initializes the needed resources and creates //!the internal structures like the priority index. This can throw.*/ mq_hdr_t(std::size_t max_num_msg, std::size_t max_msg_size) : m_max_num_msg(max_num_msg), m_max_msg_size(max_msg_size), m_cur_num_msg(0) { this->initialize_memory(); } //!Returns the inserted message with top priority msg_hdr_t * top_msg() { return mp_index[m_cur_num_msg-1].get(); } //!Returns true if the message queue is full bool is_full() const { return m_cur_num_msg == m_max_num_msg; } //!Returns true if the message queue is empty bool is_empty() const { return !m_cur_num_msg; } //!Frees the top priority message and saves it in the free message list void free_top_msg() { --m_cur_num_msg; } //!Returns the first free msg of the free message queue msg_hdr_t * free_msg() { return mp_index[m_cur_num_msg].get(); } //!Inserts the first free message in the priority queue void queue_free_msg() { //Get free msg msg_hdr_ptr_t free = mp_index[m_cur_num_msg]; //Get priority queue's range msg_hdr_ptr_t *it = &mp_index[0], *it_end = &mp_index[m_cur_num_msg]; //Check where the free message should be placed it = std::lower_bound(it, it_end, free, static_cast<priority_functor&>(*this)); //Make room in that position std::copy_backward(it, it_end, it_end+1); //Insert the free message in the correct position *it = free; ++m_cur_num_msg; } //!Returns the number of bytes needed to construct a message queue with //!"max_num_size" maximum number of messages and "max_msg_size" maximum //!message size. Never throws. static std::size_t get_mem_size (std::size_t max_msg_size, std::size_t max_num_msg) { const std::size_t msg_hdr_align = detail::alignment_of<detail::msg_hdr_t>::value, index_align = detail::alignment_of<msg_hdr_ptr_t>::value, r_hdr_size = detail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, r_index_size = detail::get_rounded_size(sizeof(msg_hdr_ptr_t)*max_num_msg, msg_hdr_align), r_max_msg_size = detail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(detail::msg_hdr_t); return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + detail::managed_open_or_create_impl<shared_memory_object>::ManagedOpenOrCreateUserOffset; } //!Initializes the memory structures to preallocate messages and constructs the //!message index. Never throws. void initialize_memory() { const std::size_t msg_hdr_align = detail::alignment_of<detail::msg_hdr_t>::value, index_align = detail::alignment_of<msg_hdr_ptr_t>::value, r_hdr_size = detail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, r_index_size = detail::get_rounded_size(sizeof(msg_hdr_ptr_t)*m_max_num_msg, msg_hdr_align), r_max_msg_size = detail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(detail::msg_hdr_t); //Pointer to the index msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> (reinterpret_cast<char*>(this)+r_hdr_size); //Pointer to the first message header detail::msg_hdr_t *msg_hdr = reinterpret_cast<detail::msg_hdr_t*> (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size); //Initialize the pointer to the index mp_index = index; //Initialize the index so each slot points to a preallocated message for(std::size_t i = 0; i < m_max_num_msg; ++i){ index[i] = msg_hdr;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?