📄 qos_udp.cpp
字号:
/**
 * Receive data into the caller-supplied iovec.
 *
 * Forwards to recv() on the socket returned by handler_->get_socket (),
 * passing this->peer_addr_ as the address argument and 0 as the third
 * argument.  The iovec count parameter is ignored.
 *
 * @param iov      buffer descriptor handed straight to the socket.
 * @param timeout  forwarded unchanged to the socket's recv().
 * @return whatever the socket's recv() call returns.
 */
ssize_t
TAO_AV_UDP_QoS_Transport::recv (iovec *iov,
                                int /*iovcnt*/,
                                ACE_Time_Value *timeout)
{
  const ssize_t received =
    handler_->get_socket ()->recv (iov,
                                   this->peer_addr_,
                                   0,
                                   timeout);
  return received;
}
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Acceptor
//------------------------------------------------------------
/// Default constructor.  The body performs no work.
TAO_AV_UDP_QoS_Acceptor::TAO_AV_UDP_QoS_Acceptor ()
{
  // Nothing to do here.
}
/// Destructor.  The body performs no work.
TAO_AV_UDP_QoS_Acceptor::~TAO_AV_UDP_QoS_Acceptor ()
{
  // Nothing to do here.
}
/**
 * Activate @a handler through a TAO_AV_UDP_QoS_Session_Helper.
 *
 * The helper's activate_qos_handler() is given the handler's QoS session
 * and the handler itself.
 *
 * @param handler  flow handler to activate; dereferenced unconditionally.
 * @return the helper's return value, or -1 (after logging an error) when
 *         the helper reported -1.
 */
int
TAO_AV_UDP_QoS_Acceptor::activate_svc_handler (TAO_AV_UDP_QoS_Flow_Handler *handler)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) Acceptor Svc Handler QOS ENABLED \n"));
    }

  TAO_AV_UDP_QoS_Session_Helper session_helper;
  const int status =
    session_helper.activate_qos_handler (handler->qos_session (),
                                         handler);

  if (status == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Error in registering the Decorator with the Reactor\n"),
                        -1);
    }

  return status;
}
/**
 * Open the acceptor on the address carried by @a entry.
 *
 * Records the endpoint, AV core, flow spec entry, protocol factory and
 * flow name on this object, then hands the entry's address to open_i().
 *
 * @param endpoint   stream endpoint stored in endpoint_.
 * @param av_core    AV core stored in av_core_.
 * @param entry      flow spec entry; supplies the flow name and address.
 * @param factory    protocol factory stored in flow_protocol_factory_.
 * @param flow_comp  unused.
 * @return 0 on success; -1 if the entry has no address; otherwise the
 *         negative value returned by open_i().
 */
int
TAO_AV_UDP_QoS_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
                               TAO_AV_Core *av_core,
                               TAO_FlowSpec_Entry *entry,
                               TAO_AV_Flow_Protocol_Factory *factory,
                               TAO_AV_Core::Flow_Component flow_comp)
{
  ACE_UNUSED_ARG (flow_comp);

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open "));

  this->av_core_ = av_core;
  this->endpoint_ = endpoint;
  this->entry_ = entry;
  this->flow_protocol_factory_ = factory;
  this->flowname_ = entry->flowname ();

  ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) entry->address ();

  // Without an address there is nothing to open; fail cleanly instead of
  // dereferencing a null pointer below.
  if (inet_addr == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: flow spec entry has no address\n"),
                      -1);

  // Only pay for formatting the address when it is actually logged.
  if (TAO_debug_level > 0)
    {
      // Zero-filled so that a failed conversion prints an empty string
      // rather than uninitialized memory.
      char buf[BUFSIZ] = { 0 };
      inet_addr->addr_to_string (buf,
                                 BUFSIZ);
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open: %s",
                  buf));
    }

  // Non-negative results from open_i() are all reported as plain success.
  const int result = this->open_i (inet_addr);
  return (result < 0) ? result : 0;
}
/**
 * Open the acceptor on the local host with port 0.
 *
 * Records the endpoint, AV core, flow spec entry, protocol factory and
 * flow name on this object, sets qos_acceptor_addr_ to (port 0, local
 * host name) and hands that address to open_i().
 *
 * @param endpoint   stream endpoint stored in endpoint_.
 * @param av_core    AV core stored in av_core_.
 * @param entry      flow spec entry; supplies the flow name.
 * @param factory    protocol factory stored in flow_protocol_factory_.
 * @param flow_comp  unused.
 * @return 0 on success; -1 if the local host name cannot be obtained;
 *         otherwise the negative value returned by open_i().
 */
int
TAO_AV_UDP_QoS_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
                                       TAO_AV_Core *av_core,
                                       TAO_FlowSpec_Entry *entry,
                                       TAO_AV_Flow_Protocol_Factory *factory,
                                       TAO_AV_Core::Flow_Component flow_comp)
{
  ACE_UNUSED_ARG (flow_comp);

  this->av_core_ = av_core;
  this->endpoint_ = endpoint;
  this->entry_ = entry;
  this->flow_protocol_factory_ = factory;
  this->flowname_ = entry->flowname ();

  char buf [BUFSIZ];

  // If the lookup fails the buffer is never written; bail out rather than
  // feed uninitialized bytes to the address and to the "%s" below.
  if (ACE_OS::hostname (buf,
                        BUFSIZ) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%N,%l) TAO_AV_UDP_QoS_Acceptor::open_default: unable to obtain the local host name\n"),
                      -1);

  // NOTE(review): the result of set() is still not checked, as before --
  // confirm whether a failed host lookup should abort the open.
  qos_acceptor_addr_.set ((u_short) 0, buf);

  // Gated on the debug level like every other trace in this file.
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "(%N,%l) ADDRESS IS %s:%d\n",
                buf, qos_acceptor_addr_.get_port_number() ));

  // Non-negative results from open_i() are all reported as plain success.
  const int result = this->open_i (&qos_acceptor_addr_);
  return (result < 0) ? result : 0;
}
/**
 * Shared implementation behind open() and open_default().
 *
 * Creates a TAO_AV_UDP_QoS_Flow_Handler, opens its socket on @a inet_addr
 * and wires the handler into the endpoint and the flow spec entry.
 *
 * When the entry's role is TAO_AV_CONSUMER the function additionally
 * opens a QoS session on the socket's local address, activates the
 * handler and, if the endpoint has QoS for this flow (get_flow_qos()
 * returns 0), translates and applies it.  For any other role only the
 * socket is opened.
 *
 * Afterwards a protocol object is created through the protocol factory,
 * the endpoint's "Negotiator" property (if any) is handed to the handler,
 * and the socket's local address is stored on the entry.
 *
 * @param inet_addr  address to open the socket on; must not be null.
 * @return 0 on success, a negative value on failure.
 */
