📄 event_channel.cpp
字号:
// Event_Channel.cpp,v 4.39 2006/01/23 18:13:38 shuston Exp#define ACE_BUILD_SVC_DLL#include "Connection_Handler_Connector.h"#include "Event_Channel.h"#include "ace/OS_NS_sys_select.h"#include "ace/Signal.h"ACE_RCSID(Gateway, Event_Channel, "Event_Channel.cpp,v 4.39 2006/01/23 18:13:38 shuston Exp")Event_Channel::~Event_Channel (void){}Event_Channel::Event_Channel (void) : supplier_acceptor_ (*this, 'S'), consumer_acceptor_ (*this, 'C'){}intEvent_Channel::compute_performance_statistics (void){ ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // If we've got a <ACE_Thread_Manager> then use it to suspend all // the threads. This will enable us to get an accurate count. if (Options::instance ()->threading_strategy () != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); } size_t total_bytes_in = 0; size_t total_bytes_out = 0; // Iterate through the connection map summing up the number of bytes // sent/received. for (CONNECTION_MAP_ENTRY *me = 0; cmi.next (me) != 0; cmi.advance ()) { Connection_Handler *connection_handler = me->int_id_; if (connection_handler->connection_role () == 'C') total_bytes_out += connection_handler->total_bytes (); else // connection_handler->connection_role () == 'S' total_bytes_in += connection_handler->total_bytes (); } ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", Options::instance ()->performance_window (), total_bytes_in, total_bytes_out)); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", (float) (total_bytes_in * 8 / (float) (1024 * 1024 * Options::instance ()->performance_window ())))); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", (float) (total_bytes_out * 8 / (float) (1024 * 1024 * Options::instance ()->performance_window ())))); // Resume all the threads again. if (Options::instance ()->threading_strategy () != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->resume_all () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); } return 0;}intEvent_Channel::handle_timeout (const ACE_Time_Value &, const void *){ // This is called periodically to compute performance statistics. return this->compute_performance_statistics ();}intEvent_Channel::put (ACE_Message_Block *event, ACE_Time_Value *){ // We got a valid event, so determine its type, which is stored in // the first of the two <ACE_Message_Block>s, which are chained // together by <ACE::recv>. Event_Key *event_key = (Event_Key *) event->rd_ptr (); // Skip over the address portion and get the data, which is in the // second <ACE_Message_Block>. ACE_Message_Block *data = event->cont (); switch (event_key->type_) { case ROUTING_EVENT: this->routing_event (event_key, data); break; case SUBSCRIPTION_EVENT: this->subscription_event (data); break; } // Release the memory in the message block. event->release (); return 0;}voidEvent_Channel::subscription_event (ACE_Message_Block *data){ Event *event = (Event *) data->rd_ptr (); ACE_DEBUG ((LM_DEBUG, "(%t) received a subscription with %d bytes from connection id %d\n", event->header_.len_, event->header_.connection_id_)); Subscription *subscription = (Subscription *) event->data_; // Convert the subscription into host byte order so that we can // access it directly without having to repeatedly muck with it... subscription->decode (); ACE_DEBUG ((LM_DEBUG, "(%t) connection_id_ = %d, total_consumers_ = %d\n", subscription->connection_id_, subscription->total_consumers_)); for (ACE_INT32 i = 0; i < subscription->total_consumers_; i++) ACE_DEBUG ((LM_DEBUG, "(%t) consumers_[%d] = %d\n", i, subscription->consumers_[i]));}voidEvent_Channel::routing_event (Event_Key *forwarding_address, ACE_Message_Block *data){ Consumer_Dispatch_Set *dispatch_set = 0; // Initialize the <dispatch_set> to points to the set of Consumers // associated with this forwarding address. if (this->efd_.find (*forwarding_address, dispatch_set) == -1) // Failure. ACE_ERROR ((LM_DEBUG, "(%t) find failed on connection id = %d, type = %d\n", forwarding_address->connection_id_, forwarding_address->type_)); else { // Check to see if there are any consumers. if (dispatch_set->size () == 0) ACE_DEBUG ((LM_WARNING, "there are no active consumers for this event currently\n")); else // There are consumers, so forward the event. { // Initialize the interator. Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); // At this point, we should assign a thread-safe locking // strategy to the <ACE_Message_Block> is we're running in a // multi-threaded configuration. data->locking_strategy (Options::instance ()->locking_strategy ()); for (Connection_Handler **connection_handler = 0; dsi.next (connection_handler) != 0; dsi.advance ()) { // Only process active connection_handlers. if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED) { // Duplicate the event portion via reference // counting. ACE_Message_Block *dup_msg = data->duplicate (); ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n", (*connection_handler)->connection_id ())); if ((*connection_handler)->put (dup_msg) == -1) { if (errno == EWOULDBLOCK) // The queue has filled up! ACE_ERROR ((LM_ERROR, "(%t) %p\n", "gateway is flow controlled, so we're dropping events")); else ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", "put", (*connection_handler)->connection_id ())); // We are responsible for releasing an // ACE_Message_Block if failures occur. dup_msg->release (); } } } } }}intEvent_Channel::initiate_connection_connection (Connection_Handler *connection_handler, int sync_directly){ ACE_Synch_Options synch_options; if (sync_directly) // In separated connection handler thread, connection can be // initiated by block mode (synch mode) directly. synch_options = ACE_Synch_Options::synch; else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; return this->connector_.initiate_connection (connection_handler, synch_options);}intEvent_Channel::complete_connection_connection (Connection_Handler *connection_handler){ int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; int socket_queue_size = Options::instance ()->socket_queue_size (); if (socket_queue_size > 0) if (connection_handler->peer ().set_option (SOL_SOCKET, option, &socket_queue_size, sizeof (int)) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); // Our state is now "established." connection_handler->state (Connection_Handler::ESTABLISHED); // Restart the timeout to 1. connection_handler->timeout (1); ACE_INT32 id = htonl (connection_handler->connection_id ()); // Send the connection id to the peerd. ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id); if (n != sizeof id) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", n == 0 ? "peer has closed down unexpectedly" : "send"), -1); return 0;}// Restart connection (blocking_semantics dicates whether we restart// synchronously or asynchronously).intEvent_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler){ // Cancel asynchronous connecting before re-initializing. It will // close the peer and cancel the asynchronous connecting. this->cancel_connection_connection(connection_handler); if (connection_handler->state () != Connection_Handler::DISCONNECTING) { ACE_DEBUG ((LM_DEBUG,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -