📄 peer.cpp
字号:
// Reset the pointer to indicate we've got an entire event. this->msg_frag_ = 0; } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"), event->header_.connection_id_, event->header_.len_, data_received + header_received)); if (Options::instance ()->enabled (Options::VERBOSE)) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("data_ = %*s\n"), event->header_.len_ - 2, event->data_)); return data_received + header_received; }}// Receive various types of input (e.g., Peer event from the gatewayd,// as well as stdio).intPeer_Handler::handle_input (ACE_HANDLE sd){ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("in handle_input, sd = %d\n"), sd)); if (sd == ACE_STDIN) // Handle event from stdin. return this->transmit_stdin (); else // Perform the appropriate action depending on the state we are // in. return (this->*do_action_) ();}// Action that receives our connection id from the Gateway.intPeer_Handler::await_connection_id (void){ ssize_t n = this->peer ().recv (&this->connection_id_, sizeof this->connection_id_); if (n != sizeof this->connection_id_) { if (n == 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("gatewayd has closed down unexpectedly\n")), -1); else ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p, bytes received on handle %d = %d\n"), ACE_TEXT ("recv"), this->get_handle (), n), -1); } else { this->connection_id_ = ntohl (this->connection_id_); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("assigned connection id %d\n"), this->connection_id_)); } // Subscribe for events if we're a Consumer. if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)) this->subscribe (); // No need to disconnect by timeout. ACE_Reactor::instance ()->cancel_timer(this); // Transition to the action that waits for Peer events. this->do_action_ = &Peer_Handler::await_events; // Reset standard input. ACE_OS::rewind (stdin); // Call register_stdin_handler only once, until the stdin-thread // closed which caused by transmit_stdin error. if (first_time_) { // Register this handler to receive test events on stdin. if (ACE_Event_Handler::register_stdin_handler (this, ACE_Reactor::instance (), ACE_Thread_Manager::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("register_stdin_handler")), -1); // Next time in await_connection_id(), I'll don't call // register_stdin_handler(). first_time_ = 0; } return 0;}intPeer_Handler::subscribe (void){ ACE_Message_Block *mb; ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof (Event)), -1); Subscription *subscription = (Subscription *) ((Event *) mb->rd_ptr ())->data_; subscription->connection_id_ = Options::instance ()->connection_id (); return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);}// Action that receives events from the Gateway.intPeer_Handler::await_events (void){ ACE_Message_Block *mb = 0; ssize_t n = this->recv (mb); switch (n) { case 0: ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("gatewayd has closed down\n")), -1); /* NOTREACHED */ case -1: if (errno == EWOULDBLOCK) // A short-read, we'll come back and finish it up later on! return 0; else ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("recv")), -1); /* NOTREACHED */ default: { // We got a valid event, so let's process it now! At the // moment, we just print out the event contents... Event *event = (Event *) mb->rd_ptr (); this->total_bytes_ += mb->length (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"), event->header_.connection_id_, event->header_.len_, this->total_bytes_)); if (Options::instance ()->enabled (Options::VERBOSE)) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("data_ = %*s\n"), event->header_.len_ - 2, event->data_)); mb->release (); return 0; } }}// Periodically send events via ACE_Reactor timer mechanism.intPeer_Handler::handle_timeout (const ACE_Time_Value &, const void *){ // Shut down the handler. return this->handle_close ();}Peer_Handler::~Peer_Handler (void){ // Shut down the handler. this->handle_close ();}// Handle shutdown of the Peer object.intPeer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask){ if (this->get_handle () != ACE_INVALID_HANDLE) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("shutting down Peer on handle %d\n"), this->get_handle ())); ACE_Reactor_Mask mask = ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK; // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor> // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL> // instructs the ACE_Reactor *not* to call <handle_close>, which // would otherwise lead to infinite recursion!). ACE_Reactor::instance ()->remove_handler (ACE_STDIN, mask); // Deregister this handler with the ACE_Reactor. if (ACE_Reactor::instance ()->remove_handler (this, mask) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("handle = %d: %p\n"), this->get_handle (), ACE_TEXT ("remove_handler")), -1); // Close down the peer. this->peer ().close (); } return 0;}intPeer_Acceptor::start (u_short port){ // This object only gets allocated once and is just recycled // forever. ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1); this->addr_.set (port); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("opening acceptor at port %d\n"), port)); // Call down to the <Acceptor::open> method. if (this->inherited::open (this->addr_) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1); else if (this->acceptor ().get_local_addr (this->addr_) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("get_local_addr")), -1); else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("accepting at port %d\n"), this->addr_.get_port_number ())); return 0;}Peer_Acceptor::Peer_Acceptor (void) : peer_handler_ (0){}intPeer_Acceptor::close (void){ // Will trigger a delete. if (this->peer_handler_ != 0) this->peer_handler_->destroy (); // Close down the base class. return this->inherited::close ();}// Note how this method just passes back the pre-allocated// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new// one each time!intPeer_Acceptor::make_svc_handler (Peer_Handler *&sh){ sh = this->peer_handler_; return 0;}intPeer_Connector::open_connector (Peer_Handler *&peer_handler, u_short port){ // This object only gets allocated once and is just recycled // forever. ACE_NEW_RETURN (peer_handler, Peer_Handler, -1); ACE_INET_Addr addr (port, Options::instance ()->connector_host ()); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("connecting to %s:%d\n"), addr.get_host_name (), addr.get_port_number ())); if (this->connect (peer_handler, addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("connect")), -1); else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("connected to %C:%d\n"), addr.get_host_name (), addr.get_port_number ())); return 0;}intPeer_Connector::open (ACE_Reactor *, int){ this->supplier_peer_handler_ = 0; this->consumer_peer_handler_ = 0; if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR) && this->open_connector (this->supplier_peer_handler_, Options::instance ()->supplier_connector_port ()) == -1) return -1; if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR) && this->open_connector (this->consumer_peer_handler_, Options::instance ()->consumer_connector_port ()) == -1) return -1; return 0;}intPeer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *){ if (signum != SIGPIPE) { // Shut down the main event loop. ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit? ACE_Reactor::instance ()->end_reactor_event_loop(); } return 0;}// Returns information on the currently active service.intPeer_Factory::info (ACE_TCHAR **strp, size_t length) const{ ACE_TCHAR buf[BUFSIZ]; ACE_TCHAR consumer_addr_str[BUFSIZ]; ACE_TCHAR supplier_addr_str[BUFSIZ]; ACE_INET_Addr addr; if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1) return -1; else if (addr.addr_to_string (consumer_addr_str, sizeof addr) == -1) return -1; else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1) return -1; else if (addr.addr_to_string (supplier_addr_str, sizeof addr) == -1) return -1; ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:")); ACE_OS::strcat (buf, consumer_addr_str); ACE_OS::strcat (buf, ACE_TEXT ("|S:")); ACE_OS::strcat (buf, supplier_addr_str); ACE_OS::strcat (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n")); if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) return -1; else ACE_OS::strncpy (*strp, buf, length); return ACE_OS::strlen (buf);}// Hook called by the explicit dynamic linking facility to terminate// the peer.intPeer_Factory::fini (void){ this->consumer_acceptor_.close (); this->supplier_acceptor_.close (); return 0;}// Hook called by the explicit dynamic linking facility to initialize// the peer.intPeer_Factory::init (int argc, ACE_TCHAR *argv[]){ Options::instance ()->parse_args (argc, argv); ACE_Sig_Set sig_set; sig_set.sig_add (SIGINT); sig_set.sig_add (SIGQUIT); sig_set.sig_add (SIGPIPE); // Register ourselves to receive signals so we can shut down // gracefully. if (ACE_Reactor::instance ()->register_handler (sig_set, this) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("register_handler")), -1); if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR) && this->supplier_acceptor_.start (Options::instance ()->supplier_acceptor_port ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Acceptor::open")), -1); else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) && this->consumer_acceptor_.start (Options::instance ()->consumer_acceptor_port ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Acceptor::open")), -1); else if (this->connector_.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Connector::open")), -1); return 0;}// The following is a "Factory" used by the <ACE_Service_Config> and// svc.conf file to dynamically initialize the <Peer_Acceptor> and// <Peer_Connector>.ACE_SVC_FACTORY_DEFINE (Peer_Factory)#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;template class ACE_Connector_Base<Peer_Handler>;template class ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;template class ACE_NonBlocking_Connect_Handler<Peer_Handler>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>#pragma instantiate ACE_Connector_Base<Peer_Handler>#pragma instantiate ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>#pragma instantiate ACE_NonBlocking_Connect_Handler<Peer_Handler>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -