connection_manager.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 726 行 · 第 1/2 页

CPP
726
字号
	ACE_DEBUG ((LM_DEBUG,
		    "Connection_Manager::connect_to_receivers Flow Spec Entry %s\n",
		    sender_entry.entry_to_string ()));

      // Create the stream control for this stream.
      TAO_StreamCtrl *streamctrl;
      ACE_NEW (streamctrl,
               TAO_StreamCtrl);

      // Servant Reference Counting to manage lifetime
      PortableServer::ServantBase_var safe_streamctrl =
        streamctrl;

      // Register streamctrl.
      AVStreams::StreamCtrl_var streamctrl_object =
        streamctrl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;

      // Bind the flowname and the corresponding stream controller to
      // the stream controller map
      this->streamctrls_.bind (flowname,
                               streamctrl_object);

      // Bind the sender and receiver MMDevices.
      (void) streamctrl->bind_devs (sender,
                                    (*iterator).int_id_.in (),
                                    the_qos.inout (),
                                    flow_spec
                                    ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

//
// Register <receiver> in the Naming Service underneath the context of
// the sender called <sender_name>, and look up the sender itself if it
// has already been registered.
//
// The sender and receiver names and a duplicated reference to
// <receiver> are remembered in this object.  The naming layout used is
//
//   <sender_name>/Receivers/<receiver_name>  -> receiver MMDevice
//   <sender_name>/<sender_name>              -> sender MMDevice
//
// If the sender context does not exist yet, it (and its "Receivers"
// sub-context) is created here, and no attempt is made to resolve the
// sender.  On return, <sender_> is only set when the sender was found.
//
void
Connection_Manager::bind_to_sender (const ACE_CString &sender_name,
                                    const ACE_CString &receiver_name,
                                    AVStreams::MMDevice_ptr receiver
                                    ACE_ENV_ARG_DECL)
{
  this->sender_name_ =
    sender_name;

  this->receiver_name_ =
    receiver_name;

  // Keep our own reference; the caller retains ownership of <receiver>.
  this->receiver_ =
    AVStreams::MMDevice::_duplicate (receiver);

  // Single-component name that is reused for every lookup below.
  CosNaming::Name name (1);
  name.length (1);

  int sender_context_exists = 0;

  ACE_TRY
    {
      // Try resolving the sender context in the NS
      name [0].id =
        CORBA::string_dup (this->sender_name_.c_str ());

      CORBA::Object_var object =
        this->naming_client_->resolve (name
                                       ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      //
      // We reach here if there was no exception raised in <resolve>.
      // Therefore, there must be a valid sender context available.
      //
      sender_context_exists = 1;

      this->sender_context_ =
        CosNaming::NamingContext::_narrow (object.in ());

      name [0].id =
        CORBA::string_dup ("Receivers");

      // Find the receivers context under the sender's context
      object =
        this->sender_context_->resolve (name
                                        ACE_ENV_ARG_PARAMETER);
      // Inside an ACE_TRY block ACE_TRY_CHECK must be used: a plain
      // ACE_CHECK would return from the function directly when
      // exceptions are emulated, skipping the handler below.
      ACE_TRY_CHECK;

      this->receiver_context_ =
        CosNaming::NamingContext::_narrow (object.in ());
    }
  ACE_CATCH (CosNaming::NamingContext::NotFound, al_ex)
    {
      // NOTE(review): this handler also runs when only the "Receivers"
      // sub-context is missing; in that case re-creating the sender
      // context below presumably fails with AlreadyBound - confirm.
      name [0].id =
        CORBA::string_dup (this->sender_name_.c_str ());

      // Create the sender context
      this->sender_context_ =
        this->naming_client_->bind_new_context (name
                                                ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      name [0].id =
        CORBA::string_dup ("Receivers");

      // Create the receivers context under the sender's context
      this->receiver_context_ =
        this->sender_context_->bind_new_context (name
                                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  ACE_ENDTRY;
  ACE_CHECK;

  //
  // At this point we either have resolved the receiver context or we
  // have created a new one.
  //
  name [0].id =
    CORBA::string_dup (this->receiver_name_.c_str ());

  // Register this receiver object under the receiver context.  rebind
  // is used so that an existing binding with the same name is replaced.
  this->receiver_context_->rebind (name,
                                   receiver
                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  //
  // Check if the sender was registered.  Note that if we created the
  // sender context, there is no point in checking for the sender.
  //
  if (sender_context_exists)
    {
      ACE_TRY_EX(SENDER_CONTEXT_EXISTS)
        {
          // Try resolving the sender under the sender context
          name [0].id =
            CORBA::string_dup (this->sender_name_.c_str ());

          CORBA::Object_var object =
            this->sender_context_->resolve (name
                                            ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK_EX(SENDER_CONTEXT_EXISTS);

          this->sender_ =
            AVStreams::MMDevice::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK_EX(SENDER_CONTEXT_EXISTS);
        }
      ACE_CATCH (CosNaming::NamingContext::NotFound, al_ex)
        {
          // No problem if the sender was not there.
        }
      ACE_ENDTRY;
      ACE_CHECK;
    }
}

//
// Set up and start a stream between the sender located by
// bind_to_sender() and this receiver.
//
// Does nothing if no sender reference is held.  The flow is named
// "<sender_name>_<receiver_name>"; if endpoint addresses were
// registered for that flow name in <ep_addr_>, they are used as the
// local and peer addresses of the flow, otherwise empty address
// strings are used.  If binding the devices fails, the error is logged
// and the stream is not started.
//
void
Connection_Manager::connect_to_sender (ACE_ENV_SINGLE_ARG_DECL)
{
  // Without a sender there is nothing to connect to.
  if (CORBA::is_nil (this->sender_.in ()))
    return;

  ACE_CString flowname =
    this->sender_name_ +
    "_" +
    this->receiver_name_;

  // <addr> stays null when no addresses are registered for this flow.
  Endpoint_Addresses* addr = 0;
  ep_addr_.find (flowname,
                 addr);

  ACE_CString sender_addr_str;
  ACE_CString receiver_addr_str;

  if (addr != 0)
    {
      sender_addr_str = addr->sender_addr;
      receiver_addr_str = addr->receiver_addr;

      ACE_DEBUG ((LM_DEBUG,
                  "Address Strings %s %s\n",
                  sender_addr_str.c_str (),
                  receiver_addr_str.c_str ()));
    }

  ACE_INET_Addr receiver_addr (receiver_addr_str.c_str ());
  ACE_INET_Addr sender_addr (sender_addr_str.c_str ());

  // Create the forward flow specification to describe the flow.
  TAO_Forward_FlowSpec_Entry sender_entry (flowname.c_str (),
                                           "IN",
                                           "USER_DEFINED",
                                           "",
                                           "UDP",
                                           &sender_addr);

  sender_entry.set_peer_addr (&receiver_addr);

  // Set the flow specification for the stream between sender and
  // receiver.
  AVStreams::flowSpec flow_spec (1);
  flow_spec.length (1);
  flow_spec [0] =
    CORBA::string_dup (sender_entry.entry_to_string ());

  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "Connection_Manager::connect_to_sender Flow Spec Entry %s\n",
                sender_entry.entry_to_string ()));

  // Create the stream control for this stream
  TAO_StreamCtrl* streamctrl;
  ACE_NEW (streamctrl,
           TAO_StreamCtrl);

  // Servant Reference Counting to manage lifetime
  PortableServer::ServantBase_var safe_streamctrl =
    streamctrl;

  // Register streamctrl.
  AVStreams::StreamCtrl_var streamctrl_object =
    streamctrl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  //
  // Since senders terminate the streams, we don't need the streamctrl
  // for these.
  //
  // this->streamctrls_.bind (flowname,
  //                          streamctrl_object);

  // Initialize the  QoS
  AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);

  // Connect the sender and receiver devices.
  CORBA::Boolean result =
    streamctrl->bind_devs (this->sender_.in (),
                           this->receiver_.in (),
                           the_qos.inout (),
                           flow_spec
                           ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (result == 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Streamctrl::bind_devs failed\n"));

      // The devices were not bound, so there is no stream to start.
      return;
    }

  // Start the data sending.  An empty flow spec is passed to start.
  AVStreams::flowSpec start_spec;
  streamctrl->start (start_spec
                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
Connection_Manager::add_streamctrl (const ACE_CString &flowname,
                                    TAO_StreamEndPoint *endpoint
                                    ACE_ENV_ARG_DECL)
{
  // Get the stream controller for this endpoint.
  CORBA::Any_var streamctrl_any =
    endpoint->get_property_value ("Related_StreamCtrl"
                                  ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  AVStreams::StreamCtrl_ptr streamctrl;

  if( streamctrl_any.in() >>= streamctrl )
  {
     // Any still owns the pointer, so we duplicate it
     AVStreams::StreamCtrl::_duplicate( streamctrl );
     this->streamctrls_.bind (flowname,
                             streamctrl);
  }
}

void
Connection_Manager::destroy (const ACE_CString &flowname
                             ACE_ENV_ARG_DECL_NOT_USED)
{
  this->protocol_objects_.unbind (flowname);
  this->receivers_.unbind (flowname);

  this->streamctrls_.unbind (flowname );

}

// Accessor: gives callers direct, mutable access to the receivers map.
Connection_Manager::Receivers &
Connection_Manager::receivers (void)
{
  return receivers_;
}

// Accessor: gives callers direct, mutable access to the protocol
// objects map.
Connection_Manager::Protocol_Objects &
Connection_Manager::protocol_objects (void)
{
  return protocol_objects_;
}

// Accessor: gives callers direct, mutable access to the stream
// controller map.
Connection_Manager::StreamCtrls &
Connection_Manager::streamctrls (void)
{
  return streamctrls_;
}

//
// Explicit instantiations of the ACE hash map templates for the four
// value types listed below, all keyed by ACE_CString and using
// ACE_Null_Mutex as the lock type.  Which form is used (explicit
// "template class" instantiation, "#pragma instantiate", or nothing)
// depends on the ACE_HAS_* macros tested here.  The two branches list
// the same set of instantiations and must be kept in sync.
//
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Maps holding AVStreams::MMDevice_var values.
template class ACE_Hash_Map_Entry<ACE_CString, AVStreams::MMDevice_var>;
template class ACE_Hash_Map_Manager<ACE_CString, AVStreams::MMDevice_var, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;

// Maps holding Endpoint_Addresses pointers.
template class ACE_Hash_Map_Entry<ACE_CString, Endpoint_Addresses*>;
template class ACE_Hash_Map_Manager<ACE_CString, Endpoint_Addresses*, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;

// Maps holding TAO_AV_Protocol_Object pointers.
template class ACE_Hash_Map_Entry<ACE_CString, TAO_AV_Protocol_Object *>;
template class ACE_Hash_Map_Manager<ACE_CString, TAO_AV_Protocol_Object *, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;

// Maps holding AVStreams::StreamCtrl_var values.
template class ACE_Hash_Map_Entry<ACE_CString, AVStreams::StreamCtrl_var>;
template class ACE_Hash_Map_Manager<ACE_CString, AVStreams::StreamCtrl_var, ACE_Null_Mutex>;
template class ACE_Hash_Map_Manager_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Iterator_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;
template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Same instantiations as above, expressed as pragmas.

#pragma instantiate ACE_Hash_Map_Entry<ACE_CString, AVStreams::MMDevice_var>
#pragma instantiate ACE_Hash_Map_Manager<ACE_CString, AVStreams::MMDevice_var, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, AVStreams::MMDevice_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>

#pragma instantiate ACE_Hash_Map_Entry<ACE_CString, Endpoint_Addresses*>
#pragma instantiate ACE_Hash_Map_Manager<ACE_CString, Endpoint_Addresses*, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, Endpoint_Addresses*, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>

#pragma instantiate ACE_Hash_Map_Entry<ACE_CString, TAO_AV_Protocol_Object *>
#pragma instantiate ACE_Hash_Map_Manager<ACE_CString, TAO_AV_Protocol_Object *, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, TAO_AV_Protocol_Object *, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>

#pragma instantiate ACE_Hash_Map_Entry<ACE_CString, AVStreams::StreamCtrl_var>
#pragma instantiate ACE_Hash_Map_Manager<ACE_CString, AVStreams::StreamCtrl_var, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>
#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_CString, AVStreams::StreamCtrl_var, ACE_Hash<ACE_CString>, ACE_Equal_To<ACE_CString>, ACE_Null_Mutex>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?