message_queue_test.cpp

来自「Boost provides free peer-reviewed portab」· C++ 代码 · 共 265 行

CPP
265
字号
////////////////////////////////////////////////////////////////////////////////// (C) Copyright Ion Gaztanaga 2004-2007. Distributed under the Boost// Software License, Version 1.0. (See accompanying file// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//// See http://www.boost.org/libs/interprocess for documentation.////////////////////////////////////////////////////////////////////////////////#include <boost/interprocess/detail/config_begin.hpp>#include <boost/interprocess/ipc/message_queue.hpp>#include <boost/interprocess/managed_external_buffer.hpp>#include <boost/interprocess/managed_heap_memory.hpp>#include <boost/interprocess/containers/map.hpp>#include <boost/interprocess/containers/set.hpp>#include <boost/interprocess/allocators/node_allocator.hpp>#include <vector>#include <cstddef>#include <limits>#include <boost/thread.hpp>#include <memory>#include <string>#include "get_process_id_name.hpp"#ifdef max#undef max#endif//////////////////////////////////////////////////////////////////////////////////                                                                            ////  This example tests the process shared message queue.                      ////                                                                            //////////////////////////////////////////////////////////////////////////////////using namespace boost::interprocess;//This test inserts messages with different priority and marks them with a //time-stamp to check if receiver obtains highest priority messages first and//messages with same priority are received in fifo orderbool test_priority_order(){   message_queue::remove(test::get_process_id_name());   {      message_queue mq1         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),         mq2         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));      //We test that the queue is ordered by priority and in the       //same priority, is a FIFO      std::size_t recvd = 0;      unsigned int priority = 0;      std::size_t tstamp;      //We will send 100 message with priority 0-9      //The message will contain the timestamp of the message      for(std::size_t i = 0; i < 100; ++i){         tstamp = i;         mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));      }      unsigned int priority_prev = std::numeric_limits<unsigned int>::max();      std::size_t  tstamp_prev = 0;      //Receive all messages and test those are ordered      //by priority and by FIFO in the same priority      for(std::size_t i = 0; i < 100; ++i){         mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);         if(priority > priority_prev)            return false;         if(priority == priority_prev &&            tstamp   <= tstamp_prev){            return false;         }         priority_prev  = priority;         tstamp_prev    = tstamp;      }   }   message_queue::remove(test::get_process_id_name());   return true;}//[message_queue_test_test_serialize_db//This test creates a in memory data-base using Interprocess machinery and //serializes it through a message queue. Then rebuilds the data-base in //another buffer and checks it against the original data-basebool test_serialize_db(){   //Typedef data to create a Interprocess map      typedef std::pair<const std::size_t, std::size_t> MyPair;   typedef std::less<std::size_t>   MyLess;   typedef node_allocator<MyPair, managed_external_buffer::segment_manager>      node_allocator_t;   typedef map<std::size_t,                std::size_t,                std::less<std::size_t>,                node_allocator_t>               MyMap;   //Some constants   const std::size_t BufferSize  = 65536;   const std::size_t MaxMsgSize  = 100;   //Allocate a memory buffer to hold the destiny database using vector<char>   std::vector<char> buffer_destiny(BufferSize, 0);   message_queue::remove(test::get_process_id_name());   {      //Create the message-queues      message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);      //Open previously created message-queue simulating other process      message_queue mq2(open_only, test::get_process_id_name());      //A managed heap memory to create the origin database      managed_heap_memory db_origin(buffer_destiny.size());      //Construct the map in the first buffer      MyMap *map1 = db_origin.construct<MyMap>("MyMap")                                       (MyLess(),                                        db_origin.get_segment_manager());      if(!map1)         return false;      //Fill map1 until is full       try{         std::size_t i = 0;         while(1){            (*map1)[i] = i;            ++i;         }      }      catch(boost::interprocess::bad_alloc &){}      //Data control data sending through the message queue      std::size_t sent = 0;      std::size_t recvd = 0;      std::size_t total_recvd = 0;      unsigned int priority;      //Send whole first buffer through the mq1, read it       //through mq2 to the second buffer      while(1){         //Send a fragment of buffer1 through mq1         std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?                                        MaxMsgSize : (db_origin.get_size() - sent);         mq1.send( &static_cast<char*>(db_origin.get_address())[sent]               , bytes_to_send               , 0);         sent += bytes_to_send;         //Receive the fragment through mq2 to buffer_destiny         mq2.receive( &buffer_destiny[total_recvd]                  , BufferSize - recvd                  , recvd                  , priority);         total_recvd += recvd;         //Check if we have received all the buffer         if(total_recvd == BufferSize){            break;         }      }            //The buffer will contain a copy of the original database       //so let's interpret the buffer with managed_external_buffer      managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);      //Let's find the map      std::pair<MyMap *, std::size_t> ret = db_destiny.find<MyMap>("MyMap");      MyMap *map2 = ret.first;      //Check if we have found it      if(!map2){         return false;      }      //Check if it is a single variable (not an array)      if(ret.second != 1){         return false;      }      //Now let's compare size      if(map1->size() != map2->size()){         return false;      }      //Now let's compare all db values      for(std::size_t i = 0, num_elements = map1->size(); i < num_elements; ++i){         if((*map1)[i] != (*map2)[i]){            return false;         }      }            //Destroy maps from db-s      db_origin.destroy_ptr(map1);      db_destiny.destroy_ptr(map2);   }   message_queue::remove(test::get_process_id_name());   return true;}//]static const int MsgSize = 10;static const int NumMsg  = 1000;static char msgsend [10];static char msgrecv [10];static boost::interprocess::message_queue *pmessage_queue;void receiver(){   std::size_t recvd_size;   unsigned int priority;   int nummsg = NumMsg;   while(nummsg--){      pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);   }}bool test_buffer_overflow(){   boost::interprocess::message_queue::remove(test::get_process_id_name());   {      std::auto_ptr<boost::interprocess::message_queue>         ptr(new boost::interprocess::message_queue               (create_only, test::get_process_id_name(), 10, 10));      pmessage_queue = ptr.get();      //Launch the receiver thread      boost::thread thread(&receiver);      boost::thread::yield();      int nummsg = NumMsg;      while(nummsg--){         pmessage_queue->send(msgsend, MsgSize, 0);      }      thread.join();   }   boost::interprocess::message_queue::remove(test::get_process_id_name());   return true;}int main (){   if(!test_priority_order()){       return 1;   }   if(!test_serialize_db()){       return 1;   }   if(!test_buffer_overflow()){       return 1;   }   return 0;}#include <boost/interprocess/detail/config_end.hpp>

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?