connection_manager.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 726 行 · 第 1/2 页
CPP
726 行
//Connection_Manager.cpp,v 1.13 2003/09/09 23:27:29 fields_t Exp
#include "Connection_Manager.h"
// Default constructor.  There is nothing to set up explicitly: no
// member initializers are given, so every member is
// default-initialized.
Connection_Manager::Connection_Manager ()
{
}
// Destructor.  Performs no explicit cleanup.
//
// NOTE(review): load_ep_addr() heap-allocates the Endpoint_Addresses
// objects it stores in <ep_addr_>; nothing here releases them.
// Confirm whether they are freed elsewhere.
Connection_Manager::~Connection_Manager ()
{
}
void
Connection_Manager::load_ep_addr (const char* file_name)
{
FILE* addr_file = ACE_OS::fopen (file_name, "r");
if (addr_file == 0)
{
ACE_ERROR ((LM_DEBUG,
"Cannot open addr file %s\n",
file_name));
return;
}
else
ACE_DEBUG ((LM_DEBUG,
"Addr file opened successfully\n"));
while (1)
{
char buf [BUFSIZ];
// Read from the file into a buffer
/*
int n = ACE_OS::fread (buf,
1,
BUFSIZ,
addr_file);
*/
if ((ACE_OS::fgets (buf,BUFSIZ,addr_file)) == NULL)
{
// At end of file break the loop and end the sender.
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,"End of Addr file\n"));
break;
}
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"%s\n",
buf));
Endpoint_Addresses* addr;
ACE_NEW (addr,
Endpoint_Addresses);
TAO_Tokenizer addr_tokenizer (buf,'/');
ACE_CString flowname;
if (addr_tokenizer [0] == 0)
{
ACE_ERROR ((LM_ERROR,
"Corresponding flow name not specified for endpoint addresses\n"));
return;
}
else
flowname += addr_tokenizer [0];
if (addr_tokenizer [1] != 0)
{
ACE_CString token (addr_tokenizer [1]);
int pos = token.find ('\r');
if (pos != ACE_CString::npos)
{
addr->sender_addr = CORBA::string_dup ((token.substr (0, pos)).c_str ());
}
else addr->sender_addr = CORBA::string_dup (token.c_str());
pos = addr->sender_addr.find ('\n');
if (pos != ACE_CString::npos)
{
addr->sender_addr = (addr->sender_addr.substr (0, pos)).c_str ();
}
}
if (addr_tokenizer [2] != 0)
{
ACE_CString token (addr_tokenizer [2]);
int pos = token.find ('\r');
if (pos != ACE_CString::npos)
{
addr->receiver_addr = CORBA::string_dup ((token.substr (0, pos)).c_str ());
}
else addr->receiver_addr = CORBA::string_dup (token.c_str());
pos = addr->receiver_addr.find ('\n');
if (pos != ACE_CString::npos)
{
addr->receiver_addr = (addr->receiver_addr.substr (0, pos)).c_str ();
}
}
int result = ep_addr_.bind (flowname,
addr);
if (result == 0)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Flowname %s Bound Successfully\n",
flowname.c_str ()));
}
else if (result == 1)
ACE_DEBUG ((LM_DEBUG,
"Flowname %s already exists\n",
flowname.c_str ()));
else ACE_DEBUG ((LM_DEBUG,
"Flowname %s Bound Failed\n",
flowname.c_str ()));
}
}
// Initializes the naming service client with <orb>.
//
// Returns 0 on success.  If the TAO_Naming_Client cannot be
// initialized an error is logged and -1 is returned.
int
Connection_Manager::init (CORBA::ORB_ptr orb)
{
  // Success path first; anything else is reported below.
  if (this->naming_client_.init (orb) == 0)
    return 0;

  ACE_ERROR_RETURN ((LM_ERROR,
                     " (%P|%t) Unable to initialize "
                     "the TAO_Naming_Client. \n"),
                    -1);
}
void
Connection_Manager::bind_to_receivers (const ACE_CString &sender_name,
AVStreams::MMDevice_ptr sender
ACE_ENV_ARG_DECL)
{
this->sender_name_ =
sender_name;
/*
this->sender_ =
AVStreams::MMDevice::_duplicate (sender);
*/
CosNaming::Name name (1);
name.length (1);
ACE_TRY
{
// Try binding the sender context in the NS
name [0].id =
CORBA::string_dup (this->sender_name_.c_str ());
this->sender_context_ =
this->naming_client_->bind_new_context (name
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
//
// We reach here if there was no exception raised in
// <bind_new_context>. We then create a receiver context.
//
// Create the context for storing the receivers
name [0].id =
CORBA::string_dup ("Receivers");
// Try binding the receivers context under the sender context.
this->receiver_context_ =
this->sender_context_->bind_new_context (name
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex)
{
//
// The sender context already exists, probably created by the
// receiver(s).
//
// Get the sender context.
name [0].id =
CORBA::string_dup (this->sender_name_.c_str ());
CORBA::Object_var object =
this->naming_client_->resolve (name
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->sender_context_ =
CosNaming::NamingContext::_narrow (object.in ());
// Find the Receiver context.
name [0].id =
CORBA::string_dup ("Receivers");
object =
this->sender_context_->resolve (name
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->receiver_context_ =
CosNaming::NamingContext::_narrow (object.in ());
this->find_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
ACE_ENDTRY;
ACE_CHECK;
name [0].id =
CORBA::string_dup (this->sender_name_.c_str ());
// Register the sender object with the sender context.
this->sender_context_->rebind (name,
sender
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
// Collects every receiver bound in <receiver_context_>.
//
// list() is asked for up to <chunk> bindings directly; if it also
// hands back a binding iterator, the remaining bindings are fetched
// through it, <chunk> at a time.  Every batch is passed to
// add_to_receivers().
//
// Once the iterator has been drained it is destroyed explicitly.  A
// CosNaming::BindingIterator is an object living in the naming
// service, and it stays allocated there until the client calls
// destroy() on it.
void
Connection_Manager::find_receivers (ACE_ENV_SINGLE_ARG_DECL)
{
  CosNaming::BindingIterator_var iterator;
  CosNaming::BindingList_var binding_list;
  const CORBA::ULong chunk = 100;

  // Get the first batch of receivers registered for this sender.
  this->receiver_context_->list (chunk,
                                 binding_list,
                                 iterator
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->add_to_receivers (binding_list
                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // No iterator means the first batch was everything.
  if (CORBA::is_nil (iterator.in ()))
    return;

  CORBA::Boolean more = 1;

  // Keep pulling batches until the iterator reports it is exhausted.
  while (more)
    {
      more = iterator->next_n (chunk,
                               binding_list
                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->add_to_receivers (binding_list
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  // Release the iterator held by the naming service.
  iterator->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
// Adds the receivers named in <binding_list> to <receivers_>.
//
// For each binding, the first name component's id is taken as the
// receiver name, resolved against <receiver_context_> and narrowed to
// an AVStreams::MMDevice.  The device is stored in <receivers_> under
// the flow name "<sender_name_>_<receiver name>".
void
Connection_Manager::add_to_receivers (CosNaming::BindingList &binding_list
                                      ACE_ENV_ARG_DECL)
{
  const CORBA::ULong count = binding_list.length ();

  for (CORBA::ULong i = 0; i < count; i++)
    {
      // Get the receiver name from the binding list.
      ACE_CString receiver_name =
        binding_list [i].binding_name [0].id.in ();

      CosNaming::Name name (1);
      name.length (1);
      name [0].id =
        CORBA::string_dup (receiver_name.c_str ());

      // Resolve the reference of the receiver from the receiver
      // context.
      CORBA::Object_var obj =
        this->receiver_context_->resolve (name
                                          ACE_ENV_ARG_PARAMETER);
      // Check the outcome of resolve(), as is done after every other
      // call in this file that is passed the environment argument, so
      // that a failed lookup is not silently turned into a map entry.
      ACE_CHECK;

      AVStreams::MMDevice_var receiver_device =
        AVStreams::MMDevice::_narrow (obj.in ());

      // Add this receiver to the receiver map.
      ACE_CString flowname =
        this->sender_name_ +
        "_" +
        receiver_name;

      this->receivers_.bind (flowname,
                             receiver_device);
    }
}
void
Connection_Manager::connect_to_receivers (AVStreams::MMDevice_ptr sender
ACE_ENV_ARG_DECL)
{
// Connect to all receivers that we know about.
for (Receivers::iterator iterator = this->receivers_.begin ();
iterator != this->receivers_.end ();
++iterator)
{
// Initialize the QoS
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
ACE_CString flowname =
(*iterator).ext_id_;
Endpoint_Addresses* addr = 0;
ep_addr_.find (flowname,
addr);
ACE_CString sender_addr_str;
ACE_CString receiver_addr_str;
if (addr != 0)
{
sender_addr_str = addr->sender_addr;
receiver_addr_str = addr->receiver_addr;
ACE_DEBUG ((LM_DEBUG,
"Address Strings %s %s\n",
sender_addr_str.c_str (),
receiver_addr_str.c_str ()));
}
else ACE_DEBUG ((LM_DEBUG,
"No endpoint address for flowname %s\n",
flowname.c_str ()));
ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ());
ACE_INET_Addr sender_addr (sender_addr_str.c_str ());
// Create the forward flow specification to describe the flow.
TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (),
"IN",
"USER_DEFINED",
"",
"UDP",
&sender_addr);
sender_entry.set_peer_addr (&receiver_addr);
// Set the flow specification for the stream between receiver
// and distributer
AVStreams::flowSpec flow_spec (1);
flow_spec.length (1);
flow_spec [0] =
CORBA::string_dup (sender_entry.entry_to_string ());
if (TAO_debug_level > 0)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?