int
TAO_AV_UDP_QoS_Acceptor::open_i (ACE_INET_Addr *inet_addr)
{
  // Deletes the guarded address on scope exit unless release() has been
  // called.  Keeps the many early error returns below from leaking the
  // heap-allocated local address.
  struct Local_Addr_Guard
  {
    explicit Local_Addr_Guard (ACE_INET_Addr *addr) : addr_ (addr) {}
    ~Local_Addr_Guard () { delete this->addr_; }

    ACE_INET_Addr *release ()
    {
      ACE_INET_Addr *released = this->addr_;
      this->addr_ = 0;
      return released;
    }

    ACE_INET_Addr *addr_;
  };

  ACE_DECLARE_NEW_CORBA_ENV;
  int result = 0;

  ACE_INET_Addr *local_addr = 0;
  ACE_NEW_RETURN (local_addr,
                  ACE_INET_Addr (*inet_addr),
                  -1);
  Local_Addr_Guard local_addr_guard (local_addr);

  ACE_QoS_Params qos_params;
  ACE_QoS* ace_qos = 0;
  FillQoSParams (qos_params,
                 0,
                 ace_qos);

  // NOTE(review): the handler is still not released on the error paths
  // below; whether it may be deleted once it has been activated cannot be
  // determined from this file -- confirm the ownership rules.
  TAO_AV_UDP_QoS_Flow_Handler* handler = 0;
  ACE_NEW_RETURN (handler,
                  TAO_AV_UDP_QoS_Flow_Handler,
                  -1);

  TAO_AV_Flow_Handler *flow_handler = handler;

  handler->endpoint (this->endpoint_);
  handler->flowspec_entry (this->entry_);
  handler->av_core (this->av_core_);

  if (this->entry_->role () == TAO_FlowSpec_Entry::TAO_AV_CONSUMER)
    {
      TAO_AV_UDP_QoS_Session_Helper helper;

      result = handler->get_socket ()->open (*inet_addr,
                                             qos_params,
                                             AF_INET,
                                             0,
                                             0,
                                             0,
                                             ACE_OVERLAPPED_SOCKET_FLAG
                                             | ACE_FLAG_MULTIPOINT_C_LEAF
                                             | ACE_FLAG_MULTIPOINT_D_LEAF,
                                             1);
      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
                          -1);

      result = handler->get_socket ()->get_local_addr (*local_addr);
      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Error in getting Local Address (%N|%l)\n"),
                          -1);

      // Create a destination address for the QoS session. The same
      // address should be used for the subscribe call later. A copy
      // is made below only to distinguish the two usages of the dest
      // address.
      ACE_INET_Addr dest_addr;
      dest_addr.set (local_addr->get_port_number (),
                     local_addr->get_host_name ());

      if (TAO_debug_level > 0)
        {
          // Zero-filled so that a failed conversion prints an empty
          // string rather than uninitialized memory.
          char dest_buf [BUFSIZ] = { 0 };
          dest_addr.addr_to_string (dest_buf,
                                    BUFSIZ);
          ACE_DEBUG ((LM_DEBUG,
                      "Session Address is %s\n",
                      dest_buf));
        }

      this->qos_session_ = helper.open_qos_session (handler,
                                                    dest_addr);
      if (this->qos_session_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "QoS Session Open Failed (%N|%l)\n"),
                          -1);

      handler->qos_session (this->qos_session_);

      if (this->activate_svc_handler (handler) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "Activate Svc Handler Failed (%N|%l)\n"),
                          -1);

      AVStreams::QoS qos;
      int qos_available = this->endpoint_->qos ().get_flow_qos (this->flowname_.c_str (),
                                                                qos);
      if (qos_available == 0)
        {
          // NOTE(review): the flow spec is never deleted here.  Whether
          // set_qos() keeps a pointer to it cannot be seen from this
          // file, so it is deliberately left alone -- confirm ownership.
          ACE_Flow_Spec *ace_flow_spec = 0;
          ACE_NEW_RETURN (ace_flow_spec,
                          ACE_Flow_Spec,
                          -1);

          handler->translate (qos.QoSParams,
                              ace_flow_spec);

          if (helper.set_qos (*ace_flow_spec,
                              handler) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "Set QoS Failed (%N|%l)\n"),
                              -1);
        }
    }
  else
    {
      result = handler->get_socket ()->open (*inet_addr,
                                             qos_params,
                                             AF_INET,
                                             0,
                                             0,
                                             0,
                                             ACE_OVERLAPPED_SOCKET_FLAG
                                             | ACE_FLAG_MULTIPOINT_C_LEAF
                                             | ACE_FLAG_MULTIPOINT_D_LEAF,
                                             1);
      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "TAO_AV_QOS_UDP_MCast_Acceptor data socket open failed (%N|%l)\n"),
                          -1);
    }

  TAO_AV_Protocol_Object *object =
    this->flow_protocol_factory_->make_protocol_object (this->entry_,
                                                        this->endpoint_,
                                                        flow_handler,
                                                        flow_handler->transport ());
  flow_handler->protocol_object (object);

  // Start from a null reference so that a failed extraction can never
  // hand an indeterminate pointer to the handler.
  AVStreams::Negotiator_ptr negotiator = 0;

  ACE_TRY_EX (negotiator)
    {
      CORBA::Any_ptr negotiator_any =
        this->endpoint_->get_property_value ("Negotiator"
                                             ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK_EX (negotiator);

      // Only install the negotiator when the Any really contained one.
      if ((*negotiator_any >>= negotiator))
        handler->negotiator (negotiator);
      else if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "(%N,%l) Negotiator property does not hold a Negotiator \n"));
    }
  ACE_CATCHANY
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) Negotiator Not Found \n"));
    }
  ACE_ENDTRY;

  this->endpoint_->set_flow_handler (this->flowname_.c_str (),flow_handler);
  this->entry_->protocol_object (object);

  result = handler->get_socket ()->get_local_addr (*local_addr);
  if (result < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO_AV_UDP_QoS_Acceptor::open_i: get_local_addr failed\n"),
                      result);

  local_addr->set (local_addr->get_port_number (),
                   local_addr->get_host_name ());

  if (TAO_debug_level > 0)
    {
      char buf [BUFSIZ] = { 0 };
      local_addr->addr_to_string (buf,
                                  BUFSIZ);
      ACE_DEBUG ((LM_DEBUG,
                  "Local Address is %s\n",
                  buf));
    }

  // From here on the entry holds the pointer, so the guard must let go.
  // NOTE(review): assumes set_local_addr() keeps the pointer it is given
  // -- confirm against TAO_FlowSpec_Entry.
  this->entry_->set_local_addr (local_addr_guard.release ());
  this->entry_->handler (flow_handler);

  return 0;
}
/// Close the acceptor.  This implementation does no work and always
/// returns 0.
int
TAO_AV_UDP_QoS_Acceptor::close ()
{
  const int status = 0;
  return status;
}
//------------------------------------------------------------
// TAO_AV_UDP_QoS_Connector
//------------------------------------------------------------
/// Default constructor.  The body performs no work.
TAO_AV_UDP_QoS_Connector::TAO_AV_UDP_QoS_Connector ()
{
  // Nothing to do here.
}
/// Destructor.  The body performs no work.
TAO_AV_UDP_QoS_Connector::~TAO_AV_UDP_QoS_Connector ()
{
  // Nothing to do here.
}
/**
 * Remember the collaborators this connector will use later.
 *
 * Stores @a endpoint, @a av_core and @a factory in the corresponding
 * members; nothing else is done here.
 *
 * @return always 0.
 */
int
TAO_AV_UDP_QoS_Connector::open (TAO_Base_StreamEndPoint *endpoint,
                                TAO_AV_Core *av_core,
                                TAO_AV_Flow_Protocol_Factory *factory)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "TAO_AV_UDP_QoS_Connector::open "));
    }

  // Plain pointer stores; the order between them is irrelevant.
  this->flow_protocol_factory_ = factory;
  this->av_core_ = av_core;
  this->endpoint_ = endpoint;

  return 0;
}
// int
// TAO_AV_UDP_QoS_Connector::translate (CosPropertyService::Properties &qos_params,
// ACE_Flow_Spec *ace_flow_spec)
// {
// for (unsigned int i = 0;
// i < qos_params.length ();
// i++)
// {
// if (ACE_OS::strcmp (qos_params [i].property_name, "Service_Type") == 0)
// {
// CORBA::Short type;
// qos_params [i].property_value >>= type;
// ace_flow_spec->service_type (type);
// }
// else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Rate") == 0)
// {
// CORBA::ULong tok_rate;
// qos_params [i].property_value >>= tok_rate;
// ace_flow_spec->token_rate (tok_rate);
// }
// else if (ACE_OS::strcmp (qos_params [i].property_name, "Token_Bucket_Rate") == 0)
// {
// CORBA::ULong tok_buck_size;
// qos_params [i].property_value >>= tok_buck_size;
// ace_flow_spec->token_bucket_size (tok_buck_size);
// }
// else if (ACE_OS::strcmp (qos_params [i].property_name, "Peak_Bandwidth") == 0)
// {
// CORBA::ULong peak_bw;
// qos_params [i].property_value >>= peak_bw;
// ace_flow_spec->peak_bandwidth (peak_bw);
// }
// else if (ACE_OS::strcmp (qos_params [i].property_name, "Latency") == 0)
// {
// CORBA::ULong lat;
// qos_params [i].property_value >>= lat;
// ace_flow_spec->latency (lat);
// }
// else if (ACE_OS::strcmp (qos_params [i].property_name, "Delay_Variation") == 0)
// {
// CORBA::ULong delay_var;
// qos_params [i].property_value >>= delay_var;
// ace_flow_spec->delay_variation (delay_var);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -