test_proactor2.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 825 行 · 第 1/2 页
CPP
825 行
MyMutex m_Mtx ;long nIOCount ;};static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";Sender::Sender (void) :nIOCount ( 0 ){ // Moment of inspiration... :-) this->welcome_message_.init (data, ACE_OS::strlen (data));}Sender::~Sender (void){ this->close ();}void Sender::close (){ this->stream_.close ();}ACE_HANDLE Sender::handle (void) const{ return this->stream_.get_handle ();}void Sender::handle (ACE_HANDLE handle){ this->stream_.set_handle (handle);}int Sender::open (const ACE_TCHAR *host, u_short port){ // Initialize stuff // Connect to remote host ACE_INET_Addr address (port, host); ACE_SOCK_Connector connector; if (connector.connect (this->stream_, address) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Connector::connect"), -1); } // Open ACE_Asynch_Write_Stream if (this->ws_.open (*this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open"), -1); // Open ACE_Asynch_Read_Stream if (this->rs_.open (*this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::open"), -1); // Start an asynchronous transmit file if ( this->initiate_write_stream () == -1) return -1; if ( duplex != 0 ) { // Start an asynchronous read file if (this->initiate_read_stream () == -1) return -1; } return 0;}int Sender::initiate_write_stream (void){ ACE_Guard<MyMutex> locker (m_Mtx) ; welcome_message_.rd_ptr( welcome_message_.base ()); welcome_message_.wr_ptr( welcome_message_.base ()); welcome_message_.wr_ptr (ACE_OS::strlen (data)); if (this->ws_.write (welcome_message_, welcome_message_.length () ) == -1) { ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1); } nIOCount++ ; return 0;}int Sender::initiate_read_stream (void){ ACE_Guard<MyMutex> locker (m_Mtx) ; // Create a new <Message_Block>. Note that this message block will // be used both to <read> data asynchronously from the socket and to // <write> data asynchronously to the file. ACE_DEBUG ((LM_DEBUG, "initiate_read_stream called\n")); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // Inititiate read if (this->rs_.read (*mb, mb->size ()- 1) == -1) { mb->release () ; ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); } nIOCount++ ; return 0;}void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); // Reset pointers. result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ()); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); // Simplify just for Test if (result.success () && result.bytes_transferred () != 0) { if ( duplex != 0 ) // full duplex, continue write { initiate_write_stream () ; } else // half-duplex read reply, after read we will start // write { initiate_read_stream () ; } } { ACE_Guard<MyMutex> locker (m_Mtx) ; nIOCount-- ; }}voidSender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); // Reset pointers. result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); result.message_block().release (); if ( result.success () && result.bytes_transferred () != 0) { // Successful read: write the data to the file asynchronously. // Note how we reuse the <ACE_Message_Block> for the writing. // Therefore, we do not delete this buffer because it is handled // in <handle_write_stream>. if ( duplex != 0 ) // full duplex, continue read { initiate_read_stream () ; } else // half-duplex writey, after write we will start read { initiate_write_stream () ; } } { ACE_Guard<MyMutex> locker (m_Mtx) ; nIOCount-- ; }}//--------------------------------------------------------------------------static intparse_args (int argc, ACE_TCHAR *argv[]){ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:")); int c; while ((c = get_opt ()) != EOF) switch (c) { case 'h': host = get_opt.opt_arg (); break; case 'n': nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ; break; case 'p': port = ACE_OS::atoi (get_opt.opt_arg ()); break; case 'd': duplex = ACE_OS::atoi (get_opt.opt_arg ()); break; default: ACE_ERROR ((LM_ERROR, "%p.\n", "usage :\n" "-h <host> for Sender mode\n" "-d <duplex mode 1-on/0-off>\n" "-p <port to listen/connect>\n" "-n <number threads for Proactor pool>\n")); return -1; } return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){ ACE_UNUSED_ARG (initial_read_size); if (parse_args (argc, argv) == -1) return -1;#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor;#elif defined (ACE_HAS_AIO_CALLS) // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor; ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor;#endif ACE_Proactor Proactor ( pImpl ,1 ); ACE_Proactor::instance( & Proactor ); MyTask Task1 ; if (Task1.activate (THR_NEW_LWP, nThreads ) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); } Sender sender; ACE_Asynch_Acceptor<Receiver> acceptor; int Rc = -1 ; if ( host == 0 ) // Acceptor { // Simplify , initial read with zero size Rc = acceptor.open (ACE_INET_Addr (port),0,1); } else { Rc = sender.open (host, port); } if ( Rc == 0 ) { char c ; cout << "Press any key to stop and exit=>\n" << flush ; cin.clear (); cin >> c ; } ACE_Proactor::end_event_loop () ; if ( host != 0 ) // we are sender { sender.close () ; // disconnect to get reciever error !!! } ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance(); pTM->wait_task ( & Task1 ) ; ACE_Proactor::instance( ( ACE_Proactor* )0 ); return 0;}//--------------------------------------------------------------------////--------------------------------------------------------------------int DisableSignal ( int SigNum ){#ifndef ACE_WIN32 sigset_t signal_set; if ( ACE_OS::sigemptyset (&signal_set) == - 1 ) { ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "sigemptyset failed")); } ACE_OS::sigaddset (&signal_set, SigNum); // Put the <signal_set>. if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) { ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "pthread_sigmask failed")); }#else ACE_UNUSED_ARG(SigNum);#endif return 1;}//--------------------------------------------------------------------// Get the <signal_set> back from the OS.//--------------------------------------------------------------------int PrintSigMask (){#ifndef ACE_WIN32 sigset_t mask ; int member = 0; COUT ( "\n=============Signal Mask==========" ) if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0) { ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "ACE_OS::pthread_sigmask failed")); } else for (int i = 1 ; i < 1000; i++) { member = ACE_OS::sigismember (&mask,i); COUT ( "\nSig " ) COUT ( i ) COUT ( " is " ) COUT (member ) if (member == -1) { break ; } }#endif return 0;}#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */intACE_TMAIN (int, ACE_TCHAR *[]){ ACE_DEBUG ((LM_DEBUG, "This example does not work on this platform.\n")); return 1;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?