⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bounded_buffer.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
字号:
// bounded_buffer.cpp,v 4.9 2003/11/01 11:15:23 dhinton Exp

// This short program copies stdin to stdout via the use of an ASX
// Message_Queue.  It illustrates an implementation of the classic
// "bounded buffer" program.

#include "ace/Message_Queue.h"
#include "ace/Thread_Manager.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"

ACE_RCSID(Message_Queue, bounded_buffer, "bounded_buffer.cpp,v 4.9 2003/11/01 11:15:23 dhinton Exp")

#if defined (ACE_HAS_THREADS)

// The producer reads data from the stdin stream, creates a message,
// and then queues the message in the message list, where it is
// removed by the consumer thread.  A 0-sized message is enqueued when
// there is no more data to read.  The consumer uses this as a flag to
// know when to exit.

static void *
producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
{
  // Keep reading stdin, until we reach EOF. 

  for (int n; ; )
    {
      // Allocate a new message.
      ACE_Message_Block *mb;

      ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);

      n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());

      if (n <= 0)
        {
          // Send a shutdown message to the other thread and exit.
          mb->length (0);
          if (msg_queue->enqueue_tail (mb) == -1)
            ACE_ERROR ((LM_ERROR,
                        "(%t) %p\n",
                        "put_next"));
      	  break;
        }

      // Send the message to the other thread.
      else
	{
	  mb->msg_priority (n);
	  mb->wr_ptr (n);
	  if (msg_queue->enqueue_tail (mb) == -1)
	    ACE_ERROR ((LM_ERROR,
                        "(%t) %p\n",
                        "put_next"));
	}
    }

  return 0; 
}

// The consumer dequeues a message from the ACE_Message_Queue, writes
// the message to the stderr stream, and deletes the message.  The
// producer sends a 0-sized message to inform the consumer to stop
// reading and exit.

static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
{
  int result = 0;

  // Keep looping, reading a message out of the queue, until we timeout
  // or get a message with a length == 0, which signals us to quit.

  for (;;)
    {
      ACE_Message_Block *mb;

      ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds

      result = msg_queue->dequeue_head (mb, &timeout);

      if (result == -1)
	break;

      int length = mb->length ();

      if (length > 0)
	ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);

      mb->release ();

      if (length == 0)
	break;
    }

  if (result == -1 && errno == EWOULDBLOCK)
    ACE_ERROR ((LM_ERROR,
                "(%t) %p\n%a",
                "timed out waiting for message",
                1));
  return 0;
}

// Spawn off two threads that copy stdin to stdout.

int 
main (int, char *[])
{
  // Message list.
  ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;

  if (ACE_Thread_Manager::instance ()->spawn 
      (ACE_THR_FUNC (producer),
       (void *) &msg_queue,
       THR_NEW_LWP | THR_DETACHED) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "spawn"),
                      1);
  else if (ACE_Thread_Manager::instance ()->spawn 
           (ACE_THR_FUNC (consumer),
            (void *) &msg_queue,
            THR_NEW_LWP | THR_DETACHED) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "spawn"),
                      1);

  // Wait for producer and consumer threads to exit.
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
#else
int 
main (int, char *[])
{
  ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
  return 0;
}
#endif /* ACE_HAS_THREADS */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -