test_proactor3.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 890 行 · 第 1/2 页

CPP
890
字号
  ACE_SOCK_Stream stream_;  // Network I/O handle  ACE_Asynch_Write_Stream ws_;  // ws (write stream): for writing to the socket  ACE_Asynch_Read_Stream rs_;  // rs (read file): for reading from the socket  ACE_Message_Block welcome_message_;  // Welcome message  ACE_Recursive_Thread_Mutex mutex_;  long io_count_;};static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";Sender::Sender (void)  : io_count_ (0){  // Moment of inspiration... :-)  this->welcome_message_.init (data, ACE_OS::strlen (data));}Sender::~Sender (void){  this->close ();}void Sender::close (void){  this->stream_.close ();}ACE_HANDLE Sender::handle (void) const{  return this->stream_.get_handle ();}void Sender::handle (ACE_HANDLE handle){  this->stream_.set_handle (handle);}int Sender::open (const ACE_TCHAR *host, u_short port){  // Initialize stuff  // Connect to remote host  ACE_INET_Addr address (port, host);  ACE_SOCK_Connector connector;  if (connector.connect (this->stream_, address) == -1)    {      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_SOCK_Connector::connect"),                        -1);    }  // Open ACE_Asynch_Write_Stream  if (this->ws_.open (*this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "ACE_Asynch_Write_Stream::open"),                      -1);  // Open ACE_Asynch_Read_Stream  if (this->rs_.open (*this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "ACE_Asynch_Read_Stream::open"),                      -1);  // Start an asynchronous transmit file  if (this->initiate_write_stream () == -1)    return -1;  if (duplex != 0)    // Start an asynchronous read file    if (this->initiate_read_stream () == -1)      return -1;  return 0;}intSender::initiate_write_stream (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  welcome_message_.rd_ptr(welcome_message_.base ());  welcome_message_.wr_ptr(welcome_message_.base ());  welcome_message_.wr_ptr (ACE_OS::strlen (data));  if (this->ws_.write (welcome_message_,                       welcome_message_.length ()) == -1)    ACE_ERROR_RETURN((LM_ERROR,                      "%p\n",                      "ACE_Asynch_Write_Stream::write"),                     -1);  io_count_++;  return 0;}intSender::initiate_read_stream (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  // Create a new <Message_Block>.  Note that this message block will  // be used both to <read> data asynchronously from the socket and to  // <write> data asynchronously to the file.  ACE_DEBUG ((LM_DEBUG,              "initiate_read_stream called\n"));  ACE_Message_Block *mb = 0;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (BUFSIZ + 1),                  -1);  // Inititiate read  if (this->rs_.read (*mb, mb->size ()- 1) == -1)    {      mb->release ();      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_Asynch_Read_Stream::read"),                        -1);    }  io_count_++;  return 0;}voidSender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){  if (result.bytes_transferred () == 0 || result.error () != 0)    {      ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));      // Reset pointers.      result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));    }  // Simplify just for Test  if (result.success () && result.bytes_transferred () != 0)    {      if (duplex != 0)  // full duplex, continue write        initiate_write_stream ();      else  // half-duplex   read reply, after read we will start write        initiate_read_stream ();    }  {    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);    io_count_--;  }}voidSender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){  if (result.bytes_transferred () == 0 || result.error () != 0)    {      ACE_DEBUG ((LM_DEBUG,                  "handle_read_stream called\n"));      // Reset pointers.      result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));    }  result.message_block().release ();  if (result.success () && result.bytes_transferred () != 0)    {      // Successful read: write the data to the file asynchronously.      // Note how we reuse the <ACE_Message_Block> for the writing.      // Therefore, we do not delete this buffer because it is handled      // in <handle_write_stream>.      if (duplex != 0)  // full duplex, continue read        initiate_read_stream ();      else  // half-duplex  writey, after write we will start read        initiate_write_stream ();    }  {    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);    io_count_--;  }}static intset_proactor_type (const char *ptype){  if (!ptype)    return false;  switch (ACE_OS::ace_toupper (*ptype))    {    case 'D' :  proactor_type = 0; return true;    case 'A' :  proactor_type = 1; return true;    case 'I' :  proactor_type = 2; return true;#if defined (sun)    case 'S' :  proactor_type = 3; return true;#endif /* sun */    }  return false;}static intparse_args (int argc, ACE_TCHAR *argv[]){  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u"));  int c;  while ((c = get_opt ()) != EOF)    switch (c)      {      case 'd':		// duplex        duplex = ACE_OS::atoi (get_opt.opt_arg ());        break;      case 'h':		// host for sender        host = get_opt.opt_arg ();        break;      case 'p':		// port number        port = ACE_OS::atoi (get_opt.opt_arg ());        break;      case 'n':		// thread pool size        threads = ACE_OS::atoi (get_opt.opt_arg ());        break;      case 's':     // number of senders        senders = ACE_OS::atoi (get_opt.opt_arg ());        if (senders > MaxSenders)          senders = MaxSenders;        break;      case 'o':     // max number of aio for proactor        max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());	break;      case 't':    //  Proactor Type        if (set_proactor_type (get_opt.opt_arg ()))          break;      case 'u':      default:        ACE_ERROR ((LM_ERROR, "%p.",                    "\nusage:"                    "\n-o <max number of started aio operations for Proactor>"                    "\n-t <Proactor type> UNIX-only, Win32-default always:"                    "\n    a AIOCB"                    "\n    i SIG"                    "\n    s SUN"                    "\n    d default"                    "\n-d <duplex mode 1-on/0-off>"                    "\n-h <host> for Sender mode"                    "\n-n <number threads for Proactor pool>"                    "\n-p <port to listen/connect>"                    "\n-s <number of sender's instances>"                    "\n-u show this message"                    "\n"));	return -1;      }  return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){#if defined (sun)  ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));#endif  if (parse_args (argc, argv) == -1)    return -1;  disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);  MyTask task1;  if (task1.activate (THR_NEW_LWP, threads) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p.\n",                       "main"),                      -1);  // wait for creation of Proactor  task1.waitready ();  Sender * send_list[MaxSenders];  ACE_Asynch_Acceptor<Receiver> acceptor;  int rc = -1;  int i;  char c;  if (host == 0) // Acceptor    {      // Simplify, initial read with  zero size      if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)        rc = 1;    }  else    {      for (i = 0; i < senders; ++i)        send_list[i] = new Sender;      for (i = 0; i < senders; ++i)        if (send_list[i]->open (host, port) == 0)          rc++;    }  if (rc > 0)    {      cout << "Press any key to stop=>" << flush;      cin.clear ();      cin >> c;    }  ACE_Proactor::end_event_loop ();  if (host != 0) // we are sender    {      for (i = 0; i < senders; ++i)        send_list[i]->close ();    }  ACE_Thread_Manager *tm =    ACE_Thread_Manager::instance();  tm->wait_task (&task1);  cout 	<< "\nNumber of Receivers objects="        << Receiver::get_number_sessions ()        << flush;  for (i = 0; i < senders; ++i)    {      delete (send_list[i]);      send_list[i] = 0;    }  return 0;}static intdisable_signal (int sigmin, int sigmax){#ifndef ACE_WIN32  sigset_t signal_set;  if (ACE_OS::sigemptyset (&signal_set) == - 1)    ACE_ERROR ((LM_ERROR,                "Error:(%P | %t):%p\n",                "sigemptyset failed"));  for (int i = sigmin; i <= sigmax; i++)    ACE_OS::sigaddset (&signal_set, i);  //  Put the <signal_set>.  if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)    ACE_ERROR ((LM_ERROR,                "Error:(%P | %t):%p\n",                "pthread_sigmask failed"));#else  ACE_UNUSED_ARG (sigmin);  ACE_UNUSED_ARG (sigmax);#endif /* ACE_WIN32 */  return 1;}// Get the <signal_set> back from the OS.#if 0static intprint_sigmask (void){#ifndef ACE_WIN32  sigset_t  mask;  int member = 0;  COUT ("\n=============Signal Mask==========")  if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0)    ACE_ERROR ((LM_ERROR,                "Error:(%P | %t):%p\n",                "ACE_OS::pthread_sigmask failed"));  else    for (int i = 1; i < 1000; i++)      {        member = ACE_OS::sigismember (&mask,i);        COUT ("\nSig ")        COUT (i)        COUT (" is ")        COUT (member)        if (member == -1)          break;    }#endif /* ACE_WIN32 */  return 0;}#endif /* 0 */#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */intACE_TMAIN (int, ACE_TCHAR *[]){  ACE_DEBUG ((LM_DEBUG,              "This example does not work on this platform.\n"));  return 1;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?