⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 concrete_connection_handlers.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
  ssize_t data_bytes_left_to_read =    ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));  ssize_t data_received =    !data_bytes_left_to_read    ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0.    : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);  // Try to receive the remainder of the event.  switch (data_received)    {    case -1:      if (errno == EWOULDBLOCK)        // This might happen if only the header came through.        return -1;      else        /* FALLTHROUGH */;    case 0: // Premature EOF.      if (data_bytes_left_to_read)        {          this->msg_frag_ = this->msg_frag_->release ();          return 0;        }      /* FALLTHROUGH */;    default:      // Set the write pointer at 1 past the end of the event.      this->msg_frag_->wr_ptr (data_received);      if (data_received != data_bytes_left_to_read)        {          errno = EWOULDBLOCK;          // Inform caller that we didn't get the whole event.          return -1;        }      else        {          // Set the read pointer to the beginning of the event.          this->msg_frag_->rd_ptr (this->msg_frag_->base ());          // Allocate an event forwarding header and chain the data          // portion onto its continuation field.          forward_addr = new ACE_Message_Block (sizeof (Event_Key),                                                ACE_Message_Block::MB_PROTO,                                                this->msg_frag_,                                                0,                                                0,                                                Options::instance ()->locking_strategy ());          if (forward_addr == 0)            {              this->msg_frag_ = this->msg_frag_->release ();              errno = ENOMEM;              return -1;            }          Event_Key event_addr (this->connection_id (),                                event->header_.type_);          // Copy the forwarding address from the Event_Key into          // forward_addr.          forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));          // Reset the pointer to indicate we've got an entire event.          this->msg_frag_ = 0;        }      this->total_bytes (data_received + header_received);      ACE_DEBUG ((LM_DEBUG,                  "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",                  event->header_.connection_id_,                  event->header_.len_,                  data_received + header_received));      if (Options::instance ()->enabled (Options::VERBOSE))        ACE_DEBUG ((LM_DEBUG,                    "data_ = %*s\n",                    event->header_.len_ - 2,                    event->data_));      // Encode before returning so that we can set things out in      // network byte order.      event->header_.encode ();      return data_received + header_received;    }}// Receive various types of input (e.g., Peer event from the gatewayd,// as well as stdio).intSupplier_Handler::handle_input (ACE_HANDLE){  ACE_Message_Block *event_key = 0;  switch (this->recv (event_key))    {    case 0:      // Note that a peer shouldn't initiate a shutdown by closing the      // connection.  Therefore, the peer must have crashed, so we'll      // need to bail out here and let the higher layers reconnect.      this->state (Connection_Handler::FAILED);      ACE_ERROR_RETURN ((LM_ERROR,                        "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n",                        this->connection_id ()),                        -1);      /* NOTREACHED */    case -1:      if (errno == EWOULDBLOCK)        // A short-read, we'll come back and finish it up later on!        return 0;      else // A weird problem occurred, shut down and start again.        {          this->state (Connection_Handler::FAILED);          ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n",                             "Peer has failed unexpectedly",                             this->connection_id ()),                            -1);        }      /* NOTREACHED */    default:      // Route messages to Consumers.      return this->process (event_key);    }}// This delegates to the <Event_Channel> to do the actual processing.// Typically, this forwards the event to its appropriate Consumer(s).intSupplier_Handler::process (ACE_Message_Block *event_key){  return this->event_channel_->put (event_key);}Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)  : Consumer_Handler (pci){  // It is not in thread svc() now.  in_thread_ = 0;}// Overriding handle_close() method.  If in thread svc(), no need to// process handle_close() when call peer().close(), because the// connection is blocked now.intThr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m){  if (in_thread_)    return 0;  else    return Consumer_Handler::handle_close (h, m);}// This method should be called only when the Consumer shuts down// unexpectedly.  This method marks the Connection_Handler as having// failed and deactivates the ACE_Message_Queue (to wake up the thread// blocked on <dequeue_head> in svc()).// Thr_Consumer_Handler::handle_close () will eventually try to// reconnect...// Let Consumer_Handler receive normal data.intThr_Consumer_Handler::handle_input (ACE_HANDLE h){  // Call down to the <Consumer_Handler> to handle this first.  if (this->Consumer_Handler::handle_input (h) != 0)    {      // Only do such work when failed.      ACE_Reactor::instance ()->remove_handler        (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);      // Deactivate the queue while we try to get reconnected.      this->msg_queue ()->deactivate ();      // Will call handle_close.      return -1;    }  return 0;}// Initialize the threaded Consumer_Handler object and spawn a new// thread.intThr_Consumer_Handler::open (void *){  // Turn off non-blocking I/O.  if (this->peer ().disable (ACE_NONBLOCK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                       "disable"),                      -1); // Incorrect info fixed.  // Call back to the <Event_Channel> to complete our initialization.  else if (this->event_channel_->complete_connection_connection (this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                       "complete_connection_connection"),                      -1);  // Register ourselves to receive input events (which indicate that  // the Consumer has shut down unexpectedly).  else if (ACE_Reactor::instance ()->register_handler      (this, ACE_Event_Handler::READ_MASK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                       "register_handler"),                      -1);  // Reactivate message queue.  If it was active then this is the  // first time in and we need to spawn a thread, otherwise the queue  // was inactive due to some problem and we've already got a thread.  else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) spawning new thread\n"));      // Become an active object by spawning a new thread to transmit      // events to Consumers.      return this->activate (THR_NEW_LWP | THR_DETACHED);    }  else    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) reusing existing thread\n"));      return 0;    }}// Queue up an event for transmission (must not block since// Supplier_Handlers may be single-threaded).intThr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *){  // Perform non-blocking enqueue, i.e., if <msg_queue> is full  // *don't* block!  return this->msg_queue ()->enqueue_tail    (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);}// Transmit events to the peer.  Note the simplification resulting// from the use of threads, compared with the Reactive solution.intThr_Consumer_Handler::svc (void){  for (in_thread_ = 1;;)    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) Thr_Consumer_Handler's handle = %d\n",                  this->peer ().get_handle ()));      // Since this method runs in its own thread it is OK to block on      // output.      for (ACE_Message_Block *mb = 0;           this->msg_queue ()->dequeue_head (mb) != -1;           )        if (this->send (mb) == -1)          ACE_ERROR ((LM_ERROR,                      "(%t) %p\n",                      "send failed"));      ACE_ASSERT (errno == ESHUTDOWN);      ACE_DEBUG ((LM_DEBUG,                  "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",                  this->connection_id (),                  this->get_handle ()));      this->peer ().close ();      // Re-establish the connection, using exponential backoff.      for (this->timeout (1);           // Default is to reconnect synchronously.           this->event_channel_->initiate_connection_connection (this, 1) == -1;           // Second parameter '1' means using sync mode directly,           // don't care Options::blocking_semantics().  If don't do           // so, async mode will be used to connect which won't           // satisfy original design.           )        {          ACE_Time_Value tv (this->timeout ());          ACE_ERROR ((LM_ERROR,                      "(%t) reattempting connection, sec = %d\n",                      tv.sec ()));          ACE_OS::sleep (tv);        }    }  ACE_NOTREACHED (return 0;)}Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)  : Supplier_Handler (pci){  // It is not in thread svc() now.  in_thread_ = 0;}// Overriding handle_close() method.  If in thread svc(), no need to// process handle_close() when call peer().close(), because the// connection is blocked now.intThr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m){  if (in_thread_)    return 0;  else    return Supplier_Handler::handle_close (h, m);}intThr_Supplier_Handler::open (void *){  // Turn off non-blocking I/O.  if (this->peer ().disable (ACE_NONBLOCK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                       "disable"),                      -1); // Incorrect info fixed.  // Call back to the <Event_Channel> to complete our initialization.  else if (this->event_channel_->complete_connection_connection (this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "(%t) %p\n",                       "complete_connection_connection"),                      -1);  // Reactivate message queue.  If it was active then this is the  // first time in and we need to spawn a thread, otherwise the queue  // was inactive due to some problem and we've already got a thread.  else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) spawning new thread\n"));      // Become an active object by spawning a new thread to transmit      // events to peers.      return this->activate (THR_NEW_LWP | THR_DETACHED);    }  else    {      ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));      return 0;    }}// Receive events from a Peer in a separate thread (note reuse of// existing code!).intThr_Supplier_Handler::svc (void){  for (in_thread_ = 1;;)    {      ACE_DEBUG ((LM_DEBUG,                  "(%t) Thr_Supplier_Handler's handle = %d\n",                 this->peer ().get_handle ()));      // Since this method runs in its own thread and processes events      // for one connection it is OK to call down to the      // <Supplier_Handler::handle_input> method, which blocks on      // input.      while (this->Supplier_Handler::handle_input () != -1)        continue;      ACE_DEBUG ((LM_DEBUG,                  "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",                  this->connection_id (),                  this->get_handle ()));      this->peer ().close ();      // Deactivate the queue while we try to get reconnected.      this->msg_queue ()->deactivate ();      // Re-establish the connection, using expoential backoff.      for (this->timeout (1);           // Default is to reconnect synchronously.           this->event_channel_->initiate_connection_connection (this, 1) == -1;           // Second parameter '1' means using sync mode directly,           // don't care Options::blocking_semantics().  If don't do           // so, async mode will be used to connect which won't           // satisfy original design.           )        {          ACE_Time_Value tv (this->timeout ());          ACE_ERROR ((LM_ERROR,                      "(%t) reattempting connection, sec = %d\n",                      tv.sec ()));          ACE_OS::sleep (tv);        }    }  ACE_NOTREACHED(return 0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -