📄 concrete_connection_handlers.cpp
字号:
ssize_t data_bytes_left_to_read =
ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
ssize_t data_received =
!data_bytes_left_to_read
? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0.
: this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
// Try to receive the remainder of the event.
switch (data_received)
{
case -1:
if (errno == EWOULDBLOCK)
// This might happen if only the header came through.
return -1;
else
/* FALLTHROUGH */;
case 0: // Premature EOF.
if (data_bytes_left_to_read)
{
this->msg_frag_ = this->msg_frag_->release ();
return 0;
}
/* FALLTHROUGH */;
default:
// Set the write pointer at 1 past the end of the event.
this->msg_frag_->wr_ptr (data_received);
if (data_received != data_bytes_left_to_read)
{
errno = EWOULDBLOCK;
// Inform caller that we didn't get the whole event.
return -1;
}
else
{
// Set the read pointer to the beginning of the event.
this->msg_frag_->rd_ptr (this->msg_frag_->base ());
// Allocate an event forwarding header and chain the data
// portion onto its continuation field.
forward_addr = new ACE_Message_Block (sizeof (Event_Key),
ACE_Message_Block::MB_PROTO,
this->msg_frag_,
0,
0,
Options::instance ()->locking_strategy ());
if (forward_addr == 0)
{
this->msg_frag_ = this->msg_frag_->release ();
errno = ENOMEM;
return -1;
}
Event_Key event_addr (this->connection_id (),
event->header_.type_);
// Copy the forwarding address from the Event_Key into
// forward_addr.
forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
// Reset the pointer to indicate we've got an entire event.
this->msg_frag_ = 0;
}
this->total_bytes (data_received + header_received);
ACE_DEBUG ((LM_DEBUG,
"(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
event->header_.connection_id_,
event->header_.len_,
data_received + header_received));
if (Options::instance ()->enabled (Options::VERBOSE))
ACE_DEBUG ((LM_DEBUG,
"data_ = %*s\n",
event->header_.len_ - 2,
event->data_));
// Encode before returning so that we can set things out in
// network byte order.
event->header_.encode ();
return data_received + header_received;
}
}
// Receive various types of input (e.g., Peer event from the gatewayd,
// as well as stdio).
int
Supplier_Handler::handle_input (ACE_HANDLE)
{
ACE_Message_Block *event_key = 0;
switch (this->recv (event_key))
{
case 0:
// Note that a peer shouldn't initiate a shutdown by closing the
// connection. Therefore, the peer must have crashed, so we'll
// need to bail out here and let the higher layers reconnect.
this->state (Connection_Handler::FAILED);
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n",
this->connection_id ()),
-1);
/* NOTREACHED */
case -1:
if (errno == EWOULDBLOCK)
// A short-read, we'll come back and finish it up later on!
return 0;
else // A weird problem occurred, shut down and start again.
{
this->state (Connection_Handler::FAILED);
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n",
"Peer has failed unexpectedly",
this->connection_id ()),
-1);
}
/* NOTREACHED */
default:
// Route messages to Consumers.
return this->process (event_key);
}
}
// This delegates to the <Event_Channel> to do the actual processing.
// Typically, this forwards the event to its appropriate Consumer(s).
int
Supplier_Handler::process (ACE_Message_Block *event_key)
{
return this->event_channel_->put (event_key);
}
Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
: Consumer_Handler (pci)
{
// It is not in thread svc() now.
in_thread_ = 0;
}
// Overriding handle_close() method. If in thread svc(), no need to
// process handle_close() when call peer().close(), because the
// connection is blocked now.
int
Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
{
if (in_thread_)
return 0;
else
return Consumer_Handler::handle_close (h, m);
}
// This method should be called only when the Consumer shuts down
// unexpectedly. This method marks the Connection_Handler as having
// failed and deactivates the ACE_Message_Queue (to wake up the thread
// blocked on <dequeue_head> in svc()).
// Thr_Consumer_Handler::handle_close () will eventually try to
// reconnect...
// Let Consumer_Handler receive normal data.
int
Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
{
// Call down to the <Consumer_Handler> to handle this first.
if (this->Consumer_Handler::handle_input (h) != 0)
{
// Only do such work when failed.
ACE_Reactor::instance ()->remove_handler
(h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
// Deactivate the queue while we try to get reconnected.
this->msg_queue ()->deactivate ();
// Will call handle_close.
return -1;
}
return 0;
}
// Initialize the threaded Consumer_Handler object and spawn a new
// thread.
int
Thr_Consumer_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"disable"),
-1); // Incorrect info fixed.
// Call back to the <Event_Channel> to complete our initialization.
else if (this->event_channel_->complete_connection_connection (this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"complete_connection_connection"),
-1);
// Register ourselves to receive input events (which indicate that
// the Consumer has shut down unexpectedly).
else if (ACE_Reactor::instance ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"register_handler"),
-1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
// events to Consumers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
{
ACE_DEBUG ((LM_DEBUG,
"(%t) reusing existing thread\n"));
return 0;
}
}
// Queue up an event for transmission (must not block since
// Supplier_Handlers may be single-threaded).
int
Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
// Perform non-blocking enqueue, i.e., if <msg_queue> is full
// *don't* block!
return this->msg_queue ()->enqueue_tail
(mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
// Transmit events to the peer. Note the simplification resulting
// from the use of threads, compared with the Reactive solution.
int
Thr_Consumer_Handler::svc (void)
{
for (in_thread_ = 1;;)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) Thr_Consumer_Handler's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread it is OK to block on
// output.
for (ACE_Message_Block *mb = 0;
this->msg_queue ()->dequeue_head (mb) != -1;
)
if (this->send (mb) == -1)
ACE_ERROR ((LM_ERROR,
"(%t) %p\n",
"send failed"));
ACE_ASSERT (errno == ESHUTDOWN);
ACE_DEBUG ((LM_DEBUG,
"(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
this->connection_id (),
this->get_handle ()));
this->peer ().close ();
// Re-establish the connection, using exponential backoff.
for (this->timeout (1);
// Default is to reconnect synchronously.
this->event_channel_->initiate_connection_connection (this, 1) == -1;
// Second parameter '1' means using sync mode directly,
// don't care Options::blocking_semantics(). If don't do
// so, async mode will be used to connect which won't
// satisfy original design.
)
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
"(%t) reattempting connection, sec = %d\n",
tv.sec ()));
ACE_OS::sleep (tv);
}
}
ACE_NOTREACHED (return 0;)
}
Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)
: Supplier_Handler (pci)
{
// It is not in thread svc() now.
in_thread_ = 0;
}
// Overriding handle_close() method. If in thread svc(), no need to
// process handle_close() when call peer().close(), because the
// connection is blocked now.
int
Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
{
if (in_thread_)
return 0;
else
return Supplier_Handler::handle_close (h, m);
}
int
Thr_Supplier_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"disable"),
-1); // Incorrect info fixed.
// Call back to the <Event_Channel> to complete our initialization.
else if (this->event_channel_->complete_connection_connection (this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"complete_connection_connection"),
-1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
// events to peers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
{
ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
return 0;
}
}
// Receive events from a Peer in a separate thread (note reuse of
// existing code!).
int
Thr_Supplier_Handler::svc (void)
{
for (in_thread_ = 1;;)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) Thr_Supplier_Handler's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread and processes events
// for one connection it is OK to call down to the
// <Supplier_Handler::handle_input> method, which blocks on
// input.
while (this->Supplier_Handler::handle_input () != -1)
continue;
ACE_DEBUG ((LM_DEBUG,
"(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
this->connection_id (),
this->get_handle ()));
this->peer ().close ();
// Deactivate the queue while we try to get reconnected.
this->msg_queue ()->deactivate ();
// Re-establish the connection, using expoential backoff.
for (this->timeout (1);
// Default is to reconnect synchronously.
this->event_channel_->initiate_connection_connection (this, 1) == -1;
// Second parameter '1' means using sync mode directly,
// don't care Options::blocking_semantics(). If don't do
// so, async mode will be used to connect which won't
// satisfy original design.
)
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
"(%t) reattempting connection, sec = %d\n",
tv.sec ()));
ACE_OS::sleep (tv);
}
}
ACE_NOTREACHED(return 0);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -