📄 concrete_connection_handlers.cpp
字号:
ssize_t data_bytes_left_to_read = ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); ssize_t data_received = !data_bytes_left_to_read ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0. : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); // Try to receive the remainder of the event. switch (data_received) { case -1: if (errno == EWOULDBLOCK) // This might happen if only the header came through. return -1; else /* FALLTHROUGH */; case 0: // Premature EOF. if (data_bytes_left_to_read) { this->msg_frag_ = this->msg_frag_->release (); return 0; } /* FALLTHROUGH */; default: // Set the write pointer at 1 past the end of the event. this->msg_frag_->wr_ptr (data_received); if (data_received != data_bytes_left_to_read) { errno = EWOULDBLOCK; // Inform caller that we didn't get the whole event. return -1; } else { // Set the read pointer to the beginning of the event. this->msg_frag_->rd_ptr (this->msg_frag_->base ()); // Allocate an event forwarding header and chain the data // portion onto its continuation field. forward_addr = new ACE_Message_Block (sizeof (Event_Key), ACE_Message_Block::MB_PROTO, this->msg_frag_, 0, 0, Options::instance ()->locking_strategy ()); if (forward_addr == 0) { this->msg_frag_ = this->msg_frag_->release (); errno = ENOMEM; return -1; } Event_Key event_addr (this->connection_id (), event->header_.type_); // Copy the forwarding address from the Event_Key into // forward_addr. forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); // Reset the pointer to indicate we've got an entire event. this->msg_frag_ = 0; } this->total_bytes (data_received + header_received); ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, cur len = %d, total bytes read = %d\n", event->header_.connection_id_, event->header_.len_, data_received + header_received)); if (Options::instance ()->enabled (Options::VERBOSE)) ACE_DEBUG ((LM_DEBUG, "data_ = %*s\n", event->header_.len_ - 2, event->data_)); // Encode before returning so that we can set things out in // network byte order. event->header_.encode (); return data_received + header_received; }}// Receive various types of input (e.g., Peer event from the gatewayd,// as well as stdio).intSupplier_Handler::handle_input (ACE_HANDLE){ ACE_Message_Block *event_key = 0; switch (this->recv (event_key)) { case 0: // Note that a peer shouldn't initiate a shutdown by closing the // connection. Therefore, the peer must have crashed, so we'll // need to bail out here and let the higher layers reconnect. this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n", this->connection_id ()), -1); /* NOTREACHED */ case -1: if (errno == EWOULDBLOCK) // A short-read, we'll come back and finish it up later on! return 0; else // A weird problem occurred, shut down and start again. { this->state (Connection_Handler::FAILED); ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n", "Peer has failed unexpectedly", this->connection_id ()), -1); } /* NOTREACHED */ default: // Route messages to Consumers. return this->process (event_key); }}// This delegates to the <Event_Channel> to do the actual processing.// Typically, this forwards the event to its appropriate Consumer(s).intSupplier_Handler::process (ACE_Message_Block *event_key){ return this->event_channel_->put (event_key);}Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) : Consumer_Handler (pci){ // It is not in thread svc() now. in_thread_ = 0;}// Overriding handle_close() method. If in thread svc(), no need to// process handle_close() when call peer().close(), because the// connection is blocked now.intThr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m){ if (in_thread_) return 0; else return Consumer_Handler::handle_close (h, m);}// This method should be called only when the Consumer shuts down// unexpectedly. This method marks the Connection_Handler as having// failed and deactivates the ACE_Message_Queue (to wake up the thread// blocked on <dequeue_head> in svc()).// Thr_Consumer_Handler::handle_close () will eventually try to// reconnect...// Let Consumer_Handler receive normal data.intThr_Consumer_Handler::handle_input (ACE_HANDLE h){ // Call down to the <Consumer_Handler> to handle this first. if (this->Consumer_Handler::handle_input (h) != 0) { // Only do such work when failed. ACE_Reactor::instance ()->remove_handler (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); // Deactivate the queue while we try to get reconnected. this->msg_queue ()->deactivate (); // Will call handle_close. return -1; } return 0;}// Initialize the threaded Consumer_Handler object and spawn a new// thread.intThr_Consumer_Handler::open (void *){ // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "disable"), -1); // Incorrect info fixed. // Call back to the <Event_Channel> to complete our initialization. else if (this->event_channel_->complete_connection_connection (this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); // Register ourselves to receive input events (which indicate that // the Consumer has shut down unexpectedly). else if (ACE_Reactor::instance ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit // events to Consumers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else { ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); return 0; }}// Queue up an event for transmission (must not block since// Supplier_Handlers may be single-threaded).intThr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *){ // Perform non-blocking enqueue, i.e., if <msg_queue> is full // *don't* block! return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);}// Transmit events to the peer. Note the simplification resulting// from the use of threads, compared with the Reactive solution.intThr_Consumer_Handler::svc (void){ for (in_thread_ = 1;;) { ACE_DEBUG ((LM_DEBUG, "(%t) Thr_Consumer_Handler's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread it is OK to block on // output. for (ACE_Message_Block *mb = 0; this->msg_queue ()->dequeue_head (mb) != -1; ) if (this->send (mb) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); ACE_ASSERT (errno == ESHUTDOWN); ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", this->connection_id (), this->get_handle ())); this->peer ().close (); // Re-establish the connection, using exponential backoff. for (this->timeout (1); // Default is to reconnect synchronously. this->event_channel_->initiate_connection_connection (this, 1) == -1; // Second parameter '1' means using sync mode directly, // don't care Options::blocking_semantics(). If don't do // so, async mode will be used to connect which won't // satisfy original design. ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, "(%t) reattempting connection, sec = %d\n", tv.sec ())); ACE_OS::sleep (tv); } } ACE_NOTREACHED (return 0;)}Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci) : Supplier_Handler (pci){ // It is not in thread svc() now. in_thread_ = 0;}// Overriding handle_close() method. If in thread svc(), no need to// process handle_close() when call peer().close(), because the// connection is blocked now.intThr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m){ if (in_thread_) return 0; else return Supplier_Handler::handle_close (h, m);}intThr_Supplier_Handler::open (void *){ // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "disable"), -1); // Incorrect info fixed. // Call back to the <Event_Channel> to complete our initialization. else if (this->event_channel_->complete_connection_connection (this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit // events to peers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else { ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); return 0; }}// Receive events from a Peer in a separate thread (note reuse of// existing code!).intThr_Supplier_Handler::svc (void){ for (in_thread_ = 1;;) { ACE_DEBUG ((LM_DEBUG, "(%t) Thr_Supplier_Handler's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread and processes events // for one connection it is OK to call down to the // <Supplier_Handler::handle_input> method, which blocks on // input. while (this->Supplier_Handler::handle_input () != -1) continue; ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", this->connection_id (), this->get_handle ())); this->peer ().close (); // Deactivate the queue while we try to get reconnected. this->msg_queue ()->deactivate (); // Re-establish the connection, using expoential backoff. for (this->timeout (1); // Default is to reconnect synchronously. this->event_channel_->initiate_connection_connection (this, 1) == -1; // Second parameter '1' means using sync mode directly, // don't care Options::blocking_semantics(). If don't do // so, async mode will be used to connect which won't // satisfy original design. ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, "(%t) reattempting connection, sec = %d\n", tv.sec ())); ACE_OS::sleep (tv); } } ACE_NOTREACHED(return 0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -