📄 peer.cpp
字号:
// Peer.cpp,v 4.48 2004/09/11 11:42:24 jwillemsen Exp#define ACE_BUILD_SVC_DLL#include "ace/OS_NS_stdio.h"#include "ace/OS_NS_string.h"#include "ace/OS_NS_unistd.h"#include "ace/Signal.h"#include "Peer.h"ACE_RCSID(Peer, Peer, "Peer.cpp,v 4.48 2004/09/11 11:42:24 jwillemsen Exp")Peer_Handler::Peer_Handler (void) : connection_id_ (-1), // Maybe it's better than 0. msg_frag_ (0), total_bytes_ (0){ // Set the high water mark of the <ACE_Message_Queue>. This is used // to exert flow control. this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); first_time_ = 1; // It will be first time to open Peer_Handler.}// Upcall from the <ACE_Acceptor::handle_input> that turns control// over to our application-specific Gateway handler.intPeer_Handler::open (void *a){ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("handle = %d\n"), this->peer ().get_handle ())); // Call down to the base class to activate and register this handler // with an <ACE_Reactor>. if (this->inherited::open (a) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1); if (this->peer ().enable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("enable")), -1); ACE_Time_Value timeout (Options::instance ()->timeout ()); // Schedule the time between disconnects. This should really be a // "tunable" parameter. if (ACE_Reactor::instance ()->schedule_timer (this, 0, timeout) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("schedule_timer"))); // If there are events left in the queue, make sure we enable the // <ACE_Reactor> appropriately to get them sent out. if (this->msg_queue ()->is_empty () == 0 && ACE_Reactor::instance ()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("schedule_wakeup")), -1); // First action is to wait to be notified of our connection id. this->do_action_ = &Peer_Handler::await_connection_id; return 0;}intPeer_Handler::transmit (ACE_Message_Block *mb, size_t n, int event_type){ Event *event = (Event *) mb->rd_ptr (); // Initialize the header. new (&event->header_) Event_Header (n, this->connection_id_, event_type, 0); // Convert all the fields into network byte order. event->header_.encode (); // Move the write pointer to the end of the event. mb->wr_ptr (sizeof (Event_Header) + n); if (this->put (mb) == -1) { if (errno == EWOULDBLOCK) // The queue has filled up! ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("gateway is flow controlled, so we're dropping events"))); else ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed. // Caller is responsible for freeing a ACE_Message_Block // if failures occur. mb->release (); return -1; } return 0;}// Read events from stdin and send them to the gatewayd.intPeer_Handler::transmit_stdin (void){ // If return value is -1, then first_time_ must be reset to 1. int result = 0; if (this->connection_id_ != -1) { ACE_Message_Block *mb; ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof (Event)), -1); // Cast the message block payload into an <Event> pointer. Event *event = (Event *) mb->rd_ptr (); ssize_t n = ACE_OS::read (ACE_STDIN, event->data_, sizeof event->data_); switch (n) { case 0: ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("stdin closing down\n"))); // Take stdin out of the ACE_Reactor so we stop trying to // send events. ACE_Reactor::instance ()->remove_handler (ACE_STDIN, ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK); mb->release (); result = 0; // break; /* NOTREACHED */ case -1: mb->release (); ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("read"))); result = 0; // break; /* NOTREACHED */ default: // Do not return directly, save the return value. result = this->transmit (mb, n, ROUTING_EVENT); break; /* NOTREACHED */ } // Do not return at here, but at exit of function. /*return 0;*/ } else { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Must transmit over an opened channel.\n"))); result = -1; // Save return value at here, return at exit of function. } // If transmit error, the stdin-thread will be cancelled, so should // reset first_time_ to 1, which will register_stdin_handler again. if (result == -1) first_time_ = 1; return result;}// Perform a non-blocking <put> of event MB. If we are unable to send// the entire event the remainder is re-queue'd at the *front* of the// Message_Queue.intPeer_Handler::nonblk_put (ACE_Message_Block *mb){ // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via // <handle_output>) when it is possible to try again. ssize_t n = this->send (mb); if (n == -1) // -1 is returned only when things have really gone wrong (i.e., // not when flow control occurs). return -1; else if (errno == EWOULDBLOCK) { // We didn't manage to send everything, so requeue. ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("queueing activated on handle %d to connection id %d\n"), this->get_handle (), this->connection_id_)); // Re-queue in *front* of the list to preserve order. if (this->msg_queue ()->enqueue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("enqueue_head")), -1); // Tell ACE_Reactor to call us back when we can send again. if (ACE_Reactor::instance ()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("schedule_wakeup")), -1); return 0; } else return n;}// Finish sending a event when flow control conditions abate. This// method is automatically called by the ACE_Reactor.intPeer_Handler::handle_output (ACE_HANDLE){ ACE_Message_Block *mb = 0; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("in handle_output\n"))); if (this->msg_queue ()->dequeue_head (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) { switch (this->nonblk_put (mb)) { case 0: // Partial send. ACE_ASSERT (errno == EWOULDBLOCK); // Didn't write everything this time, come back later... break; /* NOTREACHED */ case -1: // Caller is responsible for freeing a ACE_Message_Block if // failures occur. mb->release (); ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("transmission failure in handle_output"))); /* FALLTHROUGH */ default: // Sent the whole thing. // If we succeed in writing the entire event (or we did not // fail due to EWOULDBLOCK) then check if there are more // events on the <ACE_Message_Queue>. If there aren't, tell // the <ACE_Reactor> not to notify us anymore (at least // until there are new events queued up). if (this->msg_queue ()->is_empty ()) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("queue now empty on handle %d to connection id %d\n"), this->get_handle (), this->connection_id_)); if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("cancel_wakeup"))); } } return 0; } else // If the list is empty there's a bug! ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue_head")), 0);}// Send an event to a peer (may block if necessary).intPeer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *){ if (this->msg_queue ()->is_empty ()) // Try to send the event *without* blocking! return this->nonblk_put (mb); else // If we have queued up events due to flow control then just // enqueue and return. return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);}// Send an Peer event to gatewayd.intPeer_Handler::send (ACE_Message_Block *mb){ size_t len = mb->length (); ssize_t n = this->peer ().send (mb->rd_ptr (), len); if (n <= 0) return errno == EWOULDBLOCK ? 0 : n; else if (n < (ssize_t) len) { // Re-adjust pointer to skip over the part we did send. mb->rd_ptr (n); this->total_bytes_ += n; } else // if (n == length). { // The whole event is sent, we can now safely deallocate the // buffer. Note that this should decrement a reference count... this->total_bytes_ += n; mb->release (); errno = 0; } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"), n, this->total_bytes_)); return n;}// Receive an Event from gatewayd. Handles fragmentation.intPeer_Handler::recv (ACE_Message_Block *&mb){ if (this->msg_frag_ == 0) // No existing fragment... ACE_NEW_RETURN (this->msg_frag_, ACE_Message_Block (sizeof (Event)), -1); Event *event = (Event *) this->msg_frag_->rd_ptr (); ssize_t header_received = 0; const size_t HEADER_SIZE = sizeof (Event_Header); ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); if (header_bytes_left_to_read > 0) { header_received = this->peer ().recv (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); if (header_received == -1 /* error */ || header_received == 0 /* EOF */) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Recv error during header read"))); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("attempted to read %d bytes\n"), header_bytes_left_to_read)); this->msg_frag_ = this->msg_frag_->release (); return header_received; } // Bump the write pointer by the amount read. this->msg_frag_->wr_ptr (header_received); // At this point we may or may not have the ENTIRE header. if (this->msg_frag_->length () < HEADER_SIZE) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Partial header received: only %d bytes\n"), this->msg_frag_->length ())); // Notify the caller that we didn't get an entire event. errno = EWOULDBLOCK; return -1; } // Convert the header into host byte order so that we can access // it directly without having to repeatedly muck with it... event->header_.decode (); if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) { // This data_ payload is too big! errno = EINVAL; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Data payload is too big (%d bytes)\n"), event->header_.len_)); return -1; } } // At this point there is a complete, valid header in Event. Now we // need to get the event payload. Due to incomplete reads this may // not be the first time we've read in a fragment for this message. // We account for this here. Note that the first time in here // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we // do a successful fragment read, we advance <wr_ptr>. Therefore, // by subtracting how much we've already read from the // <event->header_.len_> we complete the // <data_bytes_left_to_read>... ssize_t data_bytes_left_to_read = ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); // peer().recv() should not be called when data_bytes_left_to_read is 0. ssize_t data_received = !data_bytes_left_to_read ? 0 : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); // Try to receive the remainder of the event. switch (data_received) { case -1: if (errno == EWOULDBLOCK) // This might happen if only the header came through. return -1; else /* FALLTHROUGH */; case 0: // Premature EOF. if (data_bytes_left_to_read) { this->msg_frag_ = this->msg_frag_->release (); return 0; } /* FALLTHROUGH */; default: // Set the write pointer at 1 past the end of the event. this->msg_frag_->wr_ptr (data_received); if (data_received != data_bytes_left_to_read) { errno = EWOULDBLOCK; // Inform caller that we didn't get the whole event. return -1; } else { // Set the read pointer to the beginning of the event. this->msg_frag_->rd_ptr (this->msg_frag_->base ()); mb = this->msg_frag_;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -