bounded_buffer.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 141 行

CPP
141
字号
// $Id: bounded_buffer.cpp 63209 2005-01-10 10:28:08Z jwillemsen $// This short program copies stdin to stdout via the use of an ASX// Message_Queue.  It illustrates an implementation of the classic// "bounded buffer" program.#include "ace/Message_Queue.h"#include "ace/Thread_Manager.h"#include "ace/OS_NS_time.h"#include "ace/OS_NS_unistd.h"ACE_RCSID(Message_Queue, bounded_buffer, "$Id: bounded_buffer.cpp 63209 2005-01-10 10:28:08Z jwillemsen $")#if defined (ACE_HAS_THREADS)// The producer reads data from the stdin stream, creates a message,// and then queues the message in the message list, where it is// removed by the consumer thread.  A 0-sized message is enqueued when// there is no more data to read.  The consumer uses this as a flag to// know when to exit.static void *producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue){  // Keep reading stdin, until we reach EOF.  for (int n; ; )    {      // Allocate a new message.      ACE_Message_Block *mb;      ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);      n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());      if (n <= 0)        {          // Send a shutdown message to the other thread and exit.          mb->length (0);          if (msg_queue->enqueue_tail (mb) == -1)            ACE_ERROR ((LM_ERROR,                        "(%t) %p\n",                        "put_next"));      	  break;        }      // Send the message to the other thread.      else	{	  mb->msg_priority (n);	  mb->wr_ptr (n);	  if (msg_queue->enqueue_tail (mb) == -1)	    ACE_ERROR ((LM_ERROR,                        "(%t) %p\n",                        "put_next"));	}    }  return 0;}// The consumer dequeues a message from the ACE_Message_Queue, writes// the message to the stderr stream, and deletes the message.  The// producer sends a 0-sized message to inform the consumer to stop// reading and exit.static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue){  int result = 0;  // Keep looping, reading a message out of the queue, until we timeout  // or get a message with a length == 0, which signals us to quit.  for (;;)    {      ACE_Message_Block *mb;      ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds      result = msg_queue->dequeue_head (mb, &timeout);      if (result == -1)	break;      int length = mb->length ();      if (length > 0)	ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);      mb->release ();      if (length == 0)	break;    }  if (result == -1 && errno == EWOULDBLOCK)    ACE_ERROR ((LM_ERROR,                "(%t) %p\n%a",                "timed out waiting for message",                1));  return 0;}// Spawn off two threads that copy stdin to stdout.intACE_TMAIN (int, ACE_TCHAR *[]){  // Message list.  ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;  if (ACE_Thread_Manager::instance ()->spawn      (ACE_THR_FUNC (producer),       (void *) &msg_queue,       THR_NEW_LWP | THR_DETACHED) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "spawn"),                      1);  else if (ACE_Thread_Manager::instance ()->spawn           (ACE_THR_FUNC (consumer),            (void *) &msg_queue,            THR_NEW_LWP | THR_DETACHED) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "spawn"),                      1);  // Wait for producer and consumer threads to exit.  ACE_Thread_Manager::instance ()->wait ();  return 0;}#elseintACE_TMAIN (int, ACE_TCHAR *[]){  ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));  return 0;}#endif /* ACE_HAS_THREADS */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?