⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 concrete_connection_handlers.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Concrete_Connection_Handlers.cpp,v 4.19 2005/05/20 15:10:27 schmidt Exp#define ACE_BUILD_SVC_DLL#include "ace/OS_NS_unistd.h"#include "Event_Channel.h"#include "Concrete_Connection_Handlers.h"ACE_RCSID(Gateway, Concrete_Connection_Handlers, "Concrete_Connection_Handlers.cpp,v 4.19 2005/05/20 15:10:27 schmidt Exp")Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)  : Connection_Handler (pci){  this->connection_role_ = 'C';  this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());}// This method should be called only when the Consumer shuts down// unexpectedly.  This method simply marks the Connection_Handler as// having failed so that handle_close () can reconnect.// Do not close handler when received data successfully.// Consumer_Handler should could process received data.// For example, Consumer could send reply-event to Supplier.intConsumer_Handler::handle_input (ACE_HANDLE){  // Do not set FAILED state at here, just at real failed place.  char buf[BUFSIZ];  ssize_t received = this->peer ().recv (buf, sizeof buf);  switch (received)    {    case -1:      this->state (Connection_Handler::FAILED);      ACE_ERROR_RETURN ((LM_ERROR,                        "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",                        this->connection_id ()),                        -1);      /* NOTREACHED */    case 0:      this->state (Connection_Handler::FAILED);      ACE_ERROR_RETURN ((LM_ERROR,                        "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",                        this->connection_id ()),                        -1);      /* NOTREACHED */    default:      ACE_ERROR_RETURN ((LM_ERROR,                        "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n"                        "data size = %d\n",                        this->connection_id (),                        received),                        0); // Return 0 to identify received data successfully.      /* NOTREACHED */    }}// Perform a non-blocking put() of event.  If we are unable to send// the entire event the remainder is re-queued at the *front* of the// Event_List.intConsumer_Handler::nonblk_put (ACE_Message_Block *event){  // Try to send the event.  If we don't send it all (e.g., due to  // flow control), then re-queue the remainder at the head of the  // Event_List and ask the ACE_Reactor to inform us (via  // handle_output()) when it is possible to try again.  ssize_t n = this->send (event);  if (n == -1)    {      // -1 is returned only when things have really gone wrong (i.e.,      // not when flow control occurs).  Thus, let's try to close down      // and set up a new reconnection by calling handle_close().      this->state (Connection_Handler::FAILED);      this->handle_close ();      return -1;    }  else if (errno == EWOULDBLOCK)     {      // We didn't manage to send everything, so we need to queue      // things up.      ACE_DEBUG ((LM_DEBUG,                  "(%t) queueing activated on handle %d to routing id %d\n",                  this->get_handle (),                  this->connection_id ()));      // ACE_Queue in *front* of the list to preserve order.      if (this->msg_queue ()->enqueue_head          (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "enqueue_head"),                          -1);      // Tell ACE_Reactor to call us back when we can send again.      else if (ACE_Reactor::instance ()->schedule_wakeup               (this, ACE_Event_Handler::WRITE_MASK) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           "(%t) %p\n",                           "schedule_wakeup"),                          -1);      return 0;    }  else     return n;}ssize_tConsumer_Handler::send (ACE_Message_Block *event){  ACE_DEBUG ((LM_DEBUG,              "(%t) sending %d bytes to Consumer %d\n",              event->length (),              this->connection_id ()));  ssize_t len = event->length ();  ssize_t n = this->peer ().send (event->rd_ptr (), len);  if (n <= 0)    return errno == EWOULDBLOCK ? 0 : n;  else if (n < len)    {      // Re-adjust pointer to skip over the part we did send.      event->rd_ptr (n);      errno = EWOULDBLOCK;    }  else // if (n == length)    {      // The whole event is sent, we now decrement the reference count      // (which deletes itself with it reaches 0).      event->release ();      errno = 0;    }  this->total_bytes (n);  return n;}// Finish sending an event when flow control conditions abate.// This method is automatically called by the ACE_Reactor.intConsumer_Handler::handle_output (ACE_HANDLE){  ACE_Message_Block *event = 0;  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"),              this->get_handle ()));  // WIN32 Notes: When the receiver blocked, we started adding to the  // consumer handler's message Q. At this time, we registered a  // callback with the reactor to tell us when the TCP layer signalled  // that we could continue to send messages to the consumer. However,  // Winsock only sends this notification ONCE, so we have to assume  // at the application level, that we can continue to send until we  // get any subsequent blocking signals from the receiver's buffer.#if defined (ACE_WIN32)  // Win32 Winsock doesn't trigger multiple "You can write now"  // signals, so we have to assume that we can continue to write until  // we get another EWOULDBLOCK.  // We cancel the wakeup callback we set earlier.  if (ACE_Reactor::instance ()->cancel_wakeup      (this, ACE_Event_Handler::WRITE_MASK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("(%t) %p\n"),                       ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")),                      -1);  // The list had better not be empty, otherwise there's a bug!  while (this->msg_queue ()->dequeue_head         (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)    {      switch (this->nonblk_put (event))        {        case -1:                // Error sending message to consumer.          {            // We are responsible for releasing an ACE_Message_Block if            // failures occur.            event->release ();            ACE_ERROR ((LM_ERROR,                        ACE_TEXT ("(%t) %p\n"),                        ACE_TEXT ("transmission failure")));            break;          }        case 0:                 // Partial Send - we got flow controlled by the receiver          {            ACE_DEBUG ((LM_DEBUG,                        ACE_TEXT ("%D Partial Send due to flow control")                        ACE_TEXT ("- scheduling new wakeup with reactor\n")));            // Re-schedule a wakeup call from the reactor when the            // flow control conditions abate.            if (ACE_Reactor::instance ()->schedule_wakeup                (this,                 ACE_Event_Handler::WRITE_MASK) == -1)              ACE_ERROR_RETURN ((LM_ERROR,                                 ACE_TEXT ("(%t) %p\n"),                                 ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")),                                -1);            // Didn't write everything this time, come back later...            return 0;          }        default:                // Sent the whole thing          {            ACE_DEBUG ((LM_DEBUG,                        ACE_TEXT ("Sent message from message Q, Q size = %d\n"),                        this->msg_queue()->message_count ()));            break;          }        }    }  // If we drop out of the while loop, then the message Q should be  // empty...or there's a problem in the dequeue_head() call...but  // thats another story.  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("%D Sent all messages from consumers message Q\n")));  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),              this->get_handle (),              this->connection_id ()));#else /* !defined (ACE_WIN32) */  // The list had better not be empty, otherwise there's a bug!  if (this->msg_queue ()->dequeue_head      (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)    {      switch (this->nonblk_put (event))        {        case 0:           // Partial send.          ACE_ASSERT (errno == EWOULDBLOCK);          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("%D Partial Send\n")));          // Didn't write everything this time, come back later...          break;        case -1:          // We are responsible for releasing an ACE_Message_Block if          // failures occur.          event->release ();          ACE_ERROR ((LM_ERROR,                      ACE_TEXT ("(%t) %p\n"),                      ACE_TEXT ("transmission failure")));          /* FALLTHROUGH */        default: // Sent the whole thing.          // If we succeed in writing the entire event (or we did not          // fail due to EWOULDBLOCK) then check if there are more          // events on the Message_Queue.  If there aren't, tell the          // ACE_Reactor not to notify us anymore (at least until          // there are new events queued up).          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("QQQ::Sent Message from consumer's Q\n")));          if (this->msg_queue ()->is_empty ())            {              ACE_DEBUG ((LM_DEBUG,                          ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),                          this->get_handle (),                          this->connection_id ()));              if (ACE_Reactor::instance ()->cancel_wakeup                  (this, ACE_Event_Handler::WRITE_MASK) == -1)                ACE_ERROR ((LM_ERROR,                            ACE_TEXT ("(%t) %p\n"),                            ACE_TEXT ("cancel_wakeup")));            }        }    }  else    ACE_ERROR ((LM_ERROR,                ACE_TEXT ("(%t) %p\n"),                ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));#endif /* ACE_WIN32 */  return 0;}// Send an event to a Consumer (may queue if necessary).intConsumer_Handler::put (ACE_Message_Block *event,                       ACE_Time_Value *){  if (this->msg_queue ()->is_empty ())    // Try to send the event *without* blocking!    return this->nonblk_put (event);  else    // If we have queued up events due to flow control then just    // enqueue and return.    return this->msg_queue ()->enqueue_tail      (event, (ACE_Time_Value *) &ACE_Time_Value::zero);}Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci)  : Connection_Handler (pci),    msg_frag_ (0){  this->connection_role_ = 'S';  this->msg_queue ()->high_water_mark (0);}// Receive an Event from a Supplier.  Handles fragmentation.//// The event returned from recv consists of two parts://// 1. The Address part, contains the "virtual" routing id.//// 2. The Data part, which contains the actual data to be forwarded.//// The reason for having two parts is to shield the higher layers// of software from knowledge of the event structure.intSupplier_Handler::recv (ACE_Message_Block *&forward_addr){  if (this->msg_frag_ == 0)    // No existing fragment...    ACE_NEW_RETURN (this->msg_frag_,                    ACE_Message_Block (sizeof (Event),                                       ACE_Message_Block::MB_DATA,                                       0,                                       0,                                       0,                                       Options::instance ()->locking_strategy ()),                    -1);  Event *event = (Event *) this->msg_frag_->rd_ptr ();  ssize_t header_received = 0;  const size_t HEADER_SIZE = sizeof (Event_Header);  ssize_t header_bytes_left_to_read =    HEADER_SIZE - this->msg_frag_->length ();  if (header_bytes_left_to_read > 0)    {      header_received = this->peer ().recv        (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);      if (header_received == -1 /* error */          || header_received == 0  /* EOF */)        {          ACE_ERROR ((LM_ERROR, "%p\n",                      "Recv error during header read "));          ACE_DEBUG ((LM_DEBUG,                      "attempted to read %d\n",                      header_bytes_left_to_read));          this->msg_frag_ = this->msg_frag_->release ();          return header_received;        }      // Bump the write pointer by the amount read.      this->msg_frag_->wr_ptr (header_received);      // At this point we may or may not have the ENTIRE header.      if (this->msg_frag_->length () < HEADER_SIZE)        {          ACE_DEBUG ((LM_DEBUG,                      "Partial header received: only %d bytes\n",                     this->msg_frag_->length ()));          // Notify the caller that we didn't get an entire event.          errno = EWOULDBLOCK;          return -1;        }      // Convert the header into host byte order so that we can access      // it directly without having to repeatedly muck with it...      event->header_.decode ();      if (event->header_.len_ > ACE_INT32 (sizeof event->data_))        {          // This data_ payload is too big!          errno = EINVAL;          ACE_DEBUG ((LM_DEBUG,                      "Data payload is too big (%d bytes)\n",                      event->header_.len_));          return -1;        }    }  // At this point there is a complete, valid header in Event.  Now we  // need to get the event payload.  Due to incomplete reads this may  // not be the first time we've read in a fragment for this message.  // We account for this here.  Note that the first time in here  // msg_frag_->wr_ptr() will point to event->data_.  Every time we do  // a successful fragment read, we advance wr_ptr().  Therefore, by  // subtracting how much we've already read from the  // event->header_.len_ we complete the data_bytes_left_to_read...

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -