buffer_stream.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 315 行

CPP
315
字号
// $Id: buffer_stream.cpp 78962 2007-07-20 03:27:14Z sowayaa $// This short program copies stdin to stdout via the use of an ASX// Stream.  It illustrates an implementation of the classic "bounded// buffer" program using an ASX Stream containing two Modules.  Each// ACE_Module contains two Tasks.  Each ACE_Task contains a// ACE_Message_Queue and a pointer to a ACE_Thread_Manager.  Note how// the use of these reusable components reduces the reliance on global// variables, as compared with the bounded_buffer.C example.#include "ace/OS_main.h"#include "ace/OS_NS_stdio.h"#include "ace/OS_NS_string.h"#include "ace/OS_NS_time.h"#include "ace/OS_NS_unistd.h"#include "ace/Service_Config.h"#include "ace/Stream.h"#include "ace/Module.h"#include "ace/Task.h"ACE_RCSID(Message_Queue, buffer_stream, "$Id: buffer_stream.cpp 78962 2007-07-20 03:27:14Z sowayaa $")#if defined (ACE_HAS_THREADS)typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;typedef ACE_Module<ACE_MT_SYNCH> MT_Module;typedef ACE_Task<ACE_MT_SYNCH> MT_Task;class Common_Task : public MT_Task  // = TITLE  //   Methods that are common to the producer and consumer.{public:  Common_Task (void) {}  //FUZZ: disable check_for_lack_ACE_OS  // ACE_Task hooks  virtual int open (void * = 0);  virtual int close (u_long = 0);  //FUZZ: enable check_for_lack_ACE_OS};// Define the Producer interface.class Producer : public Common_Task{public:  Producer (void) {}  // Read data from stdin and pass to consumer.  virtual int svc (void);};class Consumer : public Common_Task  // = TITLE  //    Define the Consumer interface.{public:  Consumer (void) {}  virtual int put (ACE_Message_Block *mb,                   ACE_Time_Value *tv = 0);  // Enqueue the message on the ACE_Message_Queue for subsequent  // handling in the svc() method.  virtual int svc (void);  // Receive message from producer and print to stdout.private:  ACE_Time_Value timeout_;};class Filter : public MT_Task  // = TITLE  //    Defines a Filter that prepends a line number in front of each  //    line.{public:  Filter (void): count_ (1) {}  virtual int put (ACE_Message_Block *mb,                   ACE_Time_Value *tv = 0);  // Change the size of the message before passing it downstream.private:  size_t count_;  // Count the number of lines passing through the filter.};// Spawn off a new thread.intCommon_Task::open (void *){  if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("spawn")),                      -1);  return 0;}intCommon_Task::close (u_long exit_status){  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),              exit_status,              this->name ()));  // Can do anything here that is required when a thread exits, e.g.,  // storing thread-specific information in some other storage  // location, etc.  return 0;}// The Consumer reads data from the stdin stream, creates a message,// and then queues the message in the message list, where it is// removed by the consumer thread.  A 0-sized message is enqueued when// there is no more data to read.  The consumer uses this as a flag to// know when to exit.intProducer::svc (void){  // Keep reading stdin, until we reach EOF.  for (int n; ; )    {      // Allocate a new message (add one to avoid nasty boundary      // conditions).      ACE_Message_Block *mb = 0;      ACE_NEW_RETURN (mb,                      ACE_Message_Block (BUFSIZ + 1),                      -1);      n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ);      if (n <= 0)        {          // Send a shutdown message to the other thread and exit.          mb->length (0);          if (this->put_next (mb) == -1)            ACE_ERROR ((LM_ERROR,                        ACE_TEXT ("(%t) %p\n"),                        ACE_TEXT ("put_next")));          break;        }      // Send the message to the other thread.      else        {          mb->wr_ptr (n);          // NUL-terminate the string (since we use strlen() on it          // later).          mb->rd_ptr ()[n] = '\0';          if (this->put_next (mb) == -1)          ACE_ERROR ((LM_ERROR,                      ACE_TEXT ("(%t) %p\n"),                      ACE_TEXT ("put_next")));        }    }  return 0;}// Simply enqueue the Message_Block into the end of the queue.intConsumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv){  return this->putq (mb, tv);}// The consumer dequeues a message from the ACE_Message_Queue, writes// the message to the stderr stream, and deletes the message.  The// Consumer sends a 0-sized message to inform the consumer to stop// reading and exit.intConsumer::svc (void){  int result = 0;  // Keep looping, reading a message out of the queue, until we  // timeout or get a message with a length == 0, which signals us to  // quit.  for (;;)    {      ACE_Message_Block *mb = 0;      // Wait for upto 4 seconds.      this->timeout_.sec (ACE_OS::time (0) + 4);      result = this->getq (mb, &this->timeout_);      if (result == -1)        break;      int length = mb->length ();      if (length > 0)        ACE_OS::write (ACE_STDOUT,                       mb->rd_ptr (),                       ACE_OS::strlen (mb->rd_ptr ()));      mb->release ();      if (length == 0)        break;    }  if (result == -1 && errno == EWOULDBLOCK)    ACE_ERROR ((LM_ERROR,                ACE_TEXT ("(%t) %p\n%a"),                ACE_TEXT ("timed out waiting for message"),                1));  return 0;}intFilter::put (ACE_Message_Block *mb,             ACE_Time_Value *tv){  if (mb->length () == 0)    return this->put_next (mb, tv);  else    {      char buf[BUFSIZ];      // Stash a copy of the buffer away.      ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf);      // Increase the size of the buffer large enough that it will be      // reallocated (in order to test the reallocation mechanisms).      mb->size (mb->length () + BUFSIZ);      mb->length (mb->size ());      // Prepend the line count in front of the buffer.      ACE_OS::sprintf (mb->rd_ptr (),                       ACE_SIZE_T_FORMAT_SPECIFIER                       ": %s",                       this->count_++,                       buf);      return this->put_next (mb, tv);    }}// Main driver function.intACE_TMAIN (int, ACE_TCHAR *argv[]){  ACE_Service_Config daemon (argv[0]);  // This Stream controls hierachically-related active objects.  MT_Stream stream;  MT_Module *pm = 0;  MT_Module *fm = 0;  MT_Module *cm = 0;  ACE_NEW_RETURN (cm,                  MT_Module (ACE_TEXT ("Consumer"),                             new Consumer),                  -1);  ACE_NEW_RETURN (fm,                  MT_Module (ACE_TEXT ("Filter"),                             new Filter),                  -1);  ACE_NEW_RETURN (pm,                  MT_Module (ACE_TEXT ("Producer"),                             new Producer),                  -1);  // Create Consumer, Filter, and Producer Modules and push them onto  // the Stream.  All processing is performed in the Stream.  if (stream.push (cm) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("push")),                      1);  else if (stream.push (fm) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("push")),                      1);  else if (stream.push (pm) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("push")),                      1);  // Barrier synchronization: wait for the threads to exit, then exit  // ourselves.  ACE_Thread_Manager::instance ()->wait ();  return 0;}#elseintACE_TMAIN (int, ACE_TCHAR *[]){  ACE_ERROR ((LM_ERROR,              ACE_TEXT ("threads not supported on this platform\n")));  return 0;}#endif /* ACE_HAS_THREADS */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?