📄 concrete_connection_handlers.cpp
字号:
// Concrete_Connection_Handlers.cpp,v 4.19 2005/05/20 15:10:27 schmidt Exp#define ACE_BUILD_SVC_DLL#include "ace/OS_NS_unistd.h"#include "Event_Channel.h"#include "Concrete_Connection_Handlers.h"ACE_RCSID(Gateway, Concrete_Connection_Handlers, "Concrete_Connection_Handlers.cpp,v 4.19 2005/05/20 15:10:27 schmidt Exp")Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci) : Connection_Handler (pci){ this->connection_role_ = 'C'; this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());}// This method should be called only when the Consumer shuts down// unexpectedly. This method simply marks the Connection_Handler as// having failed so that handle_close () can reconnect.// Do not close handler when received data successfully.// Consumer_Handler should could process received data.// For example, Consumer could send reply-event to Supplier.intConsumer_Handler::handle_input (ACE_HANDLE){ // Do not set FAILED state at here, just at real failed place. char buf[BUFSIZ]; ssize_t received = this->peer ().recv (buf, sizeof buf); switch (received) { case -1: this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n", this->connection_id ()), -1); /* NOTREACHED */ case 0: this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n", this->connection_id ()), -1); /* NOTREACHED */ default: ACE_ERROR_RETURN ((LM_ERROR, "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n" "data size = %d\n", this->connection_id (), received), 0); // Return 0 to identify received data successfully. /* NOTREACHED */ }}// Perform a non-blocking put() of event. If we are unable to send// the entire event the remainder is re-queued at the *front* of the// Event_List.intConsumer_Handler::nonblk_put (ACE_Message_Block *event){ // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the // Event_List and ask the ACE_Reactor to inform us (via // handle_output()) when it is possible to try again. ssize_t n = this->send (event); if (n == -1) { // -1 is returned only when things have really gone wrong (i.e., // not when flow control occurs). Thus, let's try to close down // and set up a new reconnection by calling handle_close(). this->state (Connection_Handler::FAILED); this->handle_close (); return -1; } else if (errno == EWOULDBLOCK) { // We didn't manage to send everything, so we need to queue // things up. ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", this->get_handle (), this->connection_id ())); // ACE_Queue in *front* of the list to preserve order. if (this->msg_queue ()->enqueue_head (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); // Tell ACE_Reactor to call us back when we can send again. else if (ACE_Reactor::instance ()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); return 0; } else return n;}ssize_tConsumer_Handler::send (ACE_Message_Block *event){ ACE_DEBUG ((LM_DEBUG, "(%t) sending %d bytes to Consumer %d\n", event->length (), this->connection_id ())); ssize_t len = event->length (); ssize_t n = this->peer ().send (event->rd_ptr (), len); if (n <= 0) return errno == EWOULDBLOCK ? 0 : n; else if (n < len) { // Re-adjust pointer to skip over the part we did send. event->rd_ptr (n); errno = EWOULDBLOCK; } else // if (n == length) { // The whole event is sent, we now decrement the reference count // (which deletes itself with it reaches 0). event->release (); errno = 0; } this->total_bytes (n); return n;}// Finish sending an event when flow control conditions abate.// This method is automatically called by the ACE_Reactor.intConsumer_Handler::handle_output (ACE_HANDLE){ ACE_Message_Block *event = 0; ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"), this->get_handle ())); // WIN32 Notes: When the receiver blocked, we started adding to the // consumer handler's message Q. At this time, we registered a // callback with the reactor to tell us when the TCP layer signalled // that we could continue to send messages to the consumer. However, // Winsock only sends this notification ONCE, so we have to assume // at the application level, that we can continue to send until we // get any subsequent blocking signals from the receiver's buffer.#if defined (ACE_WIN32) // Win32 Winsock doesn't trigger multiple "You can write now" // signals, so we have to assume that we can continue to write until // we get another EWOULDBLOCK. // We cancel the wakeup callback we set earlier. if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")), -1); // The list had better not be empty, otherwise there's a bug! while (this->msg_queue ()->dequeue_head (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) { switch (this->nonblk_put (event)) { case -1: // Error sending message to consumer. { // We are responsible for releasing an ACE_Message_Block if // failures occur. event->release (); ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("transmission failure"))); break; } case 0: // Partial Send - we got flow controlled by the receiver { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%D Partial Send due to flow control") ACE_TEXT ("- scheduling new wakeup with reactor\n"))); // Re-schedule a wakeup call from the reactor when the // flow control conditions abate. if (ACE_Reactor::instance ()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")), -1); // Didn't write everything this time, come back later... return 0; } default: // Sent the whole thing { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sent message from message Q, Q size = %d\n"), this->msg_queue()->message_count ())); break; } } } // If we drop out of the while loop, then the message Q should be // empty...or there's a problem in the dequeue_head() call...but // thats another story. ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%D Sent all messages from consumers message Q\n"))); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), this->get_handle (), this->connection_id ()));#else /* !defined (ACE_WIN32) */ // The list had better not be empty, otherwise there's a bug! if (this->msg_queue ()->dequeue_head (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) { switch (this->nonblk_put (event)) { case 0: // Partial send. ACE_ASSERT (errno == EWOULDBLOCK); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%D Partial Send\n"))); // Didn't write everything this time, come back later... break; case -1: // We are responsible for releasing an ACE_Message_Block if // failures occur. event->release (); ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("transmission failure"))); /* FALLTHROUGH */ default: // Sent the whole thing. // If we succeed in writing the entire event (or we did not // fail due to EWOULDBLOCK) then check if there are more // events on the Message_Queue. If there aren't, tell the // ACE_Reactor not to notify us anymore (at least until // there are new events queued up). ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("QQQ::Sent Message from consumer's Q\n"))); if (this->msg_queue ()->is_empty ()) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), this->get_handle (), this->connection_id ())); if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("cancel_wakeup"))); } } } else ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));#endif /* ACE_WIN32 */ return 0;}// Send an event to a Consumer (may queue if necessary).intConsumer_Handler::put (ACE_Message_Block *event, ACE_Time_Value *){ if (this->msg_queue ()->is_empty ()) // Try to send the event *without* blocking! return this->nonblk_put (event); else // If we have queued up events due to flow control then just // enqueue and return. return this->msg_queue ()->enqueue_tail (event, (ACE_Time_Value *) &ACE_Time_Value::zero);}Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci) : Connection_Handler (pci), msg_frag_ (0){ this->connection_role_ = 'S'; this->msg_queue ()->high_water_mark (0);}// Receive an Event from a Supplier. Handles fragmentation.//// The event returned from recv consists of two parts://// 1. The Address part, contains the "virtual" routing id.//// 2. The Data part, which contains the actual data to be forwarded.//// The reason for having two parts is to shield the higher layers// of software from knowledge of the event structure.intSupplier_Handler::recv (ACE_Message_Block *&forward_addr){ if (this->msg_frag_ == 0) // No existing fragment... ACE_NEW_RETURN (this->msg_frag_, ACE_Message_Block (sizeof (Event), ACE_Message_Block::MB_DATA, 0, 0, 0, Options::instance ()->locking_strategy ()), -1); Event *event = (Event *) this->msg_frag_->rd_ptr (); ssize_t header_received = 0; const size_t HEADER_SIZE = sizeof (Event_Header); ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); if (header_bytes_left_to_read > 0) { header_received = this->peer ().recv (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); if (header_received == -1 /* error */ || header_received == 0 /* EOF */) { ACE_ERROR ((LM_ERROR, "%p\n", "Recv error during header read ")); ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read)); this->msg_frag_ = this->msg_frag_->release (); return header_received; } // Bump the write pointer by the amount read. this->msg_frag_->wr_ptr (header_received); // At this point we may or may not have the ENTIRE header. if (this->msg_frag_->length () < HEADER_SIZE) { ACE_DEBUG ((LM_DEBUG, "Partial header received: only %d bytes\n", this->msg_frag_->length ())); // Notify the caller that we didn't get an entire event. errno = EWOULDBLOCK; return -1; } // Convert the header into host byte order so that we can access // it directly without having to repeatedly muck with it... event->header_.decode (); if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) { // This data_ payload is too big! errno = EINVAL; ACE_DEBUG ((LM_DEBUG, "Data payload is too big (%d bytes)\n", event->header_.len_)); return -1; } } // At this point there is a complete, valid header in Event. Now we // need to get the event payload. Due to incomplete reads this may // not be the first time we've read in a fragment for this message. // We account for this here. Note that the first time in here // msg_frag_->wr_ptr() will point to event->data_. Every time we do // a successful fragment read, we advance wr_ptr(). Therefore, by // subtracting how much we've already read from the // event->header_.len_ we complete the data_bytes_left_to_read...
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -