📄 talker.cpp
字号:
// was specified from the command line.intPeer_Handler::open (void *){ if (host_ != 0) // Connector { ACE_INET_Addr addr (port_, host_); ACE_SOCK_Connector connector; // Establish connection with server. if (connector.connect (stream_, addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); } else // Acceptor { ACE_SOCK_Acceptor acceptor; ACE_INET_Addr local_addr (port_); if ((acceptor.open (local_addr) == -1) || (acceptor.accept (this->stream_) == -1)) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); } int result = this->rd_stream_.open (*this); if (result != 0) return result; result = this->wr_stream_.open (*this); if (result != 0) return result; result = this->rd_stream_.read (this->mb_, this->mb_.size ()); return result;}// One of our asynchronous writes to the remote peer has completed.// Make sure it succeeded and then delete the message.voidPeer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){ if (result.bytes_transferred () <= 0) ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", result.bytes_transferred ())); // This was allocated by the STDIN_Handler, queued, dequeued, passed // to the proactor, and now passed back to us. result.message_block ().release ();}// The remote peer has sent us something. If it succeeded, print// out the message and reinitiate a read. Otherwise, fail. In both// cases, delete the message sent.voidPeer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ if (result.bytes_transferred () > 0 && this->mb_.length () > 0) { this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); } else { // If a read failed, we will assume it's because the remote peer // went away. We will end the event loop. Since we're in the // main thread, we don't need to do a notify. ACE_Reactor::end_event_loop(); return; } // Reset pointers this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ()); // Start off another read if (this->rd_stream_.read (this->mb_, this->mb_.size ()) == -1) ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));}// This is so the Proactor can get our handle.ACE_HANDLEPeer_Handler::handle (void) const{ return this->stream_.get_handle ();}voidPeer_Handler::handle (ACE_HANDLE handle){ this->stream_.set_handle (handle);}// We've been removed from the Reactor.intPeer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask){ ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); return 0;}// New stuff added to the message queue. Try to dequeue a message.intPeer_Handler::handle_output (ACE_HANDLE){ ACE_Message_Block *mb; ACE_Time_Value tv (ACE_Time_Value::zero); // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { if (this->wr_stream_.write (*mb, mb->length ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); } return 0;}voidSTDIN_Handler::handler (int signum){ ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));}STDIN_Handler::STDIN_Handler (MT_TASK &ph) : ph_ (ph){ // Register for ^C from the console. We just need to catch the // exception so that the kernel doesn't kill our process. // Registering this signal handler just tells the kernel that we // know what we're doing; to leave us alone. ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler);};// Activate object.intSTDIN_Handler::open (void *){ if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); return 0;}// Shut down.intSTDIN_Handler::close (u_long){ ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); return 0;}// Thread runs here.intSTDIN_Handler::svc (void){ this->register_thread_exit_hook (); for (;;) { ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); // Read from stdin into mb. int read_result = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); // If read succeeds, put mb to peer handler, else end the loop. if (read_result > 0) { mb->wr_ptr (read_result); // Note that this call will first enqueue mb onto the peer // handler's message queue, which will then turn around and // notify the Reactor via the Notification_Strategy. This // will subsequently signal the Peer_Handler, which will // react by calling back to its handle_output() method, // which dequeues the message and sends it to the peer // across the network. this->ph_.putq (mb); } else { mb->release (); break; } } // handle_signal will get called on the main proactor thread since // we just exited and the main thread is waiting on our thread exit. return 0;}// Register an exit hook with the reactor.voidSTDIN_Handler::register_thread_exit_hook (void){ // Get a real handle to our thread. ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); // Register ourselves to get called back when our thread exits. if (ACE_Reactor::instance ()-> register_handler (this, this->thr_handle_) == -1) ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n"));}// The STDIN thread has exited. This means the user hit ^C. We can// end the event loop and delete ourself.intSTDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *){ if (si != 0) { ACE_ASSERT (this->thr_handle_ == si->si_handle_); ACE_Reactor::end_event_loop (); } return 0;}intSTDIN_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask){ delete this; return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){ // Let the proactor know that it will be used with Reactor // Create specific proactor ACE_WIN32_Proactor win32_proactor (0, 1); // Get the interface proactor ACE_Proactor proactor (&win32_proactor); // Put it as the instance. ACE_Proactor::instance (&proactor); // Open handler for remote peer communications this will run from // the main thread. Peer_Handler peer_handler (argc, argv); if (peer_handler.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", "peer_handler", errno), 0); // Open active object for reading from stdin. STDIN_Handler *stdin_handler = new STDIN_Handler (peer_handler); // Spawn thread. if (stdin_handler->open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", "stdin_handler", errno), 0); // Register proactor with Reactor so that we can demultiplex // "waitable" events and I/O operations from a single thread. if (ACE_Reactor::instance ()->register_handler (ACE_Proactor::instance ()->implementation ()) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", argv[0]), -1); // Run main event demultiplexor. ACE_Reactor::run_event_loop (); // Remove proactor with Reactor. if (ACE_Reactor::instance ()->remove_handler (ACE_Proactor::instance ()->implementation (), ACE_Event_Handler::DONT_CALL) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", argv[0]), -1); return 0;}#else /* !ACE_HAS_WIN32_OVERLAPPED_IO */intACE_TMAIN (int , ACE_TCHAR *[]){ return 0;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -