📄 talker.cpp
字号:
int
Peer_Handler::open (void *)
{
if (host_ != 0) // Connector
{
ACE_INET_Addr addr (port_, host_);
ACE_SOCK_Connector connector;
// Establish connection with server.
if (connector.connect (stream_, addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n"));
}
else // Acceptor
{
ACE_SOCK_Acceptor acceptor;
ACE_INET_Addr local_addr (port_);
if ((acceptor.open (local_addr) == -1) ||
(acceptor.accept (this->stream_) == -1))
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n"));
}
int result = this->rd_stream_.open (*this);
if (result != 0)
return result;
result = this->wr_stream_.open (*this);
if (result != 0)
return result;
result = this->rd_stream_.read (this->mb_,
this->mb_.size ());
return result;
}
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
void
Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
if (result.bytes_transferred () <= 0)
ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed",
result.bytes_transferred ()));
// This was allocated by the STDIN_Handler, queued, dequeued, passed
// to the proactor, and now passed back to us.
result.message_block ().release ();
}
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
void
Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
if (result.bytes_transferred () > 0 &&
this->mb_.length () > 0)
{
this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0';
// Print out the message received from the server.
ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ()));
}
else
{
// If a read failed, we will assume it's because the remote peer
// went away. We will end the event loop. Since we're in the
// main thread, we don't need to do a notify.
ACE_Reactor::end_event_loop();
return;
}
// Reset pointers
this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ());
// Start off another read
if (this->rd_stream_.read (this->mb_,
this->mb_.size ()) == -1)
ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));
}
// This is so the Proactor can get our handle.
ACE_HANDLE
Peer_Handler::handle (void) const
{
return this->stream_.get_handle ();
}
void
Peer_Handler::handle (ACE_HANDLE handle)
{
this->stream_.set_handle (handle);
}
// We've been removed from the Reactor.
int
Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n"));
return 0;
}
// New stuff added to the message queue. Try to dequeue a message.
int
Peer_Handler::handle_output (ACE_HANDLE)
{
ACE_Message_Block *mb;
ACE_Time_Value tv (ACE_Time_Value::zero);
// Forward the message to the remote peer receiver.
if (this->getq (mb, &tv) != -1)
{
if (this->wr_stream_.write (*mb,
mb->length ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
}
return 0;
}
void
STDIN_Handler::handler (int signum)
{
ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));
}
STDIN_Handler::STDIN_Handler (MT_TASK &ph)
: ph_ (ph)
{
// Register for ^C from the console. We just need to catch the
// exception so that the kernel doesn't kill our process.
// Registering this signal handler just tells the kernel that we
// know what we're doing; to leave us alone.
ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler);
};
// Activate object.
int
STDIN_Handler::open (void *)
{
if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
return 0;
}
// Shut down.
int
STDIN_Handler::close (u_long)
{
ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n"));
return 0;
}
// Thread runs here.
int
STDIN_Handler::svc (void)
{
this->register_thread_exit_hook ();
for (;;)
{
ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
// Read from stdin into mb.
int read_result = ACE_OS::read (ACE_STDIN,
mb->rd_ptr (),
mb->size ());
// If read succeeds, put mb to peer handler, else end the loop.
if (read_result > 0)
{
mb->wr_ptr (read_result);
// Note that this call will first enqueue mb onto the peer
// handler's message queue, which will then turn around and
// notify the Reactor via the Notification_Strategy. This
// will subsequently signal the Peer_Handler, which will
// react by calling back to its handle_output() method,
// which dequeues the message and sends it to the peer
// across the network.
this->ph_.putq (mb);
}
else
{
mb->release ();
break;
}
}
// handle_signal will get called on the main proactor thread since
// we just exited and the main thread is waiting on our thread exit.
return 0;
}
// Register an exit hook with the reactor.
void
STDIN_Handler::register_thread_exit_hook (void)
{
// Get a real handle to our thread.
ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
// Register ourselves to get called back when our thread exits.
if (ACE_Reactor::instance ()->
register_handler (this, this->thr_handle_) == -1)
ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n"));
}
// The STDIN thread has exited. This means the user hit ^C. We can
// end the event loop and delete ourself.
int
STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
if (si != 0)
{
ACE_ASSERT (this->thr_handle_ == si->si_handle_);
ACE_Reactor::end_event_loop ();
}
return 0;
}
int
STDIN_Handler::handle_close (ACE_HANDLE,
ACE_Reactor_Mask)
{
delete this;
return 0;
}
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
// Let the proactor know that it will be used with Reactor
// Create specific proactor
ACE_WIN32_Proactor win32_proactor (0, 1);
// Get the interface proactor
ACE_Proactor proactor (&win32_proactor);
// Put it as the instance.
ACE_Proactor::instance (&proactor);
// Open handler for remote peer communications this will run from
// the main thread.
Peer_Handler peer_handler (argc, argv);
if (peer_handler.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p open failed, errno = %d.\n",
"peer_handler", errno), 0);
// Open active object for reading from stdin.
STDIN_Handler *stdin_handler =
new STDIN_Handler (peer_handler);
// Spawn thread.
if (stdin_handler->open () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p open failed, errno = %d.\n",
"stdin_handler", errno), 0);
// Register proactor with Reactor so that we can demultiplex
// "waitable" events and I/O operations from a single thread.
if (ACE_Reactor::instance ()->register_handler
(ACE_Proactor::instance ()->implementation ()) != 0)
ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
argv[0]), -1);
// Run main event demultiplexor.
ACE_Reactor::run_event_loop ();
// Remove proactor with Reactor.
if (ACE_Reactor::instance ()->remove_handler
(ACE_Proactor::instance ()->implementation (), ACE_Event_Handler::DONT_CALL) != 0)
ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
argv[0]), -1);
return 0;
}
#else /* !ACE_WIN32 */
int
ACE_TMAIN (int , ACE_TCHAR *[])
{
return 0;
}
#endif /* ACE_WIN32 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -