📄 av_core.cpp
字号:
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_Core::init_forward_flows: remove_handler failed\n"));
}
}
default:
break;
}
// Now if the address_set has been changed due to the addition of a control entry we should
// add that to the flow_spec_set also.
if (flow_spec_set.find (entry) < 0)
{
// entry doesn't exist so add it.
flow_spec_set.insert (entry);
}
}
}
AVStreams::flowSpec new_flowspec (ACE_static_cast (CORBA::ULong,
flow_spec_set.size ()));
int i=0;
TAO_AV_FlowSpecSetItor connect_end = address_flow_set.end ();
TAO_AV_FlowSpecSetItor connect = address_flow_set.begin ();
for (;connect != connect_end; ++connect)
{
ACE_Addr *local_addr;
ACE_Addr *local_control_addr;
local_addr = (*connect)->get_local_addr ();
local_control_addr = (*connect)->get_local_control_addr ();
if (local_addr != 0)
{
TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (),
(*connect)->direction_str (),
(*connect)->format (),
(*connect)->flow_protocol_str (),
(*connect)->carrier_protocol_str (),
local_addr,
local_control_addr);
/*
ACE_Addr *addr;
if ((addr = (*connect)->get_peer_addr ()) != 0)
{
entry.set_peer_addr (addr);
};
*/
int len = new_flowspec.length ();
if (i == len)
new_flowspec.length (len+1);
new_flowspec [i++] = entry.entry_to_string ();
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "reverse Flow Spec Is %s\n", entry.entry_to_string ()));
}
}
connect_end = flow_set.end ();
for (connect = flow_set.begin ();
connect != connect_end; ++connect)
{
ACE_Addr *local_addr;
ACE_Addr *local_control_addr;
local_addr = (*connect)->get_local_addr ();
local_control_addr = (*connect)->get_local_control_addr ();
if (local_addr != 0)
{
TAO_Reverse_FlowSpec_Entry entry ((*connect)->flowname (),
(*connect)->direction_str (),
(*connect)->format (),
(*connect)->flow_protocol_str (),
(*connect)->carrier_protocol_str (),
local_addr,
local_control_addr);
int len = new_flowspec.length ();
if (i == len)
new_flowspec.length (len+1);
new_flowspec [i++] = entry.entry_to_string ();
}
}
// Change the reverse flow spec to be sent.
// int index = flow_spec.length () + 1;
int index = new_flowspec.length ();
flow_spec.length (index);
for (i = 0; i < index; i++)
{
flow_spec [i] = new_flowspec [i];
}
}
break;
default:
break;
}
return 0;
}
// Set up the reverse-direction flows of a stream for @a endpoint.
//
// For every entry in @a reverse_flow_spec_set this:
//   1. assigns the entry's role from its direction and from which end of
//      the stream (@a direction) is being initialised:
//        ENDPOINT_B: DIR_IN -> CONSUMER, DIR_OUT -> PRODUCER
//        ENDPOINT_A: DIR_IN -> PRODUCER, DIR_OUT -> CONSUMER
//      (any other direction value leaves the role untouched);
//   2. if the entry has a non-null address and an acceptor is registered
//      under the entry's flow name, stores that address as the peer
//      address of the same-named entry in @a forward_flow_spec_set, when
//      such an entry exists;
//   3. if the entry has a non-null address but no acceptor matches, adds
//      the entry to the set handed to the connector registry.
//
// The connector registry is opened only when @a direction is ENDPOINT_A.
//
// Returns 0 when the connector registry open did not return -1.
// Returns -1 when the open returned -1, and also for every direction other
// than ENDPOINT_A, because in that case no open is attempted and the
// initial result of -1 is never overwritten.
int
TAO_AV_Core::init_reverse_flows (TAO_Base_StreamEndPoint *endpoint,
                                 TAO_AV_FlowSpecSet &forward_flow_spec_set,
                                 TAO_AV_FlowSpecSet &reverse_flow_spec_set,
                                 TAO_AV_Core::EndPoint direction)
{
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"(%P|%t)TAO_AV_Core::init_reverse_flows\n"));

  // Entries that have an address but no matching local acceptor.
  TAO_AV_FlowSpecSet connector_flow_set;

  TAO_AV_FlowSpecSetItor end = reverse_flow_spec_set.end ();
  TAO_AV_FlowSpecSetItor start = reverse_flow_spec_set.begin ();
  for (;start != end; ++start)
    {
      TAO_FlowSpec_Entry *entry = (*start);
      ACE_Addr *address = entry->address ();

      // Step 1: derive the role from the entry direction and our end.
      switch (direction)
        {
        case TAO_AV_Core::TAO_AV_ENDPOINT_B:
          {
            switch (entry->direction ())
              {
              case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
                entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
                break;
              case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
                entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
                break;
              }
            break;
          }
        case TAO_AV_Core::TAO_AV_ENDPOINT_A:
          {
            switch (entry->direction ())
              {
              case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
                entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
                break;
              case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
                entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
                break;
              }
            break;
          }
        default: break;
        }

      // Steps 2 and 3 only apply to entries that carry an address.
      if (address != 0)
        {
          if (this->get_acceptor (entry->flowname ())!= 0)
            {
              // An acceptor already serves this flow: hand the address to
              // the matching forward entry as its peer address.
              TAO_FlowSpec_Entry *forward_entry =
                this->get_flow_spec_entry (forward_flow_spec_set,
                                           entry->flowname ());
              if (forward_entry != 0)
                forward_entry->set_peer_addr (address);
            }
          else
            connector_flow_set.insert (entry);
        }
    }

  // Stays -1 unless the connector registry is opened below.
  int result = -1;
  switch (direction)
    {
    case TAO_AV_Core::TAO_AV_ENDPOINT_A:
      {
        result = this->connector_registry_->open (endpoint,
                                                  this,
                                                  connector_flow_set);
      }
      break;
    default:
      break;
    }

  // NOTE(review): the message names the acceptor registry although the
  // call above is on the connector registry, and it is also emitted when
  // no open was attempted at all. Text kept unchanged - confirm before
  // rewording.
  if (result == -1)
    ACE_ERROR_RETURN ((LM_ERROR,"acceptor_registry::open"),-1);
  return 0;
}
// Find an entry of @a flow_spec_set by flow name.
//
// Walks the set from begin to end and returns the first entry whose
// flowname () compares equal to @a flowname under ACE_OS::strcmp.
// Returns 0 when no entry matches (including when the set is empty).
TAO_FlowSpec_Entry *
TAO_AV_Core::get_flow_spec_entry (TAO_AV_FlowSpecSet &flow_spec_set,
                                  const char *flowname)
{
  TAO_AV_FlowSpecSetItor last = flow_spec_set.end ();
  TAO_AV_FlowSpecSetItor cursor = flow_spec_set.begin ();

  while (cursor != last)
    {
      TAO_FlowSpec_Entry *candidate = *cursor;
      if (ACE_OS::strcmp (candidate->flowname (), flowname) == 0)
        return candidate;
      ++cursor;
    }

  // No entry carries the requested flow name.
  return 0;
}
// Find the acceptor registered for @a flowname.
//
// Scans the acceptor registry in iteration order and returns the first
// acceptor whose flowname () equals @a flowname (ACE_OS::strcmp == 0).
// Returns 0 when none matches. An exception caught during the scan is
// printed with the label "TAO_AV_Core::get_acceptor", and 0 is returned
// in that case as well.
TAO_AV_Acceptor*
TAO_AV_Core::get_acceptor (const char *flowname)
{
  ACE_TRY_NEW_ENV
    {
      TAO_AV_AcceptorSetItor cursor = this->acceptor_registry_->begin ();
      ACE_TRY_CHECK;

      TAO_AV_AcceptorSetItor last = this->acceptor_registry_->end ();

      while (cursor != last)
        {
          TAO_AV_Acceptor *candidate = *cursor;
          if (ACE_OS::strcmp (candidate->flowname (), flowname) == 0)
            return candidate;
          ++cursor;
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_Core::get_acceptor");
    }
  ACE_ENDTRY;

  // Either nothing matched or an exception was reported above.
  return 0;
}
// Close the acceptor registered for @a flowname.
//
// Scans the acceptor registry in iteration order; the first acceptor whose
// flowname () equals @a flowname (ACE_OS::strcmp == 0) is passed to the
// registry's close () and 0 is returned. The value returned by close ()
// is not inspected.
//
// Returns -1 when no acceptor matches. An exception caught during the
// scan is printed and -1 is returned in that case as well.
int
TAO_AV_Core::remove_acceptor (const char *flowname)
{
  ACE_TRY_NEW_ENV
    {
      TAO_AV_AcceptorSetItor acceptor = this->acceptor_registry_->begin ();
      ACE_TRY_CHECK;

      TAO_AV_AcceptorSetItor end =
        this->acceptor_registry_->end ();

      for (;acceptor != end; ++acceptor)
        {
          if (ACE_OS::strcmp ((*acceptor)->flowname (),flowname) == 0)
            {
              // Return straight after close () so the iterator is never
              // advanced once the registry has been asked to drop the
              // acceptor.
              this->acceptor_registry_->close (*acceptor);
              return 0;
            }
        }
    }
  ACE_CATCHANY
    {
      // Label the report with this function's own name; it previously
      // carried the name of get_acceptor, which misattributed failures.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_AV_Core::remove_acceptor");
    }
  ACE_ENDTRY;

  return -1;
}
// Find the connector registered for @a flowname.
//
// Scans the connector registry in iteration order and returns the first
// connector whose flowname () equals @a flowname (ACE_OS::strcmp == 0).
// Returns 0 when none matches.
TAO_AV_Connector*
TAO_AV_Core::get_connector (const char *flowname)
{
  TAO_AV_ConnectorSetItor cursor = this->connector_registry_->begin ();
  TAO_AV_ConnectorSetItor last = this->connector_registry_->end ();

  while (cursor != last)
    {
      TAO_AV_Connector *candidate = *cursor;
      if (ACE_OS::strcmp (candidate->flowname (), flowname) == 0)
        return candidate;
      ++cursor;
    }

  // No connector carries the requested flow name.
  return 0;
}
int
TAO_AV_Core::remove_connector (const char *flowname)
{
TAO_AV_ConnectorSetItor connector =
this->connector_registry_->begin ();
TAO_AV_ConnectorSetItor end =
this->connector_registry_->end ();
for (;connector != end; ++connector)
{
if (ACE_OS::strcmp ((*connector)->flowname (),flowname) == 0)
{
this->connector_registry_->close (*connector);
return 0;
}
}
return -1;
}
// Find the flow protocol factory that handles @a flow_protocol.
//
// Returns 0 immediately when @a flow_protocol is a null pointer.
// Otherwise walks flow_protocol_factories_ in iteration order and returns
// the factory of the first item whose factory reports a match through
// match_protocol (). Returns 0 when no factory matches.
TAO_AV_Flow_Protocol_Factory *
TAO_AV_Core::get_flow_protocol_factory(const char *flow_protocol)
{
  if (flow_protocol == 0)
    return 0;

  TAO_AV_Flow_ProtocolFactorySetItor cursor =
    this->flow_protocol_factories_.begin ();

  while (cursor != this->flow_protocol_factories_.end ())
    {
      TAO_AV_Flow_Protocol_Factory *candidate = (*cursor)->factory ();
      if (candidate->match_protocol (flow_protocol))
        return candidate;
      ++cursor;
    }

  // No registered factory accepts this protocol.
  return 0;
}
// Find the transport factory that handles @a transport_protocol.
//
// Returns 0 immediately when @a transport_protocol is a null pointer.
// Otherwise walks transport_factories_ in iteration order and returns the
// factory of the first item whose factory reports a match through
// match_protocol (). Returns 0 when no factory matches.
TAO_AV_Transport_Factory *
TAO_AV_Core::get_transport_factory(const char *transport_protocol)
{
  if (transport_protocol == 0)
    return 0;

  TAO_AV_TransportFactorySetItor cursor =
    this->transport_factories_.begin ();

  while (cursor != this->transport_factories_.end ())
    {
      TAO_AV_Transport_Factory *candidate = (*cursor)->factory ();
      if (candidate->match_protocol (transport_protocol))
        return candidate;
      ++cursor;
    }

  // No registered factory accepts this protocol.
  return 0;
}
int
TAO_AV_Core::load_default_transport_factories (void)
{
const char *udp_factory_str = "UDP_Factory";
const char *tcp_factory_str = "TCP_Factory";
TAO_AV_Transport_Factory *udp_factory = 0;
TAO_AV_Transport_Item *udp_item = 0;
udp_factory =
ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_factory_str);
if (udp_factory == 0)
{
if (TAO_debug_level)
ACE_ERROR ((LM_WARNING,
"(%P|%t) WARNING - No %s found in Service Repository."
" Using default instance.\n",
"UDP Factory"));
ACE_NEW_RETURN (udp_factory,
TAO_AV_UDP_Factory,
-1);
}
else udp_factory->ref_count = 1;
ACE_NEW_RETURN (udp_item, TAO_AV_Transport_Item ("UDP_Factory"), -1);
udp_item->factory (udp_factory);
this->transport_factories_.insert (udp_item);
TAO_AV_Transport_Factory *tcp_factory = 0;
TAO_AV_Transport_Item *tcp_item = 0;
tcp_factory =
ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (tcp_factory_str);
if (tcp_factory == 0)
{
if (TAO_debug_level)
ACE_ERROR ((LM_WARNING,
"(%P|%t) WARNING - No %s found in Service Repository."
" Using default instance.\n",
"TCP Factory"));
ACE_NEW_RETURN (tcp_factory,
TAO_AV_TCP_Factory,
-1);
}
else tcp_factory->ref_count = 1;
ACE_NEW_RETURN (tcp_item, TAO_AV_Transport_Item ("TCP_Factory"), -1);
tcp_item->factory (tcp_factory);
this->transport_factories_.insert (tcp_item);
#if defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS)
const char *udp_qos_factory_str = "UDP_QoS_Factory";
TAO_AV_Transport_Factory *udp_qos_factory = 0;
TAO_AV_Transport_Item *udp_qos_item = 0;
udp_qos_factory =
ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (udp_qos_factory_str);
if (udp_qos_factory == 0)
{
if (TAO_debug_level)
ACE_ERROR ((LM_WARNING,
"(%P|%t) WARNING - No %s found in Service Repository."
" Using default instance.\n",
"UDP QoS Factory"));
ACE_NEW_RETURN (udp_qos_factory,
TAO_AV_UDP_QoS_Factory,
-1);
}
else udp_qos_factory->ref_count = 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -