⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 5 页
字号:
  this->source_id_ = qos_.publications[0].event.header.source;

  supplier_module_->connected (this ACE_ENV_ARG_PARAMETER);
}

// Receives a set of events from the supplier, stamps each one with the
// channel's receive time (see time_stamp()) and hands the set to the
// supplier module.
//
// A collocated supplier gets its events copied before they are
// modified; for a non-collocated supplier the incoming buffer is
// modified in place.
void
ACE_Push_Supplier_Proxy::push (const RtecEventComm::EventSet &event
                               ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_PUSH_SUPPLIER_PROXY_PUSH);

  // NOTE: Detecting that the supplier is collocated is a TAOism.
  if (this->push_supplier_->_is_collocated ())
    {
      // Collocated supplier: time-stamp and forward a private copy so
      // the caller's (const) event set is left untouched.
      RtecEventComm::EventSet private_copy = event;
      this->time_stamp (private_copy);
      this->supplier_module_->push (this, private_copy ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // NOTE: This is *extremely* non-portable.  The original authors
      // rely on the ORB core having allocated this buffer from the
      // global heap, which makes it safe to steal (the EC releases the
      // buffer later, in another thread).  Other ORBs may behave
      // differently, in which case this will not work.
      RtecEventComm::EventSet& stolen =
        ACE_const_cast (RtecEventComm::EventSet&, event);

      this->time_stamp (stolen);
      this->supplier_module_->push (this, stolen ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

// Writes the event channel's receive time into header.ec_recv_time of
// every event in @a event.  The high-resolution clock is read once, so
// all events in the set carry the same time stamp.
//
// When TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS is defined the function does
// nothing.
void
ACE_Push_Supplier_Proxy::time_stamp (RtecEventComm::EventSet& event)
{
#if defined(TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS)
  ACE_UNUSED_ARG (event);
#else
  ACE_hrtime_t received_at = ACE_OS::gethrtime ();
  const CORBA::ULong event_count = event.length ();
  for (CORBA::ULong idx = 0; idx < event_count; ++idx)
    {
      ORBSVCS_Time::hrtime_to_TimeT (event[idx].header.ec_recv_time,
                                     received_at);
    }
#endif /* TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS */
}

// Disconnects this proxy from its supplier.  If the proxy is currently
// connected, the supplier reference is cleared and the supplier module
// is told that this proxy is disconnecting; otherwise nothing happens
// beyond the time-probe print.
void
ACE_Push_Supplier_Proxy::disconnect_push_consumer (
    ACE_ENV_SINGLE_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE_PRINT;

  // Nothing to tear down for a proxy that is not connected.
  if (!this->connected ())
    return;

  this->push_supplier_ = 0;
  this->supplier_module_->disconnecting (this ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Asks the supplier behind this proxy to disconnect by invoking
// disconnect_push_supplier() on push_supplier_.  Any exception raised
// by that call is caught here and printed; it is not propagated to the
// caller.
void
ACE_Push_Supplier_Proxy::shutdown (void)
{
  // This method receives no CORBA environment from its caller, so a
  // local one is declared for the call below.
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // NOTE(review): push_supplier_ is dereferenced without a nil
      // check -- presumably only connected proxies are shut down;
      // confirm against the callers.
      push_supplier_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Best effort: report the failure and carry on.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "ACE_Push_Supplier_Proxy::shutdown failed.\n");
    }
  ACE_ENDTRY;
}

// Creates a consumer proxy that is not yet connected to any consumer
// (push_consumer_ starts out as 0) and remembers the consumer module
// @a cm it reports to.
ACE_Push_Consumer_Proxy::ACE_Push_Consumer_Proxy (ACE_ES_Consumer_Module *cm)
  : push_consumer_ (0)
  , consumer_module_ (cm)
{
  // All state is set up in the initializer list.
}

// Destructor.  Performs no explicit work of its own.
ACE_Push_Consumer_Proxy::~ACE_Push_Consumer_Proxy ()
{
  // Intentionally empty.
}

// Delivers @a events to the consumer attached to this proxy.  If no
// consumer is attached (the reference is nil) the events are dropped
// and a debug message is logged instead.
void
ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events
                               ACE_ENV_ARG_DECL)
{
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_EVENT_TO_CONSUMER_PROXY);

  if (CORBA::is_nil (push_consumer_.in ()))
    {
      // The consumer has gone away; there is nobody to deliver to.
      ACE_DEBUG ((LM_DEBUG,
                  "EC (%t) Push to disconnected consumer\n"));
    }
  else
    {
      push_consumer_->push (events ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}

// Attaches @a push_consumer to this proxy with the subscription
// described by @a qos, then notifies the consumer module.
//
// Raises RtecEventChannelAdmin::AlreadyConnected if the proxy already
// has a consumer attached.
void
ACE_Push_Consumer_Proxy::connect_push_consumer (
    RtecEventComm::PushConsumer_ptr push_consumer,
    const RtecEventChannelAdmin::ConsumerQOS &qos
    ACE_ENV_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException,
                       RtecEventChannelAdmin::AlreadyConnected,
                       RtecEventChannelAdmin::TypeError))
{
  // A proxy serves at most one consumer at a time.
  if (this->connected ())
    {
      ACE_THROW (RtecEventChannelAdmin::AlreadyConnected());
    }

  // Keep our own reference to the consumer.
  this->push_consumer_ =
    RtecEventComm::PushConsumer::_duplicate(push_consumer);

  // @@ TODO Find out why are two duplicates needed...
  // The result of this second duplicate is deliberately not stored.
  RtecEventComm::PushConsumer::_duplicate(push_consumer);

  // Debugging aid, disabled:
  //ACE_DEBUG ((LM_DEBUG, "EC (%t) connect_push_consumer QOS is "));
  //ACE_ConsumerQOS_Factory::debug (qos);

  // The QOS is copied by value into the proxy.
  this->qos_ = qos;

  // Debugging aid, disabled:
  // ACE_ConsumerQOS_Factory::debug (qos_);

  // Let the consumer module know a consumer is now attached.
  this->consumer_module_->connected (this ACE_ENV_ARG_PARAMETER);
}

// Detaches the consumer from this proxy: the stored consumer reference
// is reset to nil and the consumer module is told that this proxy is
// disconnecting.
void
ACE_Push_Consumer_Proxy::disconnect_push_supplier (
    ACE_ENV_SINGLE_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  ACE_TIMEPROBE_PRINT;

  // Drop the consumer first, then notify the module.
  this->push_consumer_ = RtecEventComm::PushConsumer::_nil ();

  this->consumer_module_->disconnecting (this ACE_ENV_ARG_PARAMETER);
}

// Suspends this consumer's connection by forwarding the request to the
// proxy's correlation_ member.
void
ACE_Push_Consumer_Proxy::suspend_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->correlation_.suspend ();
}

// Resumes this consumer's connection by forwarding the request to the
// proxy's correlation_ member.
void
ACE_Push_Consumer_Proxy::resume_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException))
{
  this->correlation_.resume ();
}

// Asks the consumer behind this proxy to disconnect by invoking
// disconnect_push_consumer() on push_consumer_.  Any exception raised
// by that call is caught here and printed; it is not propagated to the
// caller.
void
ACE_Push_Consumer_Proxy::shutdown (void)
{
  // This method receives no CORBA environment from its caller, so a
  // local one is declared for the call below.
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // NOTE(review): unlike push(), there is no nil check on
      // push_consumer_ here -- presumably only connected proxies are
      // shut down; confirm against the callers.
      this->push_consumer_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Best effort: report the failure and carry on.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "ACE_Push_Consumer_Proxy::shutdown failed.\n");
    }
  ACE_ENDTRY;
}

// Builds an event channel that uses the caller-supplied @a scheduler.
//
// @a activate_threads is forwarded to init(); @a type is stored in
// type_; @a factory (may be 0) is the module factory -- init() creates
// a default one when none is given.
ACE_EventChannel::ACE_EventChannel (RtecScheduler::Scheduler_ptr scheduler,
                                    CORBA::Boolean activate_threads,
                                    u_long type,
                                    TAO_Module_Factory* factory)
  : rtu_manager_ (0)
  , type_ (type)
  , state_ (INITIAL_STATE)
  , destroyed_ (0)
  , handle_generator_ (0)
  , own_factory_ (0)
  , module_factory_ (factory)
{
  // Hold our own reference to the scheduler passed in.
  this->scheduler_ = RtecScheduler::Scheduler::_duplicate (scheduler);

  // Create and wire up the modules.
  this->init (activate_threads);
}

// Builds an event channel that uses the scheduler returned by
// ACE_Scheduler_Factory::server().
//
// @a activate_threads is forwarded to init(); @a type is stored in
// type_; @a factory (may be 0) is the module factory -- init() creates
// a default one when none is given.
ACE_EventChannel::ACE_EventChannel (CORBA::Boolean activate_threads,
                                    u_long type,
                                    TAO_Module_Factory* factory)
  : rtu_manager_ (0)
  , type_ (type)
  , state_ (INITIAL_STATE)
  , destroyed_ (0)
  , handle_generator_ (0)
  , own_factory_ (0)
  , module_factory_ (factory)
{
  // Hold our own reference to the factory-provided scheduler.
  this->scheduler_ =
    RtecScheduler::Scheduler::_duplicate (ACE_Scheduler_Factory::server ());

  // Create and wire up the modules.
  this->init (activate_threads);
}

// Shared constructor helper: makes sure a module factory exists,
// creates the six channel modules through it, links them together with
// open(), and finally activates the channel if @a activate_threads is
// non-zero.
void
ACE_EventChannel::init (int activate_threads)
{
  // No factory supplied by the user: create the default one and record
  // that we own it so the destructor can delete it.
  if (!this->module_factory_)
    {
      this->own_factory_ = 1;
      ACE_NEW (this->module_factory_, TAO_Default_Module_Factory);
    }

  // Create the modules.
  this->consumer_module_ =
    this->module_factory_->create_consumer_module (this);
  this->timer_module_ =
    this->module_factory_->create_timer_module (this);
  this->dispatching_module_ =
    this->module_factory_->create_dispatching_module(this);
  this->correlation_module_ =
    this->module_factory_->create_correlation_module (this);
  this->subscription_module_ =
    this->module_factory_->create_subscription_module (this);
  this->supplier_module_ =
    this->module_factory_->create_supplier_module (this);

  // Link each module to its neighbour(s), consumer side first.
  this->consumer_module_->open (this->dispatching_module_);
  this->dispatching_module_->open (this->consumer_module_,
                                   this->correlation_module_);
  this->correlation_module_->open (this->dispatching_module_,
                                   this->subscription_module_);
  this->subscription_module_->open (this->correlation_module_,
                                    this->supplier_module_);
  this->supplier_module_->open (this->subscription_module_);

  if (activate_threads)
    {
      this->activate ();
    }
}

// Tears the event channel down: runs destroy() (swallowing and
// printing any exception), cleans up observers, shuts down the timer
// and dispatching modules, returns every module to the factory and,
// if the factory was created by init(), deletes the factory as well.
ACE_EventChannel::~ACE_EventChannel (void)
{
  ACE_DEBUG ((LM_DEBUG,
              "EC (%t) ACE_EventChannel deleting all modules.\n"));

  // @@ This should go away, it is too late to raise a CORBA
  // exception, at this point we should only be cleaning up memory,
  // not sending messages.
  ACE_DECLARE_NEW_CORBA_ENV;
  ACE_TRY
    {
      // destroy() returns immediately if the channel was already
      // destroyed (it checks destroyed_).
      this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      // Exceptions must not escape the destructor; just report them.
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "ACE_EventChannel::~ACE_EventChannel");
    }
  ACE_ENDTRY;

  this->cleanup_observers ();

  // Shut the timer and dispatching modules down before any module is
  // handed back to the factory.
  this->timer_module_->shutdown ();
  this->dispatching_module_->shutdown ();

  // Return every module to the factory that created it.
  this->module_factory_->destroy_timer_module (this->timer_module_);
  this->module_factory_->destroy_supplier_module (this->supplier_module_);
  this->module_factory_->destroy_subscription_module (this->subscription_module_);
  this->module_factory_->destroy_correlation_module (this->correlation_module_);
  this->module_factory_->destroy_dispatching_module(this->dispatching_module_);
  this->module_factory_->destroy_consumer_module (this->consumer_module_);

  // Only delete the factory if init() allocated it (own_factory_ set).
  if (this->own_factory_)
    delete this->module_factory_;
}

// Marks the channel as destroyed and starts shutting it down: observers
// are cleaned up and the supplier module is told to shut down.  Calling
// it again after the first successful call is a no-op (guarded by
// destroyed_).
void
ACE_EventChannel::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // The inner block limits the scope of the guard on lock_ to the
  // check-and-set of destroyed_; the calls further down run outside it.
  {
    ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

    // Already destroyed: nothing more to do.
    if (this->destroyed_ != 0)
      return;

    this->destroyed_ = 1;
    ACE_DEBUG ((LM_DEBUG, "EC (%t) Event Channel shutting down.\n"));

  }
  this->cleanup_observers ();

  // Send a shutdown message through the modules.
  this->supplier_module_->shutdown ();

  // The region below is compiled out (#if 0) and kept only for
  // reference.
#if 0
  // Flush all messages in the channel.
  Shutdown_Channel *sc = new Shutdown_Channel (this);
  if (sc == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // Create a wrapper around the dispatch request.
  Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, dispatching_module_);
  if (act == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // Schedule a one-shot timer with a delta of 100.
  // NOTE(review): the original comments disagreed on the duration
  // ("100ns" here vs "10 usec" on the argument) -- confirm the unit
  // expected by schedule_timer().
  if (this->timer_module ()->schedule_timer (0, // no rt-info
                                             act,
                                             ACE_Scheduler_MIN_PREEMPTION_PRIORITY,
                                             100, // 10 usec delta
                                             0) == -1) // no interval
    {
      ACE_ERROR ((LM_ERROR, "%p queue_request failed.\n", "ACE_ES_Consumer_Module"));
      delete sc;
      delete act;
    }
#endif
}

// Activates the channel: first the dispatching module (with
// THREADS_PER_DISPATCH_QUEUE as its argument), then the timer module.
void
ACE_EventChannel::activate ()
{
  // Dispatching module first, timer module second.
  this->dispatching_module_->activate (THREADS_PER_DISPATCH_QUEUE);

  this->timer_module_->activate ();
}

// Stops the channel without destroying its modules: cleans up the
// observers, then shuts down the timer module and the dispatching
// module, in that order.
void
ACE_EventChannel::shutdown ()
{
  // Observers go first.
  this->cleanup_observers ();

  // Then the timer and dispatching modules.
  this->timer_module_->shutdown ();

  this->dispatching_module_->shutdown ();
}

// Locked front end for report_connect_i(): takes lock_ and then
// records the connection described by the bits in @a event.
void
ACE_EventChannel::report_connect (u_long event)
{
  // Hold the channel lock while state_ is updated.
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);
  this->report_connect_i (event);
}

// Unlocked implementation of report_connect(): clears the bits given
// in @a event from state_.  Does not take lock_ itself.
void
ACE_EventChannel::report_connect_i (u_long event)
{
  ACE_CLR_BITS (this->state_, event);
}

// Locked front end for report_disconnect_i(): takes lock_ and then
// records the disconnection described by the bits in @a event.  This
// mirrors report_connect() / report_connect_i().
void
ACE_EventChannel::report_disconnect (u_long event)
{
  // Hold the channel lock while state_ is updated; the _i variant does
  // not take the lock itself.
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

  // Delegate to the unlocked helper.  Calling report_disconnect() here
  // instead would recurse without end (re-acquiring lock_ each time)
  // and never update state_.
  this->report_disconnect_i (event);
}

// Unlocked implementation of report_disconnect(): sets the bits given
// in @a event in state_ and, if state_ then equals SHUTDOWN, logs that
// the channel has no consumers or suppliers left.  Does not take lock_
// itself.
void
ACE_EventChannel::report_disconnect_i (u_long event)
{
  ACE_SET_BITS (this->state_, event);

  // Only the SHUTDOWN state is worth a log line.
  if (this->state_ != SHUTDOWN)
    return;

  ACE_DEBUG ((LM_DEBUG,
              "EC (%t) Event Channel has no consumers or suppliers.\n"));
}

// Registers the gateway @a gw as an observer of this channel: obtains
// the gateway's Observer object reference via _this(), appends it with
// append_observer() and stores the resulting handle back in the
// gateway through observer_handle().
void
ACE_EventChannel::add_gateway (TAO_EC_Gateway* gw
                               ACE_ENV_ARG_DECL)
{
  RtecEventChannelAdmin::Observer_var gw_observer =
    gw->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // The handle returned by append_observer() is passed straight to the
  // gateway, which keeps it.
  gw->observer_handle (
    this->append_observer (gw_observer.in () ACE_ENV_ARG_PARAMETER));
  ACE_CHECK;
}

void
ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw
                               ACE_ENV_ARG_DECL)
{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -