bounded_buffer.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 141 行
CPP
141 行
// $Id: bounded_buffer.cpp 63209 2005-01-10 10:28:08Z jwillemsen $// This short program copies stdin to stdout via the use of an ASX// Message_Queue. It illustrates an implementation of the classic// "bounded buffer" program.#include "ace/Message_Queue.h"#include "ace/Thread_Manager.h"#include "ace/OS_NS_time.h"#include "ace/OS_NS_unistd.h"ACE_RCSID(Message_Queue, bounded_buffer, "$Id: bounded_buffer.cpp 63209 2005-01-10 10:28:08Z jwillemsen $")#if defined (ACE_HAS_THREADS)// The producer reads data from the stdin stream, creates a message,// and then queues the message in the message list, where it is// removed by the consumer thread. A 0-sized message is enqueued when// there is no more data to read. The consumer uses this as a flag to// know when to exit.static void *producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue){ // Keep reading stdin, until we reach EOF. for (int n; ; ) { // Allocate a new message. ACE_Message_Block *mb; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0); n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ()); if (n <= 0) { // Send a shutdown message to the other thread and exit. mb->length (0); if (msg_queue->enqueue_tail (mb) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); break; } // Send the message to the other thread. else { mb->msg_priority (n); mb->wr_ptr (n); if (msg_queue->enqueue_tail (mb) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); } } return 0;}// The consumer dequeues a message from the ACE_Message_Queue, writes// the message to the stderr stream, and deletes the message. The// producer sends a 0-sized message to inform the consumer to stop// reading and exit.static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue){ int result = 0; // Keep looping, reading a message out of the queue, until we timeout // or get a message with a length == 0, which signals us to quit. for (;;) { ACE_Message_Block *mb; ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds result = msg_queue->dequeue_head (mb, &timeout); if (result == -1) break; int length = mb->length (); if (length > 0) ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); mb->release (); if (length == 0) break; } if (result == -1 && errno == EWOULDBLOCK) ACE_ERROR ((LM_ERROR, "(%t) %p\n%a", "timed out waiting for message", 1)); return 0;}// Spawn off two threads that copy stdin to stdout.intACE_TMAIN (int, ACE_TCHAR *[]){ // Message list. ACE_Message_Queue<ACE_MT_SYNCH> msg_queue; if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (producer), (void *) &msg_queue, THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer), (void *) &msg_queue, THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); // Wait for producer and consumer threads to exit. ACE_Thread_Manager::instance ()->wait (); return 0;}#elseintACE_TMAIN (int, ACE_TCHAR *[]){ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); return 0;}#endif /* ACE_HAS_THREADS */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?