connection_manager.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 726 行 · 第 1/2 页

CPP
726
字号
//Connection_Manager.cpp,v 1.13 2003/09/09 23:27:29 fields_t Exp

#include "Connection_Manager.h"

// Default constructor.  There is no member-initializer list and the
// body is empty, so every data member is default-constructed.  The
// naming client is set up separately, in init ().
Connection_Manager::Connection_Manager ()
{
  // Intentionally empty.
}

// Destructor.  The body is empty: no explicit cleanup is performed
// here, members are torn down by their own destructors.
//
// NOTE(review): load_ep_addr () heap-allocates the Endpoint_Addresses
// objects it binds into <ep_addr_>, and nothing in this destructor
// releases them -- confirm whether they are freed elsewhere.
Connection_Manager::~Connection_Manager ()
{
  // Intentionally empty.
}

void
Connection_Manager::load_ep_addr (const char* file_name)
{
  FILE* addr_file = ACE_OS::fopen (file_name, "r");
  
  if (addr_file == 0)
    {
      ACE_ERROR ((LM_DEBUG,
		  "Cannot open addr file %s\n",
		  file_name));
      return;
    }
  else
    ACE_DEBUG ((LM_DEBUG,
                "Addr file opened successfully\n"));

  while (1)
    {
      char buf [BUFSIZ];
      
      // Read from the file into a buffer

      
      /*
      int n = ACE_OS::fread (buf,
			     1,
			     BUFSIZ,
			     addr_file);
      */

      if ((ACE_OS::fgets (buf,BUFSIZ,addr_file)) == NULL)
	{
	  // At end of file break the loop and end the sender.
	  if (TAO_debug_level > 0)
	    ACE_DEBUG ((LM_DEBUG,"End of Addr file\n"));
	  break;
	}
	

      if (TAO_debug_level > 0)
	ACE_DEBUG ((LM_DEBUG,
		    "%s\n",
		    buf));

      Endpoint_Addresses* addr;
      ACE_NEW (addr,
	       Endpoint_Addresses);

      TAO_Tokenizer addr_tokenizer (buf,'/');

      ACE_CString flowname;

      if (addr_tokenizer [0] == 0)
	{
	  ACE_ERROR ((LM_ERROR,
		      "Corresponding flow name not specified for endpoint addresses\n"));
	  return;
	}
      else
	flowname += addr_tokenizer [0];
      
      if (addr_tokenizer [1] != 0)
	{
	  ACE_CString token (addr_tokenizer [1]);

	  int pos = token.find ('\r');
	  if (pos != ACE_CString::npos)
	    {
	      addr->sender_addr = CORBA::string_dup ((token.substr (0, pos)).c_str ());
	    }
	  else addr->sender_addr = CORBA::string_dup (token.c_str());

	  pos = addr->sender_addr.find ('\n');
	  if (pos != ACE_CString::npos)
	    {
	      addr->sender_addr = (addr->sender_addr.substr (0, pos)).c_str ();
	    }
	}

      if (addr_tokenizer [2] != 0)
	{
	  ACE_CString token (addr_tokenizer [2]);

	  int pos = token.find ('\r');
	  if (pos != ACE_CString::npos)
	    {
	      addr->receiver_addr = CORBA::string_dup ((token.substr (0, pos)).c_str ());
	    }
	  else addr->receiver_addr = CORBA::string_dup (token.c_str());

	  pos = addr->receiver_addr.find ('\n');
	  if (pos != ACE_CString::npos)
	    {
	      addr->receiver_addr = (addr->receiver_addr.substr (0, pos)).c_str ();
	    }
	}
      
      int result = ep_addr_.bind (flowname,
				  addr);
      if (result == 0)
	{
	if (TAO_debug_level > 0)
	  ACE_DEBUG ((LM_DEBUG,
		      "Flowname %s Bound Successfully\n",
		      flowname.c_str ()));
	}
      else if (result == 1)
	ACE_DEBUG ((LM_DEBUG,
		    "Flowname %s already exists\n",
		    flowname.c_str ()));
      else ACE_DEBUG ((LM_DEBUG,
		       "Flowname %s Bound Failed\n",
		       flowname.c_str ()));

      
    }

}

// Set up the naming-service client used by this manager.
//
// Passes <orb> to naming_client_.init ().  Returns 0 when that call
// reports success; otherwise logs an error and returns -1.
int
Connection_Manager::init (CORBA::ORB_ptr orb)
{
  const int naming_status = this->naming_client_.init (orb);

  // Any non-zero status from the naming client is treated as failure.
  if (naming_status != 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       " (%P|%t) Unable to initialize "
                       "the TAO_Naming_Client. \n"),
                      -1);

  return 0;
}

// Register <sender> in the Naming Service under <sender_name> and
// pick up any receivers that are already registered for it.
//
// Steps, in order:
//  1. Remember <sender_name> in <sender_name_>.
//  2. Try to bind a new naming context named after the sender, and a
//     "Receivers" context beneath it.
//  3. If CosNaming::NamingContext::AlreadyBound is raised, resolve the
//     existing sender context and its "Receivers" context instead and
//     call find_receivers ().
//  4. Rebind <sender> under the sender's name inside the sender
//     context.
//
// Both bind_new_context () calls sit inside the same try block, so
// the AlreadyBound handler is entered whichever of them raises it.
void
Connection_Manager::bind_to_receivers (const ACE_CString &sender_name,
                                       AVStreams::MMDevice_ptr sender
                                       ACE_ENV_ARG_DECL)
{
  this->sender_name_ =
    sender_name;

  // The sender reference is not stored: the assignment below is
  // disabled and <sender> is only used for the rebind () at the end.
  /* 
  this->sender_ =
    AVStreams::MMDevice::_duplicate (sender);
  */

  // A single-component name, reused for every naming call below by
  // overwriting name [0].id.
  CosNaming::Name name (1);
  name.length (1);

  ACE_TRY
    {
      // Try binding the sender context in the NS
      name [0].id =
        CORBA::string_dup (this->sender_name_.c_str ());

      this->sender_context_ =
        this->naming_client_->bind_new_context (name
                                                ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      //
      // We reach here if there was no exception raised in
      // <bind_new_context>.  We then create a receiver context.
      //

      // Create the context for storing the receivers
      name [0].id =
        CORBA::string_dup ("Receivers");

      // Try binding the receivers context under the sender context.
      this->receiver_context_ =
        this->sender_context_->bind_new_context (name
                                                 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  // NOTE(review): <al_ex> is not referenced inside the handler.
  ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex)
    {
      //
      // The sender context already exists, probably created by the
      // receiver(s).
      //

      // Get the sender context.
      name [0].id =
        CORBA::string_dup (this->sender_name_.c_str ());

      CORBA::Object_var object =
        this->naming_client_->resolve (name
                                       ACE_ENV_ARG_PARAMETER);
      // NOTE(review): the handler uses ACE_CHECK where the try block
      // uses ACE_TRY_CHECK -- confirm this is the intended macro for
      // code inside an ACE_CATCH block.
      ACE_CHECK;

      // NOTE(review): the result of _narrow () is not tested for nil
      // before it is used below.
      this->sender_context_ =
        CosNaming::NamingContext::_narrow (object.in ());

      // Find the Receiver context.
      name [0].id =
        CORBA::string_dup ("Receivers");

      object =
        this->sender_context_->resolve (name
                                        ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // NOTE(review): as above, the narrowed reference is used by
      // find_receivers () without a nil check.
      this->receiver_context_ =
        CosNaming::NamingContext::_narrow (object.in ());

      // Load the receivers already bound under the "Receivers"
      // context into this manager.
      this->find_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
  ACE_ENDTRY;
  ACE_CHECK;

  // Runs on both paths: fresh contexts, or pre-existing ones.
  name [0].id =
    CORBA::string_dup (this->sender_name_.c_str ());

  // Register the sender object with the sender context.
  // rebind () is used rather than bind ().
  this->sender_context_->rebind (name,
                                 sender
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Collect every receiver currently bound under <receiver_context_>
// and hand the bindings to add_to_receivers ().
//
// The bindings are fetched <chunk> at a time: the first batch comes
// back from list (), any remainder is pulled through the returned
// BindingIterator with next_n () until it reports that nothing is
// left.
//
// The iterator is now destroyed once it has been drained, so it is
// not left allocated on the naming-service side.  It is not destroyed
// if one of the ACE_CHECKs in the loop returns early.
void
Connection_Manager::find_receivers (ACE_ENV_SINGLE_ARG_DECL)
{
  CosNaming::BindingIterator_var iterator;
  CosNaming::BindingList_var binding_list;
  const CORBA::ULong chunk = 100;

  // Get the list of receivers registered for this sender.
  this->receiver_context_->list (chunk,
                                 binding_list,
                                 iterator
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Add the receivers found in the bindinglist to the <receivers>.
  this->add_to_receivers (binding_list
                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // A nil iterator means the first batch already held everything.
  if (CORBA::is_nil (iterator.in ()))
    return;

  CORBA::Boolean more = 1;

  // Check to see if there are more receivers listed.
  while (more)
    {
      more = iterator->next_n (chunk,
                               binding_list
                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->add_to_receivers (binding_list
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  // Done with the iterator: release it explicitly.
  iterator->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

// Resolve every binding in <binding_list> against
// <receiver_context_> and record the resulting MMDevice in
// <receivers_>.
//
// The map key is "<sender_name_>_<receiver name>", where the receiver
// name is the id of the first component of the binding name.
//
// Changes from the previous version:
//  - resolve () is followed by ACE_CHECK, like every other call in
//    this file that takes the environment argument.
//  - a binding that does not narrow to an AVStreams::MMDevice is
//    reported and skipped rather than stored as a nil reference.
void
Connection_Manager::add_to_receivers (CosNaming::BindingList &binding_list
                                      ACE_ENV_ARG_DECL)
{
  for (CORBA::ULong i = 0;
       i < binding_list.length ();
       i++)
    {
      // Get the receiver name from the binding list.
      ACE_CString receiver_name =
        binding_list [i].binding_name [0].id.in ();

      CosNaming::Name name (1);
      name.length (1);
      name [0].id =
        CORBA::string_dup (receiver_name.c_str ());

      // Resolve the reference of the receiver from the receiver
      // context.
      CORBA::Object_var obj =
        this->receiver_context_->resolve (name
                                          ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      AVStreams::MMDevice_var receiver_device =
        AVStreams::MMDevice::_narrow (obj.in ());

      // Whatever is bound under this name is not usable as a
      // receiver; keep it out of the map.
      if (CORBA::is_nil (receiver_device.in ()))
        {
          ACE_ERROR ((LM_ERROR,
                      "Receiver %s is not a valid MMDevice, skipping\n",
                      receiver_name.c_str ()));
          continue;
        }

      // Add this receiver to the receiver map.
      ACE_CString flowname =
        this->sender_name_ +
        "_" +
        receiver_name;
      this->receivers_.bind (flowname,
                             receiver_device);
    }
}

void
Connection_Manager::connect_to_receivers (AVStreams::MMDevice_ptr sender
					  ACE_ENV_ARG_DECL)
{
  // Connect to all receivers that we know about.
  for (Receivers::iterator iterator = this->receivers_.begin ();
       iterator != this->receivers_.end ();
       ++iterator)
    {
      // Initialize the QoS
      AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);

      ACE_CString flowname =
        (*iterator).ext_id_;

      Endpoint_Addresses* addr = 0;
      ep_addr_.find (flowname,
		     addr);

      ACE_CString sender_addr_str;
      ACE_CString receiver_addr_str;

      if (addr != 0)
	{
	  sender_addr_str = addr->sender_addr;
	  receiver_addr_str = addr->receiver_addr;
	  ACE_DEBUG ((LM_DEBUG,
		      "Address Strings %s %s\n",
		      sender_addr_str.c_str (),
		      receiver_addr_str.c_str ()));
	  
	}
      else ACE_DEBUG ((LM_DEBUG,
		       "No endpoint address for flowname %s\n",
		       flowname.c_str ()));
      
      ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ());      
      ACE_INET_Addr sender_addr (sender_addr_str.c_str ());

      // Create the forward flow specification to describe the flow.
      TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (),
                                               "IN",
                                               "USER_DEFINED",
                                               "",
                                               "UDP",
                                               &sender_addr);

      sender_entry.set_peer_addr (&receiver_addr);

      // Set the flow specification for the stream between receiver
      // and distributer
      AVStreams::flowSpec flow_spec (1);
      flow_spec.length (1);
      flow_spec [0] =
        CORBA::string_dup (sender_entry.entry_to_string ());

      if (TAO_debug_level > 0)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?