⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// Peer.cpp,v 4.48 2004/09/11 11:42:24 jwillemsen Exp#define ACE_BUILD_SVC_DLL#include "ace/OS_NS_stdio.h"#include "ace/OS_NS_string.h"#include "ace/OS_NS_unistd.h"#include "ace/Signal.h"#include "Peer.h"ACE_RCSID(Peer, Peer, "Peer.cpp,v 4.48 2004/09/11 11:42:24 jwillemsen Exp")Peer_Handler::Peer_Handler (void)  : connection_id_ (-1),  // Maybe it's better than 0.    msg_frag_ (0),    total_bytes_ (0){  // Set the high water mark of the <ACE_Message_Queue>.  This is used  // to exert flow control.  this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());  first_time_ = 1;  // It will be first time to open Peer_Handler.}// Upcall from the <ACE_Acceptor::handle_input> that turns control// over to our application-specific Gateway handler.intPeer_Handler::open (void *a){  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("handle = %d\n"),              this->peer ().get_handle ()));  // Call down to the base class to activate and register this handler  // with an <ACE_Reactor>.  if (this->inherited::open (a) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("open")),                      -1);  if (this->peer ().enable (ACE_NONBLOCK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("enable")),                      -1);  ACE_Time_Value timeout (Options::instance ()->timeout ());  // Schedule the time between disconnects.  This should really be a  // "tunable" parameter.  if (ACE_Reactor::instance ()->schedule_timer      (this, 0, timeout) == -1)    ACE_ERROR ((LM_ERROR,                ACE_TEXT ("%p\n"),                ACE_TEXT ("schedule_timer")));  // If there are events left in the queue, make sure we enable the  // <ACE_Reactor> appropriately to get them sent out.  if (this->msg_queue ()->is_empty () == 0      && ACE_Reactor::instance ()->schedule_wakeup          (this, ACE_Event_Handler::WRITE_MASK) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("schedule_wakeup")),                      -1);  // First action is to wait to be notified of our connection id.  this->do_action_ = &Peer_Handler::await_connection_id;  return 0;}intPeer_Handler::transmit (ACE_Message_Block *mb,                        size_t n,                        int event_type){  Event *event = (Event *) mb->rd_ptr ();  // Initialize the header.  new (&event->header_) Event_Header (n,                                      this->connection_id_,                                      event_type,                                      0);  // Convert all the fields into network byte order.  event->header_.encode ();  // Move the write pointer to the end of the event.  mb->wr_ptr (sizeof (Event_Header) + n);  if (this->put (mb) == -1)    {      if (errno == EWOULDBLOCK) // The queue has filled up!        ACE_ERROR ((LM_ERROR,                    ACE_TEXT ("%p\n"),                    ACE_TEXT ("gateway is flow controlled, so we're dropping events")));      else        ACE_ERROR ((LM_ERROR,                    ACE_TEXT ("%p\n"),                    ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed.      // Caller is responsible for freeing a ACE_Message_Block      // if failures occur.      mb->release ();      return -1;    }  return 0;}// Read events from stdin and send them to the gatewayd.intPeer_Handler::transmit_stdin (void){  // If return value is -1, then first_time_ must be reset to 1.  int result = 0;  if (this->connection_id_ != -1)    {      ACE_Message_Block *mb;      ACE_NEW_RETURN (mb,                      ACE_Message_Block (sizeof (Event)),                      -1);      // Cast the message block payload into an <Event> pointer.      Event *event = (Event *) mb->rd_ptr ();      ssize_t n = ACE_OS::read (ACE_STDIN,                                event->data_,                                sizeof event->data_);      switch (n)        {        case 0:          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("stdin closing down\n")));          // Take stdin out of the ACE_Reactor so we stop trying to          // send events.          ACE_Reactor::instance ()->remove_handler            (ACE_STDIN,             ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);          mb->release ();          result = 0; //          break;          /* NOTREACHED */        case -1:          mb->release ();          ACE_ERROR ((LM_ERROR,                      ACE_TEXT ("%p\n"),                      ACE_TEXT ("read")));          result = 0; //          break;          /* NOTREACHED */        default:          // Do not return directly, save the return value.          result = this->transmit (mb, n, ROUTING_EVENT);          break;          /* NOTREACHED */        }      // Do not return at here, but at exit of function.      /*return 0;*/    }  else  {  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("Must transmit over an opened channel.\n")));    result = -1; // Save return value at here, return at exit of function.  }  // If transmit error, the stdin-thread will be cancelled, so should  // reset first_time_ to 1, which will register_stdin_handler again.  if (result == -1)    first_time_ = 1;  return result;}// Perform a non-blocking <put> of event MB.  If we are unable to send// the entire event the remainder is re-queue'd at the *front* of the// Message_Queue.intPeer_Handler::nonblk_put (ACE_Message_Block *mb){  // Try to send the event.  If we don't send it all (e.g., due to  // flow control), then re-queue the remainder at the head of the  // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via  // <handle_output>) when it is possible to try again.  ssize_t n = this->send (mb);  if (n == -1)    // -1 is returned only when things have really gone wrong (i.e.,    // not when flow control occurs).    return -1;  else if (errno == EWOULDBLOCK)    {      // We didn't manage to send everything, so requeue.      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("queueing activated on handle %d to connection id %d\n"),                 this->get_handle (),                  this->connection_id_));      // Re-queue in *front* of the list to preserve order.      if (this->msg_queue ()->enqueue_head          (mb,           (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("%p\n"),                           ACE_TEXT ("enqueue_head")),                          -1);      // Tell ACE_Reactor to call us back when we can send again.      if (ACE_Reactor::instance ()->schedule_wakeup          (this, ACE_Event_Handler::WRITE_MASK) == -1)        ACE_ERROR_RETURN ((LM_ERROR,                           ACE_TEXT ("%p\n"),                           ACE_TEXT ("schedule_wakeup")),                          -1);      return 0;    }  else    return n;}// Finish sending a event when flow control conditions abate.  This// method is automatically called by the ACE_Reactor.intPeer_Handler::handle_output (ACE_HANDLE){  ACE_Message_Block *mb = 0;  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("in handle_output\n")));  if (this->msg_queue ()->dequeue_head      (mb,       (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)    {      switch (this->nonblk_put (mb))        {        case 0:           // Partial send.          ACE_ASSERT (errno == EWOULDBLOCK);          // Didn't write everything this time, come back later...          break;          /* NOTREACHED */        case -1:          // Caller is responsible for freeing a ACE_Message_Block if          // failures occur.          mb->release ();          ACE_ERROR ((LM_ERROR,                      ACE_TEXT ("%p\n"),                      ACE_TEXT ("transmission failure in handle_output")));          /* FALLTHROUGH */        default: // Sent the whole thing.          // If we succeed in writing the entire event (or we did not          // fail due to EWOULDBLOCK) then check if there are more          // events on the <ACE_Message_Queue>.  If there aren't, tell          // the <ACE_Reactor> not to notify us anymore (at least          // until there are new events queued up).          if (this->msg_queue ()->is_empty ())            {              ACE_DEBUG ((LM_DEBUG,                          ACE_TEXT ("queue now empty on handle %d to connection id %d\n"),                          this->get_handle (),                          this->connection_id_));              if (ACE_Reactor::instance ()->cancel_wakeup                  (this, ACE_Event_Handler::WRITE_MASK) == -1)                ACE_ERROR ((LM_ERROR,                            ACE_TEXT ("%p\n"),                            ACE_TEXT ("cancel_wakeup")));            }        }      return 0;    }  else    // If the list is empty there's a bug!    ACE_ERROR_RETURN ((LM_ERROR,                       ACE_TEXT ("%p\n"),                       ACE_TEXT ("dequeue_head")),                      0);}// Send an event to a peer (may block if necessary).intPeer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *){  if (this->msg_queue ()->is_empty ())    // Try to send the event *without* blocking!    return this->nonblk_put (mb);  else    // If we have queued up events due to flow control then just    // enqueue and return.    return this->msg_queue ()->enqueue_tail      (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);}// Send an Peer event to gatewayd.intPeer_Handler::send (ACE_Message_Block *mb){  size_t len = mb->length ();  ssize_t n = this->peer ().send (mb->rd_ptr (), len);  if (n <= 0)    return errno == EWOULDBLOCK ? 0 : n;  else if (n < (ssize_t) len)    {      // Re-adjust pointer to skip over the part we did send.      mb->rd_ptr (n);      this->total_bytes_ += n;    }  else // if (n == length).    {      // The whole event is sent, we can now safely deallocate the      // buffer.  Note that this should decrement a reference count...      this->total_bytes_ += n;      mb->release ();      errno = 0;    }  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"),              n,              this->total_bytes_));  return n;}// Receive an Event from gatewayd.  Handles fragmentation.intPeer_Handler::recv (ACE_Message_Block *&mb){  if (this->msg_frag_ == 0)    // No existing fragment...    ACE_NEW_RETURN (this->msg_frag_,                    ACE_Message_Block (sizeof (Event)),                    -1);  Event *event = (Event *) this->msg_frag_->rd_ptr ();  ssize_t header_received = 0;  const size_t HEADER_SIZE = sizeof (Event_Header);  ssize_t header_bytes_left_to_read =    HEADER_SIZE - this->msg_frag_->length ();  if (header_bytes_left_to_read > 0)    {      header_received = this->peer ().recv        (this->msg_frag_->wr_ptr (),         header_bytes_left_to_read);      if (header_received == -1 /* error */          || header_received == 0  /* EOF */)        {          ACE_ERROR ((LM_ERROR,                      ACE_TEXT ("%p\n"),                      ACE_TEXT ("Recv error during header read")));          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("attempted to read %d bytes\n"),                      header_bytes_left_to_read));          this->msg_frag_ = this->msg_frag_->release ();          return header_received;        }      // Bump the write pointer by the amount read.      this->msg_frag_->wr_ptr (header_received);      // At this point we may or may not have the ENTIRE header.      if (this->msg_frag_->length () < HEADER_SIZE)        {          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("Partial header received: only %d bytes\n"),                      this->msg_frag_->length ()));          // Notify the caller that we didn't get an entire event.          errno = EWOULDBLOCK;          return -1;        }      // Convert the header into host byte order so that we can access      // it directly without having to repeatedly muck with it...      event->header_.decode ();      if (event->header_.len_ > ACE_INT32 (sizeof event->data_))        {          // This data_ payload is too big!          errno = EINVAL;          ACE_DEBUG ((LM_DEBUG,                      ACE_TEXT ("Data payload is too big (%d bytes)\n"),                      event->header_.len_));          return -1;        }    }  // At this point there is a complete, valid header in Event.  Now we  // need to get the event payload.  Due to incomplete reads this may  // not be the first time we've read in a fragment for this message.  // We account for this here.  Note that the first time in here  // <msg_frag_->wr_ptr> will point to <event->data_>.  Every time we  // do a successful fragment read, we advance <wr_ptr>.  Therefore,  // by subtracting how much we've already read from the  // <event->header_.len_> we complete the  // <data_bytes_left_to_read>...  ssize_t data_bytes_left_to_read =    ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));  // peer().recv() should not be called when data_bytes_left_to_read is 0.  ssize_t data_received = !data_bytes_left_to_read ? 0 :    this->peer ().recv (this->msg_frag_->wr_ptr (),                        data_bytes_left_to_read);  // Try to receive the remainder of the event.  switch (data_received)    {    case -1:      if (errno == EWOULDBLOCK)        // This might happen if only the header came through.        return -1;      else        /* FALLTHROUGH */;    case 0: // Premature EOF.      if (data_bytes_left_to_read)      {      this->msg_frag_ = this->msg_frag_->release ();      return 0;      }      /* FALLTHROUGH */;    default:      // Set the write pointer at 1 past the end of the event.      this->msg_frag_->wr_ptr (data_received);      if (data_received != data_bytes_left_to_read)        {          errno = EWOULDBLOCK;          // Inform caller that we didn't get the whole event.          return -1;        }      else        {          // Set the read pointer to the beginning of the event.          this->msg_frag_->rd_ptr (this->msg_frag_->base ());          mb = this->msg_frag_;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -