test_proactor2.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 825 行 · 第 1/2 页

CPP
825
字号
MyMutex m_Mtx ;long    nIOCount ;};static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";Sender::Sender (void)  :nIOCount ( 0 ){  // Moment of inspiration... :-)  this->welcome_message_.init (data, ACE_OS::strlen (data));}Sender::~Sender (void){  this->close ();}void Sender::close (){  this->stream_.close ();}ACE_HANDLE Sender::handle (void) const{  return this->stream_.get_handle ();}void Sender::handle (ACE_HANDLE handle){  this->stream_.set_handle (handle);}int Sender::open (const ACE_TCHAR *host, u_short port){  // Initialize stuff  // Connect to remote host  ACE_INET_Addr address (port, host);  ACE_SOCK_Connector connector;  if (connector.connect (this->stream_,                         address) == -1)    {      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_SOCK_Connector::connect"),                        -1);    }  // Open ACE_Asynch_Write_Stream  if (this->ws_.open (*this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "ACE_Asynch_Write_Stream::open"),                      -1);  // Open ACE_Asynch_Read_Stream  if (this->rs_.open (*this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "ACE_Asynch_Read_File::open"),                      -1);  // Start an asynchronous transmit file  if ( this->initiate_write_stream () == -1)    return -1;  if ( duplex != 0 )    {      // Start an asynchronous read file      if (this->initiate_read_stream () == -1)        return -1;    }  return 0;}int Sender::initiate_write_stream (void){  ACE_Guard<MyMutex> locker (m_Mtx) ;  welcome_message_.rd_ptr( welcome_message_.base ());  welcome_message_.wr_ptr( welcome_message_.base ());  welcome_message_.wr_ptr (ACE_OS::strlen (data));  if (this->ws_.write (welcome_message_,                       welcome_message_.length ()                       ) == -1)    {      ACE_ERROR_RETURN((LM_ERROR,                        "%p\n",                        "ACE_Asynch_Write_File::write"),                       -1);    }  nIOCount++ ;  return 0;}int Sender::initiate_read_stream (void){  ACE_Guard<MyMutex> locker (m_Mtx) ;  // Create a new <Message_Block>.  Note that this message block will  // be used both to <read> data asynchronously from the socket and to  // <write> data asynchronously to the file.  ACE_DEBUG ((LM_DEBUG,              "initiate_read_stream called\n"));  ACE_Message_Block *mb = 0;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (BUFSIZ + 1),                  -1);  // Inititiate read  if (this->rs_.read (*mb, mb->size ()- 1) == -1)    {      mb->release () ;      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_Asynch_Read_Stream::read"),                        -1);    }  nIOCount++ ;  return 0;}void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result                                  &result){  ACE_DEBUG ((LM_DEBUG,              "handle_write_stream called\n"));  // Reset pointers.  result.message_block ().rd_ptr (result.message_block ().rd_ptr () -                                  result.bytes_transferred ());  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",              result.bytes_to_write ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",              result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)              result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",              result.message_block ().rd_ptr ()));  // Simplify just for Test  if (result.success () && result.bytes_transferred () != 0)    {      if ( duplex != 0 )  // full duplex, continue write        {          initiate_write_stream () ;        }      else  // half-duplex   read reply, after read we will start        // write        {          initiate_read_stream () ;        }    }  {    ACE_Guard<MyMutex> locker (m_Mtx) ;    nIOCount-- ;  }}voidSender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){  ACE_DEBUG ((LM_DEBUG,              "handle_read_stream called\n"));  // Reset pointers.  result.message_block ().rd_ptr ()[result.bytes_transferred ()] =    '\0';  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read              ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",              result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)              result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",              result.message_block ().rd_ptr ()));  result.message_block().release ();  if ( result.success () && result.bytes_transferred () != 0)    {      // Successful read: write the data to the file asynchronously.      // Note how we reuse the <ACE_Message_Block> for the writing.      // Therefore, we do not delete this buffer because it is handled      // in <handle_write_stream>.      if ( duplex != 0 )  // full duplex, continue read        {          initiate_read_stream () ;        }      else  // half-duplex  writey, after write we will start read        {          initiate_write_stream () ;        }    }  {    ACE_Guard<MyMutex> locker (m_Mtx) ;    nIOCount-- ;  }}//--------------------------------------------------------------------------static intparse_args (int argc, ACE_TCHAR *argv[]){  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:"));  int c;  while ((c = get_opt ()) != EOF)    switch (c)      {      case 'h':        host = get_opt.opt_arg ();        break;      case 'n':        nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ;        break;      case 'p':        port = ACE_OS::atoi (get_opt.opt_arg ());        break;      case 'd':        duplex = ACE_OS::atoi (get_opt.opt_arg ());        break;      default:        ACE_ERROR ((LM_ERROR, "%p.\n",                    "usage :\n"                    "-h <host> for Sender mode\n"                    "-d <duplex mode 1-on/0-off>\n"                    "-p <port to listen/connect>\n"                    "-n <number threads for Proactor pool>\n"));        return -1;      }  return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){  ACE_UNUSED_ARG (initial_read_size);  if (parse_args (argc, argv) == -1)    return -1;#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)  ACE_WIN32_Proactor *      pImpl = new ACE_WIN32_Proactor;#elif defined (ACE_HAS_AIO_CALLS)  //  ACE_POSIX_AIOCB_Proactor *  pImpl = new ACE_POSIX_AIOCB_Proactor;  ACE_POSIX_SIG_Proactor *  pImpl = new ACE_POSIX_SIG_Proactor;#endif  ACE_Proactor Proactor ( pImpl ,1 );  ACE_Proactor::instance( & Proactor );  MyTask  Task1 ;  if (Task1.activate (THR_NEW_LWP, nThreads ) == -1)    {      ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);    }  Sender sender;  ACE_Asynch_Acceptor<Receiver> acceptor;  int Rc = -1 ;  if ( host == 0 ) // Acceptor    {      // Simplify , initial read with  zero size      Rc = acceptor.open (ACE_INET_Addr (port),0,1);    }  else    {      Rc = sender.open (host, port);    }  if ( Rc == 0 )    {      char c ;      cout << "Press any key to stop and exit=>\n" << flush ;      cin.clear ();      cin >> c ;    }  ACE_Proactor::end_event_loop () ;  if ( host != 0 ) // we are sender    {      sender.close () ; // disconnect to get reciever error !!!    }  ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();  pTM->wait_task ( & Task1 ) ;  ACE_Proactor::instance( ( ACE_Proactor* )0 );  return 0;}//--------------------------------------------------------------------////--------------------------------------------------------------------int DisableSignal ( int SigNum ){#ifndef ACE_WIN32  sigset_t signal_set;  if ( ACE_OS::sigemptyset (&signal_set) == - 1 )    {      ACE_ERROR ((LM_ERROR,                  "Error:(%P | %t):%p\n",                  "sigemptyset failed"));    }  ACE_OS::sigaddset (&signal_set, SigNum);  //  Put the <signal_set>.  if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)    {      ACE_ERROR ((LM_ERROR,                  "Error:(%P | %t):%p\n",                  "pthread_sigmask failed"));    }#else  ACE_UNUSED_ARG(SigNum);#endif  return  1;}//--------------------------------------------------------------------// Get the <signal_set> back from the OS.//--------------------------------------------------------------------int PrintSigMask (){#ifndef ACE_WIN32  sigset_t  mask ;  int member = 0;  COUT ( "\n=============Signal Mask==========" )    if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0)      {        ACE_ERROR ((LM_ERROR,                    "Error:(%P | %t):%p\n",                    "ACE_OS::pthread_sigmask failed"));      }    else  for (int i = 1 ; i < 1000; i++)      {        member = ACE_OS::sigismember (&mask,i);        COUT ( "\nSig " )          COUT ( i )          COUT ( " is " )          COUT (member )          if (member == -1)            {              break ;            }      }#endif  return 0;}#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */intACE_TMAIN (int, ACE_TCHAR *[]){  ACE_DEBUG ((LM_DEBUG,              "This example does not work on this platform.\n"));  return 1;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?