distributer.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 518 行

CPP
518
字号
// distributer.cpp,v 1.14 2002/05/08 22:13:05 crodrigu Exp

#include "distributer.h"
#include "tao/debug.h"
#include "ace/Get_Opt.h"
#include "orbsvcs/AV/Protocol_Factory.h"
#include "orbsvcs/AV/FlowSpec_Entry.h"

#include "tao/Strategies/advanced_resource.h"

typedef ACE_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;

// constructor.
Signal_Handler::Signal_Handler (void)
{
}

int
Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
{
  if (signum == SIGINT)
    {
      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "In the signal handler\n"));

      DISTRIBUTER::instance ()->done (1);

    }
  return 0;
}

int
Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name,
                                                 TAO_AV_Callback *&callback)
{
  /// Create and return the sender application callback to AVStreams
  /// for further upcalls.
  callback = &this->callback_;

  ACE_CString fname = flow_name;

  this->callback_.flowname (fname);

  return 0;
}

int
Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname,
                                                        TAO_AV_Protocol_Object *object)
{
  Connection_Manager &connection_manager =
    DISTRIBUTER::instance ()->connection_manager ();

  /// Add to the map of protocol objects.
  connection_manager.protocol_objects ().bind (flowname,
                                               object);

  /// Store the related streamctrl.
  connection_manager.add_streamctrl (flowname,
                                     this);

  return 0;
}

int
Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name,
                                                   TAO_AV_Callback *&callback)
{
  /// Create and return the receiver application callback to AVStreams
  /// for further upcalls.
  callback = &this->callback_;

  ACE_CString flowname (flow_name);
  this->callback_.flowname (flowname);

  return 0;
}

int
Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *,
                                                          TAO_AV_Protocol_Object *)
{
  /// Increment the stream count.
  DISTRIBUTER::instance ()->stream_created ();

  return 0;
}

CORBA::Boolean
Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec
                                                                  ACE_ENV_ARG_DECL_NOT_USED)
{
  //if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG,
                "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n"));

  Connection_Manager &connection_manager =
    DISTRIBUTER::instance ()->connection_manager ();

  /// Check to see if the flow already exists. If it does then close the
  /// old connection and setup a new one with the new sender.

  for (CORBA::ULong i = 0;
       i < flowspec.length ();
       i++)
    {
      TAO_Forward_FlowSpec_Entry entry;
      entry.parse (flowspec[i].in ());

      //if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG,
                    "Handle Conection Requested flowname %s \n",
                    entry.flowname ()));

      ACE_CString flowname (entry.flowname ());

      int result =
               connection_manager.streamctrls ().find (flowname);

      /// If the flowname is found.
      if (result == 0)
        {
          ACE_DEBUG ((LM_DEBUG, "\nDistributer switching senders handle connection requested\n\n"));

          ///Destroy old stream with the same flowname.
          connection_manager.destroy (flowname);

        }

      /// Store the related streamctrl.
      connection_manager.add_streamctrl (flowname.c_str (),
                                         this);

    }
  return 1;

}


Distributer_Receiver_Callback::Distributer_Receiver_Callback (void)
  : frame_count_ (1)
{
}

ACE_CString &
Distributer_Receiver_Callback::flowname (void)
{
  return this->flowname_;
}

void
Distributer_Receiver_Callback::flowname (const ACE_CString &flowname)
{
  this->flowname_ = flowname;
}


int
Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
                                              TAO_AV_frame_info *,
                                              const ACE_Addr &)
{
  /// Upcall from the AVStreams when there is data to be received from
  /// the sender.
  ACE_DEBUG ((LM_DEBUG,
              "Distributer_Callback::receive_frame for frame %d\n",
              this->frame_count_++));

  Connection_Manager::Protocol_Objects &protocol_objects =
    DISTRIBUTER::instance ()->connection_manager ().protocol_objects ();

  /// Send frame to all receivers.
  for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
       iterator != protocol_objects.end ();
       ++iterator)
    {
      int result =
        (*iterator).int_id_->send_frame (frame);

      if (result < 0)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "send failed:%p",
                           "Sender::pace_data send\n"),
                          -1);
    }

  return 0;
}

int
Distributer_Receiver_Callback::handle_destroy (void)
{
  /// Called when the sender requests the stream to be shutdown.
  ACE_DEBUG ((LM_DEBUG,
              "Distributer_Receiver_Callback::end_stream\n"));

  DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());

  /// Decrement the stream count.
  DISTRIBUTER::instance ()->stream_destroyed ();

  return 0;
}

ACE_CString &
Distributer_Sender_Callback::flowname (void)
{
  return this->flowname_;
}

void
Distributer_Sender_Callback::flowname (const ACE_CString &flowname)
{
  this->flowname_ = flowname;
}

int
Distributer_Sender_Callback::handle_destroy (void)
{
  /// Called when the sender requests the stream to be shutdown.

  ACE_DEBUG ((LM_DEBUG,
              "Distributer_Sender_Callback::end_stream\n"));

  DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());

  DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());

  DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());

  return 0;
}

Distributer::Distributer (void)
  : sender_name_ ("sender"),
    distributer_name_ ("distributer"),
    done_ (0),
    stream_count_ (0)
{
}

Distributer::~Distributer (void)
{
}

void
Distributer::stream_created (void)
{
  this->stream_count_++;
}

void
Distributer::stream_destroyed (void)
{
  this->stream_count_--;

  if (this->stream_count_ == 0)
    this->done_ = 1;
}


Connection_Manager &
Distributer::connection_manager (void)
{
  return this->connection_manager_;
}

int
Distributer::parse_args (int argc,
                         char **argv)
{
  /// Parse command line arguments
  ACE_Get_Opt opts (argc, argv, "s:r:");

  int c;
  while ((c= opts ()) != -1)
    {
      switch (c)
        {
        case 's':
          this->sender_name_ = opts.opt_arg ();
          break;
        case 'r':
          this->distributer_name_ = opts.opt_arg ();
          break;
        default:
          ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
          return -1;
        }
    }
  return 0;
}


int
Distributer::init (int argc,
                   char ** argv
                   ACE_ENV_ARG_DECL)
{
  /// Initialize the connection class.
  int result =
    this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
  if (result != 0)
    return result;

  /// Initialize the endpoint strategy with the orb and poa.
  result =
    this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
                                          TAO_AV_CORE::instance ()->poa ());
  if (result != 0)
    return result;

  result =
    this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
                                            TAO_AV_CORE::instance ()->poa ());
  if (result != 0)
    return result;

  /// Parse the command line arguments
  result =
    this->parse_args (argc,
                      argv);
  if (result != 0)
    return result;

  ACE_Reactor *reactor =
    TAO_AV_CORE::instance ()->reactor ();

  if (reactor->register_handler (SIGINT,
                                 &this->signal_handler_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Error in handler register\n"),
                      -1);
  /// Register the signal handler for clean termination of the process.

  ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
                  TAO_MMDevice (&this->sender_endpoint_strategy_),
                  -1);

  /// Servant Reference Counting to manage lifetime
  PortableServer::ServantBase_var safe_sender_mmdevice =
    this->distributer_sender_mmdevice_;

  AVStreams::MMDevice_var distributer_sender_mmdevice =
    this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
                  TAO_MMDevice (&this->receiver_endpoint_strategy_),
                  -1);

  /// Servant Reference Counting to manage lifetime
  PortableServer::ServantBase_var safe_receiver_mmdevice =
    this->distributer_receiver_mmdevice_;

  AVStreams::MMDevice_var distributer_receiver_mmdevice =
    this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);


  /// Bind to sender.
  this->connection_manager_.bind_to_sender (this->sender_name_,
                                            this->distributer_name_,
                                            distributer_receiver_mmdevice.in ()
                                            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  /// Connect to sender.
  this->connection_manager_.connect_to_sender (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  /// Bind to receivers.
  this->connection_manager_.bind_to_receivers (this->distributer_name_,
                                               distributer_sender_mmdevice.in ()
                                               ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  /// Connect to receivers
  this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  return 0;
}

int
Distributer::done (void) const
{
  return this->done_;
}

void
Distributer::shut_down (ACE_ENV_SINGLE_ARG_DECL)
{
  ACE_TRY
    {
      AVStreams::MMDevice_var receiver_mmdevice =
        this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_,
                                                                       this->distributer_name_,
                                                                       receiver_mmdevice.in ());
      AVStreams::MMDevice_var sender_mmdevice =
        this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_,
                                                                     sender_mmdevice.in ());

    //  DISTRIBUTER::instance ()->connection_manager ().destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
    //  ACE_TRY_CHECK;

    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Distributer::shut_down");
    }
  ACE_ENDTRY;
}

void
Distributer::done (int done)
{
  this->done_ = done;
}

int
main (int argc,
      char **argv)
{
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      /// Initialize the ORB first.
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc,
                         argv,
                         0
                         ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      CORBA::Object_var obj
        = orb->resolve_initial_references ("RootPOA"
                                           ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      /// Get the POA_var object from Object_var.
      PortableServer::POA_var root_poa =
        PortableServer::POA::_narrow (obj.in ()
                                      ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      PortableServer::POAManager_var mgr
        = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      /// Initialize the AVStreams components.
      TAO_AV_CORE::instance ()->init (orb.in (),
                                      root_poa.in ()
                                      ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      /// Initialize the Distributer
      int result =
        DISTRIBUTER::instance ()->init (argc,
                                        argv
                                        ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (result != 0)
        return result;

      while (!DISTRIBUTER::instance ()->done ())
        {
          if( orb->work_pending( ACE_ENV_SINGLE_ARG_PARAMETER ) )
          {
            orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
	  
          ACE_TRY_CHECK;
          }
        }

      DISTRIBUTER::instance ()->shut_down (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

//      orb->shutdown(1 ACE_ENV_ARG_PARAMETER);
//      ACE_TRY_CHECK;

    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main");
      return -1;
    }
  ACE_ENDTRY;
  ACE_CHECK_RETURN (-1);

  return 0;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Singleton <Distributer,ACE_Null_Mutex>;
template class TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_Sender_StreamEndPoint,   TAO_VDev, AV_Null_MediaCtrl>;
template class TAO_AV_Endpoint_Reactive_Strategy  <Distributer_Sender_StreamEndPoint,   TAO_VDev, AV_Null_MediaCtrl>;
template class TAO_AV_Endpoint_Reactive_Strategy_B<Distributer_Receiver_StreamEndPoint, TAO_VDev, AV_Null_MediaCtrl>;
template class TAO_AV_Endpoint_Reactive_Strategy  <Distributer_Receiver_StreamEndPoint, TAO_VDev, AV_Null_MediaCtrl>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Singleton <Distributer,ACE_Null_Mutex>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<Distributer_Sender_StreamEndPoint,   TAO_VDev, AV_Null_MediaCtrl>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy  <Distributer_Sender_StreamEndPoint,   TAO_VDev, AV_Null_MediaCtrl>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_B<Distributer_Receiver_StreamEndPoint, TAO_VDev, AV_Null_MediaCtrl>
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy  <Distributer_Receiver_StreamEndPoint, TAO_VDev, AV_Null_MediaCtrl>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?