⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
          // Reset the pointer to indicate we've got an entire event.          this->msg_frag_ = 0;        }      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"),                  event->header_.connection_id_,                  event->header_.len_,                  data_received + header_received));      if (Options::instance ()->enabled (Options::VERBOSE))        ACE_DEBUG ((LM_DEBUG,                    ACE_TEXT ("data_ = %*s\n"),                    event->header_.len_ - 2,                    event->data_));      return data_received + header_received;    }}// Receive various types of input (e.g., Peer event from the gatewayd,// as well as stdio).intPeer_Handler::handle_input (ACE_HANDLE sd){  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("in handle_input, sd = %d\n"),              sd));  if (sd == ACE_STDIN) // Handle event from stdin.    return this->transmit_stdin ();  else    // Perform the appropriate action depending on the state we are    // in.    return (this->*do_action_) ();}// Action that receives our connection id from the Gateway.intPeer_Handler::await_connection_id (void){  ssize_t n = this->peer ().recv (&this->connection_id_,                                  sizeof this->connection_id_);  if (n != sizeof this->connection_id_)    {      if (n == 0)        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("gatewayd has closed down unexpectedly\n")),                          -1);      else        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("%p, bytes received on handle %d = %d\n"),                           ACE_TEXT ("recv"),                           this->get_handle (),                           n),                          -1);    }  else    {      this->connection_id_ = ntohl (this->connection_id_);      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("assigned connection id %d\n"),                  this->connection_id_));    }  // Subscribe for events if we're a Consumer.  if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))    this->subscribe ();  // No need to disconnect by timeout.  ACE_Reactor::instance ()->cancel_timer(this);  // Transition to the action that waits for Peer events.  this->do_action_ = &Peer_Handler::await_events;  // Reset standard input.  ACE_OS::rewind (stdin);  // Call register_stdin_handler only once, until the stdin-thread  // closed which caused by transmit_stdin error.  if (first_time_)    {      // Register this handler to receive test events on stdin.      if (ACE_Event_Handler::register_stdin_handler          (this,           ACE_Reactor::instance (),           ACE_Thread_Manager::instance ()) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("(%t) %p\n"),                           ACE_TEXT ("register_stdin_handler")),                          -1);      // Next time in await_connection_id(), I'll don't call      // register_stdin_handler().      first_time_ = 0;    }  return 0;}intPeer_Handler::subscribe (void){  ACE_Message_Block *mb;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (sizeof (Event)),                  -1);  Subscription *subscription =    (Subscription *) ((Event *) mb->rd_ptr ())->data_;  subscription->connection_id_ =    Options::instance ()->connection_id ();  return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);}// Action that receives events from the Gateway.intPeer_Handler::await_events (void){  ACE_Message_Block *mb = 0;  ssize_t n = this->recv (mb);  switch (n)    {    case 0:      ACE_ERROR_RETURN ((LM_ERROR,                         ACE_TEXT ("gatewayd has closed down\n")),                        -1);      /* NOTREACHED */    case -1:      if (errno == EWOULDBLOCK)        // A short-read, we'll come back and finish it up later on!        return 0;      else        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("%p\n"),                           ACE_TEXT ("recv")),                          -1);      /* NOTREACHED */    default:      {        // We got a valid event, so let's process it now!  At the        // moment, we just print out the event contents...        Event *event = (Event *) mb->rd_ptr ();        this->total_bytes_ += mb->length ();        ACE_DEBUG ((LM_DEBUG,                    ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"),                    event->header_.connection_id_,                    event->header_.len_,                    this->total_bytes_));        if (Options::instance ()->enabled (Options::VERBOSE))          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("data_ = %*s\n"),                      event->header_.len_ - 2,                      event->data_));        mb->release ();        return 0;      }    }}// Periodically send events via ACE_Reactor timer mechanism.intPeer_Handler::handle_timeout (const ACE_Time_Value &,                              const void *){  // Shut down the handler.  return this->handle_close ();}Peer_Handler::~Peer_Handler (void){  // Shut down the handler.  this->handle_close ();}// Handle shutdown of the Peer object.intPeer_Handler::handle_close (ACE_HANDLE,                            ACE_Reactor_Mask){  if (this->get_handle () != ACE_INVALID_HANDLE)    {      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("shutting down Peer on handle %d\n"),                  this->get_handle ()));      ACE_Reactor_Mask mask =        ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;      // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>      // removes the HANDLE.  Note that <ACE_Event_Handler::DONT_CALL>      // instructs the ACE_Reactor *not* to call <handle_close>, which      // would otherwise lead to infinite recursion!).      ACE_Reactor::instance ()->remove_handler        (ACE_STDIN, mask);      // Deregister this handler with the ACE_Reactor.      if (ACE_Reactor::instance ()->remove_handler          (this, mask) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("handle = %d: %p\n"),                           this->get_handle (),                           ACE_TEXT ("remove_handler")),                          -1);      // Close down the peer.      this->peer ().close ();    }  return 0;}intPeer_Acceptor::start (u_short port){  // This object only gets allocated once and is just recycled  // forever.  ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);  this->addr_.set (port);  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("opening acceptor at port %d\n"),              port));  // Call down to the <Acceptor::open> method.  if (this->inherited::open (this->addr_) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("open")),                      -1);  else if (this->acceptor ().get_local_addr (this->addr_) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("get_local_addr")),                      -1);  else    ACE_DEBUG ((LM_DEBUG,                ACE_TEXT ("accepting at port %d\n"),                this->addr_.get_port_number ()));  return 0;}Peer_Acceptor::Peer_Acceptor (void)  : peer_handler_ (0){}intPeer_Acceptor::close (void){  // Will trigger a delete.  if (this->peer_handler_ != 0)    this->peer_handler_->destroy ();  // Close down the base class.  return this->inherited::close ();}// Note how this method just passes back the pre-allocated// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new// one each time!intPeer_Acceptor::make_svc_handler (Peer_Handler *&sh){  sh = this->peer_handler_;  return 0;}intPeer_Connector::open_connector (Peer_Handler *&peer_handler,                                u_short port){  // This object only gets allocated once and is just recycled  // forever.  ACE_NEW_RETURN (peer_handler,                  Peer_Handler,                  -1);  ACE_INET_Addr addr (port,                      Options::instance ()->connector_host ());  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("connecting to %s:%d\n"),              addr.get_host_name (),              addr.get_port_number ()));  if (this->connect (peer_handler, addr) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("connect")),                      -1);  else    ACE_DEBUG ((LM_DEBUG,                ACE_TEXT ("connected to %C:%d\n"),                addr.get_host_name (),                addr.get_port_number ()));  return 0;}intPeer_Connector::open (ACE_Reactor *, int){  this->supplier_peer_handler_ = 0;  this->consumer_peer_handler_ = 0;  if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)      && this->open_connector (this->supplier_peer_handler_,                               Options::instance ()->supplier_connector_port ()) == -1)    return -1;  if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)      && this->open_connector (this->consumer_peer_handler_,                               Options::instance ()->consumer_connector_port ()) == -1)    return -1;  return 0;}intPeer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *){  if (signum != SIGPIPE)  {    // Shut down the main event loop.    ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit?    ACE_Reactor::instance ()->end_reactor_event_loop();  }  return 0;}// Returns information on the currently active service.intPeer_Factory::info (ACE_TCHAR **strp, size_t length) const{  ACE_TCHAR buf[BUFSIZ];  ACE_TCHAR consumer_addr_str[BUFSIZ];  ACE_TCHAR supplier_addr_str[BUFSIZ];  ACE_INET_Addr addr;  if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)    return -1;  else if (addr.addr_to_string (consumer_addr_str,                                sizeof addr) == -1)    return -1;  else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)    return -1;  else if (addr.addr_to_string (supplier_addr_str,                                sizeof addr) == -1)    return -1;  ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:"));  ACE_OS::strcat (buf, consumer_addr_str);  ACE_OS::strcat (buf, ACE_TEXT ("|S:"));  ACE_OS::strcat (buf, supplier_addr_str);  ACE_OS::strcat    (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n"));  if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)    return -1;  else    ACE_OS::strncpy (*strp, buf, length);  return ACE_OS::strlen (buf);}// Hook called by the explicit dynamic linking facility to terminate// the peer.intPeer_Factory::fini (void){  this->consumer_acceptor_.close ();  this->supplier_acceptor_.close ();  return 0;}// Hook called by the explicit dynamic linking facility to initialize// the peer.intPeer_Factory::init (int argc, ACE_TCHAR *argv[]){  Options::instance ()->parse_args (argc, argv);  ACE_Sig_Set sig_set;  sig_set.sig_add (SIGINT);  sig_set.sig_add (SIGQUIT);  sig_set.sig_add (SIGPIPE);  // Register ourselves to receive signals so we can shut down  // gracefully.  if (ACE_Reactor::instance ()->register_handler (sig_set,                                                  this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("register_handler")),                      -1);  if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)      && this->supplier_acceptor_.start      (Options::instance ()->supplier_acceptor_port ()) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("Acceptor::open")),                      -1);  else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)           && this->consumer_acceptor_.start           (Options::instance ()->consumer_acceptor_port ()) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("Acceptor::open")),                      -1);  else if (this->connector_.open () == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("Connector::open")),                      -1);  return 0;}// The following is a "Factory" used by the <ACE_Service_Config> and// svc.conf file to dynamically initialize the <Peer_Acceptor> and// <Peer_Connector>.ACE_SVC_FACTORY_DEFINE (Peer_Factory)#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;template class ACE_Connector_Base<Peer_Handler>;template class ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;template class ACE_NonBlocking_Connect_Handler<Peer_Handler>;#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>#pragma instantiate ACE_Connector_Base<Peer_Handler>#pragma instantiate ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>#pragma instantiate ACE_NonBlocking_Connect_Handler<Peer_Handler>#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -