📄 peer.cpp
字号:
// Peer.cpp,v 4.46 2003/12/30 23:18:58 shuston Exp
#define ACE_BUILD_SVC_DLL
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "ace/Signal.h"
#include "Peer.h"
ACE_RCSID(Peer, Peer, "Peer.cpp,v 4.46 2003/12/30 23:18:58 shuston Exp")
Peer_Handler::Peer_Handler (void)
: connection_id_ (-1), // Maybe it's better than 0.
msg_frag_ (0),
total_bytes_ (0)
{
// Set the high water mark of the <ACE_Message_Queue>. This is used
// to exert flow control.
this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
first_time_ = 1; // It will be first time to open Peer_Handler.
}
// Upcall from the <ACE_Acceptor::handle_input> that turns control
// over to our application-specific Gateway handler.
int
Peer_Handler::open (void *a)
{
ACE_DEBUG ((LM_DEBUG,
"handle = %d\n",
this->peer ().get_handle ()));
// Call down to the base class to activate and register this handler
// with an <ACE_Reactor>.
if (this->inherited::open (a) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"open"),
-1);
if (this->peer ().enable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"enable"),
-1);
ACE_Time_Value timeout (Options::instance ()->timeout ());
// Schedule the time between disconnects. This should really be a
// "tunable" parameter.
if (ACE_Reactor::instance ()->schedule_timer
(this, 0, timeout) == -1)
ACE_ERROR ((LM_ERROR,
"%p\n",
"schedule_timer"));
// If there are events left in the queue, make sure we enable the
// <ACE_Reactor> appropriately to get them sent out.
if (this->msg_queue ()->is_empty () == 0
&& ACE_Reactor::instance ()->schedule_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"schedule_wakeup"),
-1);
// First action is to wait to be notified of our connection id.
this->do_action_ = &Peer_Handler::await_connection_id;
return 0;
}
int
Peer_Handler::transmit (ACE_Message_Block *mb,
size_t n,
int event_type)
{
Event *event = (Event *) mb->rd_ptr ();
// Initialize the header.
new (&event->header_) Event_Header (n,
this->connection_id_,
event_type,
0);
// Convert all the fields into network byte order.
event->header_.encode ();
// Move the write pointer to the end of the event.
mb->wr_ptr (sizeof (Event_Header) + n);
if (this->put (mb) == -1)
{
if (errno == EWOULDBLOCK) // The queue has filled up!
ACE_ERROR ((LM_ERROR,
"%p\n",
"gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR,
"%p\n",
"transmission failure in transmit()")); // Function name fixed.
// Caller is responsible for freeing a ACE_Message_Block
// if failures occur.
mb->release ();
return -1;
}
return 0;
}
// Read events from stdin and send them to the gatewayd.
int
Peer_Handler::transmit_stdin (void)
{
// If return value is -1, then first_time_ must be reset to 1.
int result = 0;
if (this->connection_id_ != -1)
{
ACE_Message_Block *mb;
ACE_NEW_RETURN (mb,
ACE_Message_Block (sizeof (Event)),
-1);
// Cast the message block payload into an <Event> pointer.
Event *event = (Event *) mb->rd_ptr ();
ssize_t n = ACE_OS::read (ACE_STDIN,
event->data_,
sizeof event->data_);
switch (n)
{
case 0:
ACE_DEBUG ((LM_DEBUG,
"stdin closing down\n"));
// Take stdin out of the ACE_Reactor so we stop trying to
// send events.
ACE_Reactor::instance ()->remove_handler
(ACE_STDIN,
ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
mb->release ();
result = 0; //
break;
/* NOTREACHED */
case -1:
mb->release ();
ACE_ERROR ((LM_ERROR,
"%p\n",
"read"));
result = 0; //
break;
/* NOTREACHED */
default:
// Do not return directly, save the return value.
result = this->transmit (mb, n, ROUTING_EVENT);
break;
/* NOTREACHED */
}
// Do not return at here, but at exit of function.
/*return 0;*/
}
else
{
ACE_DEBUG ((LM_DEBUG,
"Must transmit over an opened channel.\n"));
result = -1; // Save return value at here, return at exit of function.
}
// If transmit error, the stdin-thread will be cancelled, so should
// reset first_time_ to 1, which will register_stdin_handler again.
if (result == -1)
first_time_ = 1;
return result;
}
// Perform a non-blocking <put> of event MB. If we are unable to send
// the entire event the remainder is re-queue'd at the *front* of the
// Message_Queue.
int
Peer_Handler::nonblk_put (ACE_Message_Block *mb)
{
// Try to send the event. If we don't send it all (e.g., due to
// flow control), then re-queue the remainder at the head of the
// <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via
// <handle_output>) when it is possible to try again.
ssize_t n = this->send (mb);
if (n == -1)
// -1 is returned only when things have really gone wrong (i.e.,
// not when flow control occurs).
return -1;
else if (errno == EWOULDBLOCK)
{
// We didn't manage to send everything, so requeue.
ACE_DEBUG ((LM_DEBUG,
"queueing activated on handle %d to connection id %d\n",
this->get_handle (),
this->connection_id_));
// Re-queue in *front* of the list to preserve order.
if (this->msg_queue ()->enqueue_head
(mb,
(ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"enqueue_head"),
-1);
// Tell ACE_Reactor to call us back when we can send again.
if (ACE_Reactor::instance ()->schedule_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"schedule_wakeup"),
-1);
return 0;
}
else
return n;
}
// Finish sending a event when flow control conditions abate. This
// method is automatically called by the ACE_Reactor.
int
Peer_Handler::handle_output (ACE_HANDLE)
{
ACE_Message_Block *mb = 0;
ACE_DEBUG ((LM_DEBUG,
"in handle_output\n"));
if (this->msg_queue ()->dequeue_head
(mb,
(ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
{
switch (this->nonblk_put (mb))
{
case 0: // Partial send.
ACE_ASSERT (errno == EWOULDBLOCK);
// Didn't write everything this time, come back later...
break;
/* NOTREACHED */
case -1:
// Caller is responsible for freeing a ACE_Message_Block if
// failures occur.
mb->release ();
ACE_ERROR ((LM_ERROR,
"%p\n",
"transmission failure in handle_output"));
/* FALLTHROUGH */
default: // Sent the whole thing.
// If we succeed in writing the entire event (or we did not
// fail due to EWOULDBLOCK) then check if there are more
// events on the <ACE_Message_Queue>. If there aren't, tell
// the <ACE_Reactor> not to notify us anymore (at least
// until there are new events queued up).
if (this->msg_queue ()->is_empty ())
{
ACE_DEBUG ((LM_DEBUG,
"queue now empty on handle %d to connection id %d\n",
this->get_handle (),
this->connection_id_));
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR ((LM_ERROR,
"%p\n",
"cancel_wakeup"));
}
}
return 0;
}
else
// If the list is empty there's a bug!
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"dequeue_head"),
0);
}
// Send an event to a peer (may block if necessary).
int
Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
if (this->msg_queue ()->is_empty ())
// Try to send the event *without* blocking!
return this->nonblk_put (mb);
else
// If we have queued up events due to flow control then just
// enqueue and return.
return this->msg_queue ()->enqueue_tail
(mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
// Send an Peer event to gatewayd.
int
Peer_Handler::send (ACE_Message_Block *mb)
{
size_t len = mb->length ();
ssize_t n = this->peer ().send (mb->rd_ptr (), len);
if (n <= 0)
return errno == EWOULDBLOCK ? 0 : n;
else if (n < (ssize_t) len)
{
// Re-adjust pointer to skip over the part we did send.
mb->rd_ptr (n);
this->total_bytes_ += n;
}
else // if (n == length).
{
// The whole event is sent, we can now safely deallocate the
// buffer. Note that this should decrement a reference count...
this->total_bytes_ += n;
mb->release ();
errno = 0;
}
ACE_DEBUG ((LM_DEBUG,
"sent %d bytes, total bytes sent = %d\n",
n,
this->total_bytes_));
return n;
}
// Receive an Event from gatewayd. Handles fragmentation.
int
Peer_Handler::recv (ACE_Message_Block *&mb)
{
if (this->msg_frag_ == 0)
// No existing fragment...
ACE_NEW_RETURN (this->msg_frag_,
ACE_Message_Block (sizeof (Event)),
-1);
Event *event = (Event *) this->msg_frag_->rd_ptr ();
ssize_t header_received = 0;
const size_t HEADER_SIZE = sizeof (Event_Header);
ssize_t header_bytes_left_to_read =
HEADER_SIZE - this->msg_frag_->length ();
if (header_bytes_left_to_read > 0)
{
header_received = this->peer ().recv
(this->msg_frag_->wr_ptr (),
header_bytes_left_to_read);
if (header_received == -1 /* error */
|| header_received == 0 /* EOF */)
{
ACE_ERROR ((LM_ERROR,
"%p\n",
"Recv error during header read"));
ACE_DEBUG ((LM_DEBUG,
"attempted to read %d bytes\n",
header_bytes_left_to_read));
this->msg_frag_ = this->msg_frag_->release ();
return header_received;
}
// Bump the write pointer by the amount read.
this->msg_frag_->wr_ptr (header_received);
// At this point we may or may not have the ENTIRE header.
if (this->msg_frag_->length () < HEADER_SIZE)
{
ACE_DEBUG ((LM_DEBUG,
"Partial header received: only %d bytes\n",
this->msg_frag_->length ()));
// Notify the caller that we didn't get an entire event.
errno = EWOULDBLOCK;
return -1;
}
// Convert the header into host byte order so that we can access
// it directly without having to repeatedly muck with it...
event->header_.decode ();
if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
{
// This data_ payload is too big!
errno = EINVAL;
ACE_DEBUG ((LM_DEBUG,
"Data payload is too big (%d bytes)\n",
event->header_.len_));
return -1;
}
}
// At this point there is a complete, valid header in Event. Now we
// need to get the event payload. Due to incomplete reads this may
// not be the first time we've read in a fragment for this message.
// We account for this here. Note that the first time in here
// <msg_frag_->wr_ptr> will point to <event->data_>. Every time we
// do a successful fragment read, we advance <wr_ptr>. Therefore,
// by subtracting how much we've already read from the
// <event->header_.len_> we complete the
// <data_bytes_left_to_read>...
ssize_t data_bytes_left_to_read =
ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
// peer().recv() should not be called when data_bytes_left_to_read is 0.
ssize_t data_received = !data_bytes_left_to_read ? 0 :
this->peer ().recv (this->msg_frag_->wr_ptr (),
data_bytes_left_to_read);
// Try to receive the remainder of the event.
switch (data_received)
{
case -1:
if (errno == EWOULDBLOCK)
// This might happen if only the header came through.
return -1;
else
/* FALLTHROUGH */;
case 0: // Premature EOF.
if (data_bytes_left_to_read)
{
this->msg_frag_ = this->msg_frag_->release ();
return 0;
}
/* FALLTHROUGH */;
default:
// Set the write pointer at 1 past the end of the event.
this->msg_frag_->wr_ptr (data_received);
if (data_received != data_bytes_left_to_read)
{
errno = EWOULDBLOCK;
// Inform caller that we didn't get the whole event.
return -1;
}
else
{
// Set the read pointer to the beginning of the event.
this->msg_frag_->rd_ptr (this->msg_frag_->base ());
mb = this->msg_frag_;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -