📄 sctp_seq.cpp
字号:
av_core->reactor ());
return result;
}
/**
 * Actively open the SCTP association for the flow described by @a entry.
 *
 * The remote endpoint comes from entry->address (). The local endpoint is
 * built from entry->get_peer_addr () (or a default "0" address when that is
 * null) plus the secondary addresses listed by entry->get_peer_sec_addr ().
 *
 * @param entry      Flow spec entry supplying the addresses; on success the
 *                   newly connected handler is stored on it.
 * @param transport  Set to the connected handler's transport on success.
 * @param flow_comp  When TAO_AV_CONTROL the control flow name derived from
 *                   the entry is recorded, otherwise the entry's own name.
 * @return 0 on success, -1 on any failure.
 */
int
TAO_AV_SCTP_SEQ_Connector::connect (TAO_FlowSpec_Entry *entry,
                                    TAO_AV_Transport *&transport,
                                    TAO_AV_Core::Flow_Component flow_comp)
{
  this->entry_ = entry;
  if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
    this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
  else
    this->flowname_ = entry->flowname ();

  // The remote address must really be an INET address; a failed cast used
  // to be dereferenced unconditionally.
  ACE_Addr *remote_addr = entry->address ();
  ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr *, remote_addr);
  if (inet_addr == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "TAO_AV_SCTP_SEQ_Connector::connect remote address is not an INET address\n"),
                      -1);

  TAO_AV_SCTP_SEQ_Flow_Handler *handler = 0;

  ACE_Multihomed_INET_Addr remote_multi_addr;
  remote_multi_addr.set (inet_addr->get_port_number (),
                         inet_addr->get_ip_address (),
                         1,
                         0,
                         0);

  // Primary local address.  The fallback lives on the stack so that it is
  // no longer leaked (it was previously heap-allocated and never freed).
  ACE_INET_Addr default_local_addr ("0");
  ACE_INET_Addr *addr = &default_local_addr;
  if (entry->get_peer_addr () != 0)
    {
      addr = ACE_dynamic_cast (ACE_INET_Addr *, entry->get_peer_addr ());
      if (addr == 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "TAO_AV_SCTP_SEQ_Connector::connect peer address is not an INET address\n"),
                          -1);
    }

  // Secondary local addresses.  A heap array replaces the former
  // variable-length array (non-standard C++, and zero-sized when the entry
  // has no secondary addresses).
  const int num_sec_addrs = entry->num_peer_sec_addrs ();
  ACE_UINT32 *local_ip_addr = 0;
  if (num_sec_addrs > 0)
    {
      ACE_NEW_RETURN (local_ip_addr,
                      ACE_UINT32 [num_sec_addrs],
                      -1);
      ACE_INET_Addr ip_addr;
      char **addrs = entry->get_peer_sec_addr ();
      for (int i = 0; i < num_sec_addrs; i++)
        {
          ACE_CString addr_str (addrs[i]);
          addr_str += ":";
          ip_addr.set (addr_str.c_str ());
          local_ip_addr [i] = ip_addr.get_ip_address ();
        }
    }

  ACE_Multihomed_INET_Addr local_addr; // This can be a multihomed address
  local_addr.set (addr->get_port_number (),
                  addr->get_ip_address (),
                  1,
                  local_ip_addr,
                  num_sec_addrs > 0 ? num_sec_addrs : 0);
  // set () stores its own copies of the secondary addresses, so the scratch
  // array can be released before any of the early returns below.
  delete [] local_ip_addr;

  int result = this->connector_.connector_connect (handler,
                                                   remote_multi_addr,
                                                   local_addr);
  if (result < 0)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_connector::connect failed\n"),-1);

  entry->handler (handler);
  transport = handler->transport ();

  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Local Addrs\n"));
      char buf [BUFSIZ];
      size_t size = BUFSIZ;
      ACE_INET_Addr *peer_addrs = 0;
      ACE_NEW_RETURN (peer_addrs,ACE_INET_Addr [size], -1);

      // On failure print nothing rather than BUFSIZ default addresses.
      if (handler->peer ().get_local_addrs (peer_addrs, size) == -1)
        size = 0;
      for (unsigned int i = 0; i < size; i++)
        {
          peer_addrs [i].addr_to_string (buf,
                                         BUFSIZ);
          ACE_DEBUG ((LM_DEBUG,
                      "%s %d\n",
                      buf,
                      ACE_static_cast (int, size)));
        }

      ACE_DEBUG ((LM_DEBUG,
                  "Remote Addrs\n"));
      size = BUFSIZ;
      if (handler->peer ().get_remote_addrs (peer_addrs, size) == -1)
        size = 0;
      for (unsigned int i = 0; i < size; i++)
        {
          peer_addrs [i].addr_to_string (buf,
                                         BUFSIZ);
          ACE_DEBUG ((LM_DEBUG,
                      "%s %d\n",
                      buf,
                      ACE_static_cast (int, size)));
        }

      // Allocated with new [], so it must be released with delete [].
      delete [] peer_addrs;
    }

  return 0;
}
/// Close the connector.  Currently a no-op that always reports success (0).
int
TAO_AV_SCTP_SEQ_Connector::close (void)
{
return 0;
}
//------------------------------------------------------------
// TAO_AV_SCTP_SEQ_Factory
//------------------------------------------------------------
/// Default constructor; performs no work of its own.
TAO_AV_SCTP_SEQ_Factory::TAO_AV_SCTP_SEQ_Factory (void)
{
}
/// Destructor; the body releases nothing.
TAO_AV_SCTP_SEQ_Factory::~TAO_AV_SCTP_SEQ_Factory (void)
{
}
/// Report whether @a protocol_string names this transport.
/// @return 1 when the string equals "SCTP_SEQ" ignoring case, otherwise 0.
int
TAO_AV_SCTP_SEQ_Factory::match_protocol (const char *protocol_string)
{
  return (ACE_OS::strcasecmp (protocol_string, "SCTP_SEQ") == 0) ? 1 : 0;
}
/// Create a new SCTP_SEQ acceptor.
/// @return The heap-allocated acceptor, or 0 if allocation fails.
TAO_AV_Acceptor*
TAO_AV_SCTP_SEQ_Factory::make_acceptor (void)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_acceptor\n"));
    }

  TAO_AV_Acceptor *new_acceptor = 0;
  ACE_NEW_RETURN (new_acceptor, TAO_AV_SCTP_SEQ_Acceptor, 0);
  return new_acceptor;
}
/// Create a new SCTP_SEQ connector.
/// @return The heap-allocated connector, or 0 if allocation fails.
TAO_AV_Connector*
TAO_AV_SCTP_SEQ_Factory::make_connector (void)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_connector\n"));
    }

  TAO_AV_Connector *new_connector = 0;
  ACE_NEW_RETURN (new_connector, TAO_AV_SCTP_SEQ_Connector, 0);
  return new_connector;
}
/// Initialization hook taking an argc/argv style pair.  Both arguments are
/// ignored and the call always returns 0.
int
TAO_AV_SCTP_SEQ_Factory::init (int,
char *[])
{
return 0;
}
//------------------------------------------------------------
// TAO_AV_SCTP_SEQ_Object
//------------------------------------------------------------
/// Read one chunk from the transport into the internal frame buffer and
/// hand the frame to the callback.
/// @return -1 if the receive fails or the peer closed the connection,
///         otherwise whatever the callback's receive_frame () returns.
int
TAO_AV_SCTP_SEQ_Object::handle_input (void)
{
  char *const buffer = this->frame_.rd_ptr ();
  const int bytes_read = this->transport_->recv (buffer,
                                                 this->frame_.size ());

  if (bytes_read == -1)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input recv failed\n"),-1);
  else if (bytes_read == 0)
    ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input connection closed\n"),-1);

  // Mark the received bytes as valid data before delivering the frame.
  this->frame_.wr_ptr (buffer + bytes_read);
  return this->callback_->receive_frame (&this->frame_);
}
/// Send a frame held in a message block through the transport.
/// The frame-info argument is unused.
/// @return The transport's send () result.
int
TAO_AV_SCTP_SEQ_Object::send_frame (ACE_Message_Block *frame,
                                    TAO_AV_frame_info * /*frame_info*/)
{
  return this->transport_->send (frame);
}
/// Send a frame described by an iovec array through the transport.
/// The frame-info argument is unused.
/// @return The transport's send () result.
int
TAO_AV_SCTP_SEQ_Object::send_frame (const iovec *iov,
                                    int iovcnt,
                                    TAO_AV_frame_info * /*frame_info*/)
{
  const int status = this->transport_->send (iov, iovcnt);
  return status;
}
/// Send @a len bytes starting at @a buf through the transport.
/// @return The transport's send () result.
int
TAO_AV_SCTP_SEQ_Object::send_frame (const char*buf,
                                    size_t len)
{
  return this->transport_->send (buf, len, 0);
}
/// Construct the protocol object, forwarding @a callback and @a transport to
/// the TAO_AV_Protocol_Object base and sizing the receive frame to BUFSIZ.
TAO_AV_SCTP_SEQ_Object::TAO_AV_SCTP_SEQ_Object (TAO_AV_Callback *callback,
TAO_AV_Transport *transport)
:TAO_AV_Protocol_Object (callback,transport)
{
// @@ Is this a good size?
// handle_input () passes this size to recv (), so it bounds a single read.
this->frame_.size (BUFSIZ);
}
/// Destructor; the body does nothing.
TAO_AV_SCTP_SEQ_Object::~TAO_AV_SCTP_SEQ_Object (void)
{
// No-op
}
/// Notify the callback that the object is going away, then delete this
/// object.  The caller must not touch the object after this returns.
/// @return Always 0.
int
TAO_AV_SCTP_SEQ_Object::destroy (void)
{
// The callback is notified first, while this object is still alive.
this->callback_->handle_destroy ();
// Self-deletion: no member may be accessed after this statement.
delete this;
return 0;
}
//------------------------------------------------------------
// TAO_AV_SCTP_SEQ_Flow_Handler
//------------------------------------------------------------
/// Construct the flow handler and allocate the transport bound to it.
/// The callback argument is unused; the base-class initializer that would
/// have consumed it is commented out.
TAO_AV_SCTP_SEQ_Flow_Handler::TAO_AV_SCTP_SEQ_Flow_Handler (TAO_AV_Callback * /*callback*/)
// :TAO_AV_Flow_Handler (callback)
{
// The transport is owned by this handler and freed in the destructor.
ACE_NEW (this->transport_,
TAO_AV_SCTP_SEQ_Transport (this));
}
/// Destructor; releases the transport allocated by the constructor.
TAO_AV_SCTP_SEQ_Flow_Handler::~TAO_AV_SCTP_SEQ_Flow_Handler (void)
{
delete this->transport_;
}
/// Accessor for the transport associated with this handler.
/// The handler keeps ownership; the destructor deletes it.
TAO_AV_Transport *
TAO_AV_SCTP_SEQ_Flow_Handler::transport (void)
{
return this->transport_;
}
/**
 * Apply Diffserv/ECN settings from @a qos to the socket's IP_TOS option.
 *
 * Two QoS parameters are recognised by name:
 *  - "Diffserv_Codepoint": 6-bit DSCP value (0-63); an invalid value makes
 *    the call fail.
 *  - "ECN": 2-bit value (0-3); an invalid value is reported and treated as 0.
 *
 * The TOS byte is only written when a DSCP was supplied or ECN is non-zero.
 *
 * @return -1 for an invalid DSCP, otherwise the result of set_option ()
 *         (0 when nothing needed to be set).
 */
int
TAO_AV_SCTP_SEQ_Flow_Handler::change_qos (AVStreams::QoS qos)
{
  if (TAO_debug_level > 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%N,%l) TAO_AV_SCTP_SEQ_Flow_Handler::change_qos\n"));
    }

  int ret = 0;
  CORBA::Long dscp = 0;
  CORBA::Long ecn = 0;
  int dscp_flag = 0;

  for (unsigned int i = 0; i < qos.QoSParams.length (); i++)
    {
      const char *name = qos.QoSParams[i].property_name.in ();

      if (ACE_OS::strcmp (name, "Diffserv_Codepoint") == 0)
        {
          // A value of the wrong type is rejected like an out-of-range one
          // instead of silently programming DSCP 0.
          if (!(qos.QoSParams[i].property_value >>= dscp))
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%N,%l) Diffserv_Codepoint value is not a CORBA::Long\n"));
              return -1;
            }

          if (TAO_debug_level > 0)
            {
              ACE_DEBUG ((LM_DEBUG,
                          "DSCP %d\n",
                          dscp));
            }

          // DSCP value can only be 6 bits wide
          if (!((dscp >= 0) && (dscp <= 63)))
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%N,%l) DSCP value can only be (0-63) not %d\n",
                          dscp));
              return -1;
            }
          dscp_flag = 1;
        }

      if (ACE_OS::strcmp (name, "ECN") == 0)
        {
          if (!(qos.QoSParams[i].property_value >>= ecn))
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%N,%l) ECN value is not a CORBA::Long\n"));
              ecn = 0;
            }
          // ECN value can only occupy bits 6 and 7 of the
          // IP Diffserv byte
          else if (!((ecn >= 0) && (ecn <= 3)))
            {
              ACE_DEBUG ((LM_DEBUG,
                          "(%N,%l) ECN value can only be (0-3) not %d\n",
                          ecn));
              ecn = 0;
            }
        }
    }

  // Set the Diffserv byte only if we specified
  // the Diffserv Codepoint (DSCP) or ECN via QoSParams
  // passed into this method
  if (dscp_flag || ecn)
    {
      // DSCP occupies the upper six bits, ECN the lower two.
      int tos = (int) ((dscp << 2) | ecn);
      ret = this->peer ().set_option (IPPROTO_IP, IP_TOS, (int *) &tos, (int) sizeof (tos));
      if (TAO_debug_level > 0)
        {
          ACE_DEBUG ((LM_DEBUG, "(%N,%l) set tos: %d ret: %d\n", tos, ret));
        }
    }

  if (TAO_debug_level > 0 && ret < 0)
    {
      // %p consumes a string argument and appends the errno description.
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%N,%l) errno: %p\n"),
                  ACE_TEXT ("set_option")));
    }

  return ret;
}
int
TAO_AV_SCTP_SEQ_Flow_Handler::open (void * /*arg*/)
{
ACE_CDR::Long nodelay = 1;
#if defined (SCTP_NODELAY)
if (this->peer ().set_option (IPPROTO_SCTP,
SCTP_NODELAY,
(void *) &nodelay,
sizeof (nodelay)) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"NODELAY failed\n"),
-1);
#endif /* SCTP_NODELAY */
// Called by the <Strategy_Acceptor> when the handler is completely
// connected.
ACE_INET_Addr addr;
if (this->peer ().get_remote_addr (addr) == -1)
return -1;
char server[MAXHOSTNAMELEN + 16];
(void) addr.addr_to_string (server, sizeof (server));
if (TAO_debug_level > 0)
if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
"(%P|%t) connection to server <%s> on %d\n",
server, this->peer ().get_handle ()));
this->peer ().disable (ACE_NONBLOCK);
// Register the handler with the reactor.
if (this->reactor ()
&& this->reactor ()->register_handler
(this,
ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p\n"),
ACE_TEXT ("unable to register client handler")),
-1);
return 0;
}
/// Input-ready notification; the handle argument is ignored and the work
/// is delegated to the protocol object.
/// @return The protocol object's handle_input () result.
int
TAO_AV_SCTP_SEQ_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
{
  const int status = this->protocol_object_->handle_input ();
  return status;
}
/// Timer notification; forwarded unchanged to TAO_AV_Flow_Handler.
/// @return The base implementation's result.
int
TAO_AV_SCTP_SEQ_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
                                              const void *arg)
{
  const int status = TAO_AV_Flow_Handler::handle_timeout (tv, arg);
  return status;
}
//------------------------------------------------------------
// TAO_AV_SCTP_SEQ_Flow_Factory
//------------------------------------------------------------
/// Default constructor; performs no work of its own.
TAO_AV_SCTP_SEQ_Flow_Factory::TAO_AV_SCTP_SEQ_Flow_Factory (void)
{
}
/// Destructor; the body releases nothing.
TAO_AV_SCTP_SEQ_Flow_Factory::~TAO_AV_SCTP_SEQ_Flow_Factory (void)
{
}
/// Initialization hook taking an argc/argv style pair.  Both arguments are
/// ignored and the call always returns 0.
int
TAO_AV_SCTP_SEQ_Flow_Factory::init (int /* argc */,
char * /* argv */ [])
{
return 0;
}
/// Report whether @a flow_string names this flow protocol.
/// @return 1 when the string equals "SCTP_SEQ" ignoring case, otherwise 0.
int
TAO_AV_SCTP_SEQ_Flow_Factory::match_protocol (const char *flow_string)
{
  return (ACE_OS::strcasecmp (flow_string, "SCTP_SEQ") == 0) ? 1 : 0;
}
/**
 * Build the SCTP_SEQ protocol object for a flow and wire it up.
 *
 * Looks up the callback registered on @a endpoint under the entry's flow
 * name, creates the protocol object around it and @a transport, opens the
 * callback with the new object and @a handler, and records the object on the
 * endpoint under the same flow name.
 *
 * @return The new protocol object, or 0 if the callback lookup reports an
 *         error or allocation fails.
 */
TAO_AV_Protocol_Object*
TAO_AV_SCTP_SEQ_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
                                                    TAO_Base_StreamEndPoint *endpoint,
                                                    TAO_AV_Flow_Handler *handler,
                                                    TAO_AV_Transport *transport)
{
  TAO_AV_Callback *flow_callback = 0;
  const int lookup_status =
    endpoint->get_callback (entry->flowname (), flow_callback);
  if (lookup_status != 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
    }

  TAO_AV_SCTP_SEQ_Object *protocol_object = 0;
  ACE_NEW_RETURN (protocol_object,
                  TAO_AV_SCTP_SEQ_Object (flow_callback, transport),
                  0);

  flow_callback->open (protocol_object, handler);
  endpoint->set_protocol_object (entry->flowname (), protocol_object);

  return protocol_object;
}
// ACE service-configurator boilerplate for TAO_AV_SCTP_SEQ_Flow_Factory:
// the factory definition plus a static service descriptor registered under
// the name "SCTP_SEQ_Flow_Factory".
// NOTE(review): the final 0 is presumably the "active" flag - confirm
// against the ACE_STATIC_SVC_DEFINE documentation.
ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Flow_Factory)
ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Flow_Factory,
ACE_TEXT ("SCTP_SEQ_Flow_Factory"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Flow_Factory),
ACE_Service_Type::DELETE_THIS |
ACE_Service_Type::DELETE_OBJ,
0)
// ACE service-configurator boilerplate for TAO_AV_SCTP_SEQ_Factory:
// the factory definition plus a static service descriptor registered under
// the name "SCTP_SEQ_Factory".
// NOTE(review): the final 0 is presumably the "active" flag - confirm
// against the ACE_STATIC_SVC_DEFINE documentation.
ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Factory)
ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Factory,
ACE_TEXT ("SCTP_SEQ_Factory"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Factory),
ACE_Service_Type::DELETE_THIS |
ACE_Service_Type::DELETE_OBJ,
0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -