test_proactor3.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 890 行 · 第 1/2 页
CPP
890 行
ACE_SOCK_Stream stream_; // Network I/O handle ACE_Asynch_Write_Stream ws_; // ws (write stream): for writing to the socket ACE_Asynch_Read_Stream rs_; // rs (read file): for reading from the socket ACE_Message_Block welcome_message_; // Welcome message ACE_Recursive_Thread_Mutex mutex_; long io_count_;};static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";Sender::Sender (void) : io_count_ (0){ // Moment of inspiration... :-) this->welcome_message_.init (data, ACE_OS::strlen (data));}Sender::~Sender (void){ this->close ();}void Sender::close (void){ this->stream_.close ();}ACE_HANDLE Sender::handle (void) const{ return this->stream_.get_handle ();}void Sender::handle (ACE_HANDLE handle){ this->stream_.set_handle (handle);}int Sender::open (const ACE_TCHAR *host, u_short port){ // Initialize stuff // Connect to remote host ACE_INET_Addr address (port, host); ACE_SOCK_Connector connector; if (connector.connect (this->stream_, address) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Connector::connect"), -1); } // Open ACE_Asynch_Write_Stream if (this->ws_.open (*this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open"), -1); // Open ACE_Asynch_Read_Stream if (this->rs_.open (*this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open"), -1); // Start an asynchronous transmit file if (this->initiate_write_stream () == -1) return -1; if (duplex != 0) // Start an asynchronous read file if (this->initiate_read_stream () == -1) return -1; return 0;}intSender::initiate_write_stream (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); welcome_message_.rd_ptr(welcome_message_.base ()); welcome_message_.wr_ptr(welcome_message_.base ()); welcome_message_.wr_ptr (ACE_OS::strlen (data)); if (this->ws_.write (welcome_message_, welcome_message_.length ()) == -1) ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"), -1); io_count_++; return 0;}intSender::initiate_read_stream (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); // Create a new <Message_Block>. Note that this message block will // be used both to <read> data asynchronously from the socket and to // <write> data asynchronously to the file. ACE_DEBUG ((LM_DEBUG, "initiate_read_stream called\n")); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // Inititiate read if (this->rs_.read (*mb, mb->size ()- 1) == -1) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); } io_count_++; return 0;}voidSender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){ if (result.bytes_transferred () == 0 || result.error () != 0) { ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); // Reset pointers. result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ()); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); } // Simplify just for Test if (result.success () && result.bytes_transferred () != 0) { if (duplex != 0) // full duplex, continue write initiate_write_stream (); else // half-duplex read reply, after read we will start write initiate_read_stream (); } { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); io_count_--; }}voidSender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ if (result.bytes_transferred () == 0 || result.error () != 0) { ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); // Reset pointers. result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); } result.message_block().release (); if (result.success () && result.bytes_transferred () != 0) { // Successful read: write the data to the file asynchronously. // Note how we reuse the <ACE_Message_Block> for the writing. // Therefore, we do not delete this buffer because it is handled // in <handle_write_stream>. if (duplex != 0) // full duplex, continue read initiate_read_stream (); else // half-duplex writey, after write we will start read initiate_write_stream (); } { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); io_count_--; }}static intset_proactor_type (const char *ptype){ if (!ptype) return false; switch (ACE_OS::ace_toupper (*ptype)) { case 'D' : proactor_type = 0; return true; case 'A' : proactor_type = 1; return true; case 'I' : proactor_type = 2; return true;#if defined (sun) case 'S' : proactor_type = 3; return true;#endif /* sun */ } return false;}static intparse_args (int argc, ACE_TCHAR *argv[]){ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u")); int c; while ((c = get_opt ()) != EOF) switch (c) { case 'd': // duplex duplex = ACE_OS::atoi (get_opt.opt_arg ()); break; case 'h': // host for sender host = get_opt.opt_arg (); break; case 'p': // port number port = ACE_OS::atoi (get_opt.opt_arg ()); break; case 'n': // thread pool size threads = ACE_OS::atoi (get_opt.opt_arg ()); break; case 's': // number of senders senders = ACE_OS::atoi (get_opt.opt_arg ()); if (senders > MaxSenders) senders = MaxSenders; break; case 'o': // max number of aio for proactor max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ()); break; case 't': // Proactor Type if (set_proactor_type (get_opt.opt_arg ())) break; case 'u': default: ACE_ERROR ((LM_ERROR, "%p.", "\nusage:" "\n-o <max number of started aio operations for Proactor>" "\n-t <Proactor type> UNIX-only, Win32-default always:" "\n a AIOCB" "\n i SIG" "\n s SUN" "\n d default" "\n-d <duplex mode 1-on/0-off>" "\n-h <host> for Sender mode" "\n-n <number threads for Proactor pool>" "\n-p <port to listen/connect>" "\n-s <number of sender's instances>" "\n-u show this message" "\n")); return -1; } return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){#if defined (sun) ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));#endif if (parse_args (argc, argv) == -1) return -1; disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); MyTask task1; if (task1.activate (THR_NEW_LWP, threads) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); // wait for creation of Proactor task1.waitready (); Sender * send_list[MaxSenders]; ACE_Asynch_Acceptor<Receiver> acceptor; int rc = -1; int i; char c; if (host == 0) // Acceptor { // Simplify, initial read with zero size if (acceptor.open (ACE_INET_Addr (port),0,1) == 0) rc = 1; } else { for (i = 0; i < senders; ++i) send_list[i] = new Sender; for (i = 0; i < senders; ++i) if (send_list[i]->open (host, port) == 0) rc++; } if (rc > 0) { cout << "Press any key to stop=>" << flush; cin.clear (); cin >> c; } ACE_Proactor::end_event_loop (); if (host != 0) // we are sender { for (i = 0; i < senders; ++i) send_list[i]->close (); } ACE_Thread_Manager *tm = ACE_Thread_Manager::instance(); tm->wait_task (&task1); cout << "\nNumber of Receivers objects=" << Receiver::get_number_sessions () << flush; for (i = 0; i < senders; ++i) { delete (send_list[i]); send_list[i] = 0; } return 0;}static intdisable_signal (int sigmin, int sigmax){#ifndef ACE_WIN32 sigset_t signal_set; if (ACE_OS::sigemptyset (&signal_set) == - 1) ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "sigemptyset failed")); for (int i = sigmin; i <= sigmax; i++) ACE_OS::sigaddset (&signal_set, i); // Put the <signal_set>. if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "pthread_sigmask failed"));#else ACE_UNUSED_ARG (sigmin); ACE_UNUSED_ARG (sigmax);#endif /* ACE_WIN32 */ return 1;}// Get the <signal_set> back from the OS.#if 0static intprint_sigmask (void){#ifndef ACE_WIN32 sigset_t mask; int member = 0; COUT ("\n=============Signal Mask==========") if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0) ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "ACE_OS::pthread_sigmask failed")); else for (int i = 1; i < 1000; i++) { member = ACE_OS::sigismember (&mask,i); COUT ("\nSig ") COUT (i) COUT (" is ") COUT (member) if (member == -1) break; }#endif /* ACE_WIN32 */ return 0;}#endif /* 0 */#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */intACE_TMAIN (int, ACE_TCHAR *[]){ ACE_DEBUG ((LM_DEBUG, "This example does not work on this platform.\n")); return 1;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?