📄 sfp.cpp
字号:
{
return 0;
}
/**
 * Build the next fragment of a message-block chain that fits into a
 * single SFP packet.
 *
 * Walks the chain starting at @a mb, accumulating block lengths on top of
 * @a initial_len until adding another block would exceed
 * TAO_SFP_MAX_PACKET_SIZE.  The block that crosses the limit is truncated
 * (its length is set to the number of bytes that still fit) and the walk
 * stops there.
 *
 * @param mb                  In/out cursor into the chain.  On return it
 *                            refers to the block that was truncated, or is
 *                            0 if the whole remaining chain fit.
 * @param initial_len         Bytes already accounted for in the packet
 *                            before any block of @a mb is added.  Assumed
 *                            not to exceed TAO_SFP_MAX_PACKET_SIZE; the
 *                            unsigned subtraction below would wrap
 *                            otherwise.
 * @param last_mb_orig_len    Out: length the truncated block had before
 *                            truncation, or 0 if no block was truncated.
 * @param last_mb_current_len Out: number of bytes of the truncated block
 *                            that were kept, or 0 if no block was
 *                            truncated.
 * @return The result of duplicate() on the first block visited, or 0 if
 *         @a mb was 0 on entry.
 */
ACE_Message_Block*
TAO_SFP_Object::get_fragment (ACE_Message_Block *&mb,
                              size_t initial_len,
                              size_t &last_mb_orig_len,
                              size_t &last_mb_current_len)
{
  ACE_Message_Block *fragment_mb = 0;
  ACE_Message_Block *temp_mb = 0;
  size_t last_len = 0;
  size_t current_len = 0;
  size_t message_len = initial_len;

  while (mb != 0)
    {
      const size_t prev_len = message_len;
      message_len += mb->length ();

      // The fragment is created once, from the first block visited;
      // temp_mb then walks the duplicate in step with mb.
      if (fragment_mb == 0)
        fragment_mb = temp_mb = mb->duplicate ();

      if (message_len > TAO_SFP_MAX_PACKET_SIZE)
        {
          // Take only the length that we can accommodate.
          current_len = TAO_SFP_MAX_PACKET_SIZE - prev_len;
          if (current_len < mb->length ())
            {
              // Remember the untruncated length so the caller can see how
              // much of this block is still pending.
              last_len = mb->length ();
              mb->length (current_len);
              temp_mb->length (current_len);
            }
          break;
        }

      // This block fits entirely.  Its length was already added to
      // message_len above, so it must not be added a second time here:
      // doing so over-counts the packet size and can push prev_len past
      // the packet limit on the next iteration.
      mb = mb->cont ();
      temp_mb = temp_mb->cont ();
    }

  last_mb_orig_len = last_len;
  last_mb_current_len = current_len;
  return fragment_mb;
}
int
TAO_SFP_Object::set_policies (const TAO_AV_PolicyList& policies)
{
TAO_AV_Policy *policy = 0;
for (u_int i=0;i<policies.length ();i++)
{
policy = policies[i];
switch (policies[i]->type ())
{
case TAO_AV_SFP_CREDIT_POLICY:
{
TAO_AV_SFP_Credit_Policy *credit_policy =
ACE_reinterpret_cast (TAO_AV_SFP_Credit_Policy*,policy);
this->max_credit_ = credit_policy->value ();
}
default:
break;
}
}
return 0;
}
// TAO_SFP_Consumer_Object
/**
 * Construct a consumer-side SFP protocol object.
 *
 * Fetches the policy list from @a callback and applies it through
 * set_policies().  If that leaves max_credit_ positive, @a sfp_options is
 * overwritten with "sfp:1.0:credit=<max_credit_>".  When the callback
 * reports no policies, @a sfp_options is left untouched.
 *
 * @param callback     Application callback; dereferenced unconditionally,
 *                     so it must not be null.
 * @param transport    Transport handed to the TAO_SFP_Object base.
 * @param sfp_options  In/out flow-protocol option string.
 */
TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
                                                  TAO_AV_Transport *transport,
                                                  ACE_CString& sfp_options)
  :TAO_SFP_Object (callback,transport)
{
  TAO_AV_PolicyList policies = callback->get_policies ();
  if (policies.length () == 0)
    return;
  this->set_policies (policies);
  if (this->max_credit_ > 0)
    {
      sfp_options = "sfp:1.0:credit=";
      // Sized generously: a 10-byte buffer cannot hold a ten-digit value
      // plus the terminating NUL, which overflowed the stack buffer for
      // credits of 1,000,000,000 and above.
      char buf[32];
      ACE_OS::sprintf(buf, "%d", this->max_credit_);
      sfp_options += buf;
    }
}
/**
 * Entry point for receiving data on the consumer side.
 *
 * Delegates reading to TAO_SFP_Base::handle_input().  If the frame header
 * carries an EndofStream_Msg the callback's handle_destroy() is invoked.
 * Once the frame state reports a complete frame, it is delivered through
 * the callback's receive_frame(), the frame's message blocks are released
 * (unless the frame is the embedded static_frame_), and the state is
 * reset.
 *
 * @return 0 on success, or the negative result of
 *         TAO_SFP_Base::handle_input() on failure.
 */
int
TAO_SFP_Consumer_Object::handle_input (void)
{
  if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_SFP_Consumer_Object::handle_input\n"));
  // This is the entry point for receiving data.
  TAO_AV_frame_info *frame_info = 0;
  int result = TAO_SFP_Base::handle_input (this->transport_,
                                           this->state_,
                                           frame_info);
  if (result < 0)
    ACE_ERROR_RETURN ((LM_ERROR,"ERROR in TAO_SFP_Consumer_Object::handle_input"),result);
  if (this->state_.frame_header_.message_type == flowProtocol::EndofStream_Msg)
    this->callback_->handle_destroy ();
  if (this->state_.is_complete ())
    {
      this->callback_->receive_frame (this->state_.frame_block_,
                                      frame_info);
      // Now release the memory for the frame.  is_complete() guarantees
      // frame_block_ is non-null here.
      if (this->state_.frame_block_ != &this->state_.static_frame_)
        {
          // A single release() on the head is sufficient:
          // ACE_Message_Block::release() frees the whole continuation
          // chain and disposes of the block itself.  Releasing and then
          // deleting every block in a loop freed each block twice and
          // read cont() from memory that had already been freed.
          this->state_.frame_block_->release ();
        }
      this->state_.reset ();
    }
  return 0;
}
/**
 * Construct a producer-side SFP protocol object.
 *
 * @a sfp_options is split on ':'; the token at index 2 (if present) is
 * split again on '=', and the token at index 1 of that split (if present)
 * is parsed with ACE_OS::atoi() and stored in max_credit_.  For an option
 * string such as "sfp:1.0:credit=N" this extracts N.
 *
 * @param callback     Application callback handed to the base class.
 * @param transport    Transport handed to the base class.
 * @param sfp_options  Flow-protocol option string to parse.
 */
TAO_SFP_Producer_Object::TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
                                                  TAO_AV_Transport *transport,
                                                  const char *sfp_options)
  : TAO_SFP_Object (callback, transport),
    credit_sequence_num_ (0)
{
  TAO_Tokenizer flow_string (sfp_options, ':');

  // No third ':'-separated field means there is nothing to configure.
  if (flow_string [2] == 0)
    return;

  TAO_Tokenizer options (flow_string [2], '=');

  // A field without a value after '=' leaves max_credit_ untouched.
  if (options [1] == 0)
    return;

  this->max_credit_ = ACE_OS::atoi (options [1]);
}
/**
 * Handle input arriving at the producer.
 *
 * A producer only acts on credit messages.  The message type is peeked
 * first; a Credit_Msg is read and processed, anything else is drained
 * from the transport with a single recv() and ignored.
 *
 * Credit handling: the first credit message seen (while
 * credit_sequence_num_ is still 0) only records its sequence number.
 * Later messages whose sequence number is not greater than the recorded
 * one are treated as duplicates and ignored; otherwise the recorded
 * sequence number is advanced and current_credit_ is refilled to
 * max_credit_.
 *
 * @return 0 on success (including ignored messages), or the negative
 *         result of the failing TAO_SFP_Base call.
 */
int
TAO_SFP_Producer_Object::handle_input (void)
{
  // A producer can only receive credit messages.
  flowProtocol::MsgType msg_type = flowProtocol::Start_Msg;
  int result = TAO_SFP_Base::peek_message_type (this->transport_,
                                                msg_type);
  if (result < 0)
    return result;
  switch (msg_type)
    {
    case flowProtocol::Credit_Msg:
      {
        flowProtocol::credit credit;
        result = TAO_SFP_Base::read_credit_message (this->transport_,
                                                    credit,
                                                    this->state_.cdr);
        if (result < 0)
          return result;
        if (!this->credit_sequence_num_)
          {
            // NOTE(review): the first credit message only seeds the
            // sequence number and does not refill current_credit_;
            // behavior kept as-is -- confirm this is intended.
            this->credit_sequence_num_ = credit.cred_num;
          }
        else
          {
            // check that the sequence number is above the last sequence number
            // else its a duplicate credit message so we can ignore it.
            if (credit.cred_num <= this->credit_sequence_num_)
              return 0;
            // Remember the newest sequence number; without this the
            // comparison above always ran against the first credit
            // message and later duplicates were accepted again.
            this->credit_sequence_num_ = credit.cred_num;
            // Update our credit now.
            this->current_credit_ = this->max_credit_;
          }
      }
      break;
    default:
      {
        ACE_Message_Block mb (2*this->transport_->mtu ());
        // Ignore the rest of the message by just reading.
        this->transport_->recv (mb.rd_ptr (),
                                mb.size ());
        break;
      }
    }
  return 0;
}
// TAO_AV_SFP_Factory
// Default constructor: performs no work of its own.
TAO_AV_SFP_Factory::TAO_AV_SFP_Factory (void)
{
}
// Destructor: nothing to clean up in this class itself.
TAO_AV_SFP_Factory::~TAO_AV_SFP_Factory (void)
{
}
// Initialization hook from service configurator.
// Both arguments are ignored; always returns 0.
int
TAO_AV_SFP_Factory::init (int /*argc*/, char ** /*argv*/)
{
return 0;
}
/**
 * Tell whether @a flow_string names the SFP flow protocol.
 *
 * The check is a case-insensitive comparison of the first three
 * characters against "sfp".
 *
 * @param flow_string  Flow-protocol string to test.
 * @return 1 if it starts with "sfp" (any case), 0 otherwise.
 */
int
TAO_AV_SFP_Factory::match_protocol (const char *flow_string)
{
  return ACE_OS::strncasecmp (flow_string, "sfp", 3) == 0 ? 1 : 0;
}
/**
 * Create the SFP protocol object matching the role of @a entry.
 *
 * Looks up the callback registered on @a endpoint for the entry's flow
 * name, then builds a TAO_SFP_Producer_Object or TAO_SFP_Consumer_Object
 * depending on entry->role().  For a consumer, the (possibly rewritten)
 * flow-protocol string is stored back into @a entry.  The new object is
 * passed to the callback's open() and registered on the endpoint.
 *
 * @param entry      Flow specification entry describing the flow.
 * @param endpoint   Stream endpoint owning the flow.
 * @param handler    Flow handler passed on to the callback's open().
 * @param transport  Transport the protocol object will use.
 * @return The new protocol object, or 0 if no callback is available, the
 *         role is invalid/unhandled, or allocation fails.
 */
TAO_AV_Protocol_Object*
TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
                                          TAO_Base_StreamEndPoint *endpoint,
                                          TAO_AV_Flow_Handler *handler,
                                          TAO_AV_Transport *transport)
{
  TAO_AV_Protocol_Object *object = 0;
  TAO_AV_Callback *callback = 0;
  endpoint->get_callback (entry->flowname (),
                          callback);
  // Both protocol-object constructors and the open() call below
  // dereference the callback, so bail out instead of crashing when the
  // endpoint could not supply one.
  if (callback == 0)
    return 0;
  ACE_CString flow_string( entry->flow_protocol_str () );
  switch (entry->role ())
    {
    case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
      {
        ACE_NEW_RETURN (object,
                        TAO_SFP_Producer_Object (callback,
                                                 transport,
                                                 flow_string.c_str() ),
                        0);
      }
      break;
    case TAO_FlowSpec_Entry::TAO_AV_CONSUMER:
      {
        ACE_NEW_RETURN (object,
                        TAO_SFP_Consumer_Object (callback,
                                                 transport,
                                                 flow_string),
                        0);
        // The consumer constructor may have rewritten the option string
        // (it takes flow_string by non-const reference); publish the
        // result back to the entry.
        entry->flow_protocol_str( flow_string.c_str() );
      }
      break;
    case TAO_FlowSpec_Entry::TAO_AV_INVALID_ROLE:
      return 0;
    }
  // A role not covered above leaves object null; do not hand a null
  // protocol object to the callback or the endpoint.
  if (object == 0)
    return 0;
  callback->open (object,
                  handler);
  endpoint->set_protocol_object (entry->flowname (),
                                 object);
  return object;
}
//------------------------------------------------------------
// TAO_SFP_Frame_State
//------------------------------------------------------------
// Construct an empty frame state: the CDR stream is built on a freshly
// allocated ACE_Data_Block of ACE_CDR::DEFAULT_BUFSIZE bytes using
// TAO_ENCAP_BYTE_ORDER, no further fragments are expected and no frame
// block is attached yet.
// NOTE(review): the remaining ACE_Data_Block arguments are all 0 --
// presumably selecting default allocators/locking/flags; confirm against
// the ACE_Data_Block constructor documentation.
TAO_SFP_Frame_State::TAO_SFP_Frame_State (void)
:cdr (new ACE_Data_Block (ACE_CDR::DEFAULT_BUFSIZE,
ACE_Message_Block::MB_DATA,
0,
0,
0,
0,
0),
0,
TAO_ENCAP_BYTE_ORDER),
more_fragments_ (0),
frame_block_ (0)
{
}
/**
 * Report whether a whole frame is available.
 *
 * @return True only when no more fragments are pending and a frame block
 *         is attached; false otherwise.
 */
CORBA::Boolean
TAO_SFP_Frame_State::is_complete (void)
{
  // Still waiting for fragments: the frame cannot be complete.
  if (this->more_fragments_)
    return 0;

  return this->frame_block_ != 0;
}
// Detach the current frame block by setting frame_block_ to 0.  The block
// itself is not released here, and no other member is touched.
// Always returns 0.
int
TAO_SFP_Frame_State::reset (void)
{
this->frame_block_ = 0;
return 0;
}
// Explicit instantiations of the ACE container/singleton templates used
// with the SFP fragment types.  Emitted as `template class` declarations
// when ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, as
// `#pragma instantiate` directives when
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined, and omitted otherwise.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_DNode<TAO_SFP_Fragment_Node>;
template class ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node>;
template class ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node>;
template class ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Entry<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *>;
template class ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Entry<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *>;
template class ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *,ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *,ACE_Null_Mutex>;
template class ACE_Singleton<TAO_SFP_Base, TAO_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_DNode<TAO_SFP_Fragment_Node>
#pragma instantiate ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node>
#pragma instantiate ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node>
#pragma instantiate ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Entry<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
#pragma instantiate ACE_Equal_To<CORBA::ULong>
#pragma instantiate ACE_Hash_Map_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Entry<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *,ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex> *,ACE_Null_Mutex>
#pragma instantiate ACE_Singleton<TAO_SFP_Base, TAO_SYNCH_MUTEX>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
// Service-configurator glue for TAO_AV_SFP_Factory: the factory is
// declared under the name "SFP_Factory" as an ACE_SVC_OBJ_T service with
// the DELETE_THIS and DELETE_OBJ flags set.
// NOTE(review): presumably ACE_FACTORY_DEFINE emits the factory function
// referenced via ACE_SVC_NAME and the trailing 0 is the "active" flag --
// confirm against the ACE service configurator macro documentation.
ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SFP_Factory)
ACE_STATIC_SVC_DEFINE (TAO_AV_SFP_Factory,
ACE_TEXT ("SFP_Factory"),
ACE_SVC_OBJ_T,
&ACE_SVC_NAME (TAO_AV_SFP_Factory),
ACE_Service_Type::DELETE_THIS |
ACE_Service_Type::DELETE_OBJ,
0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -