📄 concrete_connection_handlers.cpp
字号:
// Concrete_Connection_Handlers.cpp,v 4.17 2002/03/04 20:36:21 schmidt Exp
#define ACE_BUILD_SVC_DLL
#include "Event_Channel.h"
#include "Concrete_Connection_Handlers.h"
ACE_RCSID(Gateway, Concrete_Connection_Handlers, "Concrete_Connection_Handlers.cpp,v 4.17 2002/03/04 20:36:21 schmidt Exp")
// Construct a handler for the Consumer connection described by <pci>.
Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)
  : Connection_Handler (pci)
{
  // Limit the outgoing message queue to the size configured in
  // Options.
  msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());

  // Tag this connection with the role character 'C' (Consumer).
  connection_role_ = 'C';
}
// Input callback for the Consumer's socket.
//
// A Consumer is not expected to send anything to us, so input normally
// means the peer has gone away:
//
//   recv() == -1  The peer failed.  Mark the Connection_Handler as
//                 FAILED (so that handle_close () can reconnect) and
//                 return -1.
//   recv() ==  0  The peer shut down.  Handled the same way.
//   otherwise     Data really did arrive.  It is read into a scratch
//                 buffer and ignored, a diagnostic is logged, and 0 is
//                 returned so that the handler is NOT closed.  The
//                 FAILED state is deliberately not set on this path;
//                 it is only set where a real failure was detected.
int
Consumer_Handler::handle_input (ACE_HANDLE)
{
  char buf[BUFSIZ];
  const ssize_t received = this->peer ().recv (buf, sizeof buf);

  if (received == -1)
    {
      this->state (Connection_Handler::FAILED);
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",
                         this->connection_id ()),
                        -1);
    }

  if (received == 0)
    {
      this->state (Connection_Handler::FAILED);
      ACE_ERROR_RETURN ((LM_ERROR,
                         "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",
                         this->connection_id ()),
                        -1);
    }

  // <received> is an ssize_t but the format uses %d, so convert it
  // explicitly; passing the wider type through the variadic logging
  // call is a type mismatch.  The value is bounded by sizeof buf, so
  // the conversion cannot truncate.
  ACE_ERROR_RETURN ((LM_ERROR,
                     "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n"
                     "data size = %d\n",
                     this->connection_id (),
                     static_cast<int> (received)),
                    0); // Return 0 to identify received data successfully.
}
// Perform a non-blocking put() of <event>.  If we are unable to send
// the entire event the remainder is re-queued at the *front* of the
// message queue and the ACE_Reactor is asked to call handle_output ()
// when it is possible to try again.
//
// Return value:
//   -1  send () failed outright (the handler is marked FAILED and
//       handle_close () is called), or the event could not be queued,
//       or the wakeup could not be scheduled.  In every -1 case the
//       event is not left on the message queue, so the caller remains
//       responsible for releasing it.
//    0  Flow control: the unsent part of <event> is now at the head of
//       the message queue and a WRITE_MASK wakeup is scheduled.  errno
//       is EWOULDBLOCK on return.
//   otherwise the value returned by send ().
int
Consumer_Handler::nonblk_put (ACE_Message_Block *event)
{
  const ssize_t n = this->send (event);

  if (n == -1)
    {
      // -1 is returned only when things have really gone wrong (i.e.,
      // not when flow control occurs).  Thus, let's try to close down
      // and set up a new reconnection by calling handle_close().
      this->state (Connection_Handler::FAILED);
      this->handle_close ();
      return -1;
    }

  // send () reports flow control through errno, so it must be examined
  // before anything else gets a chance to change it.
  if (errno != EWOULDBLOCK)
    return n;

  // We didn't manage to send everything, so we need to queue things
  // up.
  ACE_DEBUG ((LM_DEBUG,
              "(%t) queueing activated on handle %d to routing id %d\n",
              this->get_handle (),
              this->connection_id ()));

  ACE_Time_Value *no_wait = (ACE_Time_Value *) &ACE_Time_Value::zero;

  // Queue at the *front* of the list to preserve order.
  if (this->msg_queue ()->enqueue_head (event, no_wait) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                       "enqueue_head"),
                      -1);

  // Tell ACE_Reactor to call us back when we can send again.
  if (ACE_Reactor::instance ()->schedule_wakeup
        (this, ACE_Event_Handler::WRITE_MASK) == -1)
    {
      // Log first so that %p reports the schedule_wakeup error.
      ACE_ERROR ((LM_ERROR,
                  "(%t) %p\n",
                  "schedule_wakeup"));

      // Callers release the event when we return -1, so it must not
      // stay on the queue or the queue would hold a dangling pointer.
      // NOTE(review): assumes nothing else has touched the head of the
      // queue since the enqueue_head () above - confirm for
      // multi-threaded configurations.
      ACE_Message_Block *unqueued = 0;
      this->msg_queue ()->dequeue_head (unqueued, no_wait);
      return -1;
    }

  // handle_output () asserts errno == EWOULDBLOCK when we return 0;
  // re-establish it in case the calls above modified errno.
  errno = EWOULDBLOCK;
  return 0;
}
// Try to write the unread part of <event> to the Consumer.
//
// The outcome is reported through the return value *and* errno, which
// nonblk_put () inspects immediately after this call:
//
//   peer send <= 0      Nothing was written and <event> is untouched.
//                       Returns 0 if errno is EWOULDBLOCK, otherwise
//                       the peer's return value.
//   0 < sent < length   Partial write.  The read pointer of <event> is
//                       advanced past the bytes sent, errno is set to
//                       EWOULDBLOCK and the byte count is returned.
//   sent == length      Complete write.  <event> is released, errno is
//                       set to 0 and the byte count is returned.
ssize_t
Consumer_Handler::send (ACE_Message_Block *event)
{
  const ssize_t len = event->length ();

  // length () is not an int (elsewhere in this file it is compared
  // against a size_t), so convert explicitly to match the %d
  // conversion.
  ACE_DEBUG ((LM_DEBUG,
              "(%t) sending %d bytes to Consumer %d\n",
              static_cast<int> (len),
              this->connection_id ()));

  const ssize_t n = this->peer ().send (event->rd_ptr (), len);

  if (n <= 0)
    return errno == EWOULDBLOCK ? 0 : n;

  // Record the bytes sent *before* errno is set below.  errno is part
  // of this method's result, so no other call may run between
  // assigning it and returning.
  this->total_bytes (n);

  if (n < len)
    {
      // Re-adjust pointer to skip over the part we did send.
      event->rd_ptr (n);
      errno = EWOULDBLOCK;
    }
  else // n == len
    {
      // The whole event is sent, we now decrement the reference count
      // (which deletes itself when it reaches 0).
      event->release ();
      errno = 0;
    }

  return n;
}
// Finish sending queued events when flow control conditions abate.
// This method is automatically called by the ACE_Reactor.
//
// Events are taken from the head of the message queue and handed to
// nonblk_put (), whose result is interpreted as follows:
//
//   -1     Transmission failed; the event is released here.
//    0     The event could not be sent completely.  nonblk_put () has
//          already put the remainder back at the head of the queue and
//          scheduled a WRITE_MASK wakeup.
//   other  Treated as "the whole event was sent".
//
// With ACE_WIN32 defined the queue is drained in a loop; otherwise a
// single event is handled per callback.  The method returns 0, except
// in the ACE_WIN32 build where a failing cancel_wakeup () or
// schedule_wakeup () makes it return -1.
int
Consumer_Handler::handle_output (ACE_HANDLE)
{
ACE_Message_Block *event = 0;
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"),
this->get_handle ()));
// WIN32 Notes: When the receiver blocked, we started adding to the
// consumer handler's message Q. At this time, we registered a
// callback with the reactor to tell us when the TCP layer signalled
// that we could continue to send messages to the consumer. However,
// Winsock only sends this notification ONCE, so we have to assume
// at the application level, that we can continue to send until we
// get any subsequent blocking signals from the receiver's buffer.
#if defined (ACE_WIN32)
// Win32 Winsock doesn't trigger multiple "You can write now"
// signals, so we have to assume that we can continue to write until
// we get another EWOULDBLOCK.
// We cancel the wakeup callback we set earlier.
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")),
-1);
// The list had better not be empty, otherwise there's a bug!
// Keep dequeueing (without blocking) until dequeue_head () reports
// failure.
while (this->msg_queue ()->dequeue_head
(event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
{
switch (this->nonblk_put (event))
{
case -1: // Error sending message to consumer.
{
// We are responsible for releasing an ACE_Message_Block if
// failures occur.
event->release ();
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("transmission failure")));
// This break only leaves the switch; the while loop goes on to the
// next queued event.
break;
}
case 0: // Partial Send - we got flow controlled by the receiver
{
ACE_ASSERT (errno == EWOULDBLOCK);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%D Partial Send due to flow control")
ACE_TEXT ("- scheduling new wakeup with reactor\n")));
// Re-schedule a wakeup call from the reactor when the
// flow control conditions abate.
if (ACE_Reactor::instance ()->schedule_wakeup
(this,
ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")),
-1);
// Didn't write everything this time, come back later...
return 0;
}
default: // Sent the whole thing
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Sent message from message Q, Q size = %d\n"),
this->msg_queue()->message_count ()));
break;
}
}
}
// If we drop out of the while loop, then the message Q should be
// empty...or there's a problem in the dequeue_head() call...but
// that's another story.
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%D Sent all messages from consumers message Q\n")));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
this->get_handle (),
this->connection_id ()));
#else /* !defined (ACE_WIN32) */
// The list had better not be empty, otherwise there's a bug!
// Only one event is dequeued (without blocking) per callback.
if (this->msg_queue ()->dequeue_head
(event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
{
switch (this->nonblk_put (event))
{
case 0: // Partial send.
ACE_ASSERT (errno == EWOULDBLOCK);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%D Partial Send\n")));
// Didn't write everything this time, come back later...
// The wakeup is left in place; nonblk_put () scheduled it when it
// re-queued the remainder.
break;
case -1:
// We are responsible for releasing an ACE_Message_Block if
// failures occur.
event->release ();
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("transmission failure")));
// Deliberately continue into the default case so that the
// empty-queue check below also runs after a failure.
/* FALLTHROUGH */
default: // Sent the whole thing.
// If we succeed in writing the entire event (or we did not
// fail due to EWOULDBLOCK) then check if there are more
// events on the Message_Queue. If there aren't, tell the
// ACE_Reactor not to notify us anymore (at least until
// there are new events queued up).
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("QQQ::Sent Message from consumer's Q\n")));
if (this->msg_queue ()->is_empty ())
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
this->get_handle (),
this->connection_id ()));
// A cancel_wakeup () failure is only logged here; the method
// still returns 0.
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("cancel_wakeup")));
}
}
}
else
// Nothing could be dequeued even though the reactor called us.
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));
#endif /* ACE_WIN32 */
return 0;
}
// Deliver <event> to the Consumer.  The event is sent straight away
// when nothing is waiting in the message queue; otherwise it is
// appended to the queue.  The timeout argument is unused.
int
Consumer_Handler::put (ACE_Message_Block *event,
                       ACE_Time_Value *)
{
  // Events are already queued up due to flow control, so just enqueue
  // behind them (without blocking) and return.
  if (!this->msg_queue ()->is_empty ())
    return this->msg_queue ()->enqueue_tail
      (event, (ACE_Time_Value *) &ACE_Time_Value::zero);

  // Queue is empty: try to send the event *without* blocking!
  return this->nonblk_put (event);
}
// Construct a handler for the Supplier connection described by <pci>.
// No message fragment is pending initially.
Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci)
  : Connection_Handler (pci),
    msg_frag_ (0)
{
  // Set the message queue's high water mark to 0.
  // NOTE(review): presumably because a Supplier_Handler does not queue
  // outgoing events - confirm.
  msg_queue ()->high_water_mark (0);

  // Tag this connection with the role character 'S' (Supplier).
  connection_role_ = 'S';
}
// Receive an Event from a Supplier. Handles fragmentation.
//
// The event returned from recv consists of two parts:
//
// 1. The Address part, contains the "virtual" routing id.
//
// 2. The Data part, which contains the actual data to be forwarded.
//
// The reason for having two parts is to shield the higher layers
// of software from knowledge of the event structure.
int
Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
{
if (this->msg_frag_ == 0)
// No existing fragment...
ACE_NEW_RETURN (this->msg_frag_,
ACE_Message_Block (sizeof (Event),
ACE_Message_Block::MB_DATA,
0,
0,
0,
Options::instance ()->locking_strategy ()),
-1);
Event *event = (Event *) this->msg_frag_->rd_ptr ();
ssize_t header_received = 0;
const size_t HEADER_SIZE = sizeof (Event_Header);
ssize_t header_bytes_left_to_read =
HEADER_SIZE - this->msg_frag_->length ();
if (header_bytes_left_to_read > 0)
{
header_received = this->peer ().recv
(this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
if (header_received == -1 /* error */
|| header_received == 0 /* EOF */)
{
ACE_ERROR ((LM_ERROR, "%p\n",
"Recv error during header read "));
ACE_DEBUG ((LM_DEBUG,
"attempted to read %d\n",
header_bytes_left_to_read));
this->msg_frag_ = this->msg_frag_->release ();
return header_received;
}
// Bump the write pointer by the amount read.
this->msg_frag_->wr_ptr (header_received);
// At this point we may or may not have the ENTIRE header.
if (this->msg_frag_->length () < HEADER_SIZE)
{
ACE_DEBUG ((LM_DEBUG,
"Partial header received: only %d bytes\n",
this->msg_frag_->length ()));
// Notify the caller that we didn't get an entire event.
errno = EWOULDBLOCK;
return -1;
}
// Convert the header into host byte order so that we can access
// it directly without having to repeatedly muck with it...
event->header_.decode ();
if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
{
// This data_ payload is too big!
errno = EINVAL;
ACE_DEBUG ((LM_DEBUG,
"Data payload is too big (%d bytes)\n",
event->header_.len_));
return -1;
}
}
// At this point there is a complete, valid header in Event. Now we
// need to get the event payload. Due to incomplete reads this may
// not be the first time we've read in a fragment for this message.
// We account for this here. Note that the first time in here
// msg_frag_->wr_ptr() will point to event->data_. Every time we do
// a successful fragment read, we advance wr_ptr(). Therefore, by
// subtracting how much we've already read from the
// event->header_.len_ we complete the data_bytes_left_to_read...
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -