⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 5 页
字号:
                      -1);

  consumer->_duplicate ();
  return 0;
}

int
ACE_ES_Subscription_Info::insert_or_allocate (Subscriber_Map &type_map,
                                              ACE_ES_Consumer_Rep *consumer,
                                              RtecEventComm::EventType type)
{
  Type_Subscribers *subscribers;

  if (type_map.find (type, subscribers) == -1)
    {
      // If the correct type set does not exist, make one with a null
      // dependency info (since there is no supplier of this event).
      subscribers = new Type_Subscribers (0);

      if (type_map.bind (type, subscribers) == -1)
        {
          ACE_ERROR ((LM_ERROR, "%p bind failed.\n",
                      "ACE_ES_Subscription_Info::insert_or_allocate"));
          delete subscribers;
          return -1;
        }
    }

  if (subscribers->consumers_.insert (consumer) == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p insert failed.\n",
                  "ACE_ES_Subscription_Info::insert_or_allocate"));
    }

  consumer->_duplicate ();
  return 0;
}

int
ACE_ES_Subscription_Info::insert_or_fail (Subscriber_Map &type_map,
                                          ACE_ES_Consumer_Rep *consumer,
                                          RtecEventComm::EventType type,
                                          RtecScheduler::Dependency_Info *&dependency)
{
  Type_Subscribers *subscribers;

  // Get the subscriber set for <type>.
  if (type_map.find (type, subscribers) == -1)
    return -1;

  // Pass back the description of the method generating <type>.
  dependency = subscribers->dependency_info_;

  // Insert the new consumer into the subscriber set.
  if (subscribers->consumers_.insert (consumer) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "%p insert failed.\n",
                         "ACE_ES_Subscription_Info::insert_or_fail"),
                        -1);
    }

  consumer->_duplicate ();
  return 0;
}

ACE_ES_Consumer_Module::ACE_ES_Consumer_Module (ACE_EventChannel* channel)
  :  lock_ (),
     all_consumers_ (),
     channel_ (channel),
     down_ (0)
{
}

void
ACE_ES_Consumer_Module::open (ACE_ES_Dispatching_Module *down)
{
  down_ = down;
}

void
ACE_ES_Consumer_Module::connected (ACE_Push_Consumer_Proxy *consumer
                                   ACE_ENV_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG,
  //             "EC (%t) Consumer_Module - connecting consumer %x\n",
  //  consumer));

  this->channel_->report_connect (ACE_EventChannel::CONSUMER);
  this->down_->connected (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  if (!consumer->qos ().is_gateway)
    this->channel_->update_consumer_gwys (ACE_ENV_SINGLE_ARG_PARAMETER);
}

void
ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request)
{
  ACE_TRY_NEW_ENV
    {
      Shutdown_Consumer *sc = (Shutdown_Consumer *) request;

      // Tell everyone else that the consumer is disconnected.  This means
      // that *nothing* is left in the system for the consumer, so
      // everyone can free up any resources.
      this->down_->disconnected (sc->consumer ());

      // ACE_DEBUG ((LM_DEBUG,
      //             "EC (%t) Consumer_Module - remove consumer %x\n",
      //  sc->consumer ()));

      CORBA::Boolean dont_update = sc->consumer ()->qos ().is_gateway;

      // Deactivate the consumer proxy
      PortableServer::POA_var poa =
        sc->consumer ()->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
      PortableServer::ObjectId_var id =
        poa->servant_to_id (sc->consumer () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Delete the consumer proxy, no need to delete it, is is owned
      // by the POA
      // delete sc->consumer ();

      if (!dont_update)
        this->channel_->update_consumer_gwys (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      ACE_GUARD_THROW_EX (
          ACE_ES_MUTEX, ace_mon, this->lock_,
          RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
      ACE_TRY_CHECK;

      // Tell the channel that we may need to shut down.
      if (all_consumers_.size () <= 0)
        {
          // ACE_DEBUG ((LM_DEBUG,
          //             "EC (%t) No more consumers connected.\n"));
          channel_->report_disconnect_i (ACE_EventChannel::CONSUMER);
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Consumer_Module::shutdown_request");
    }
  ACE_ENDTRY;
}

void
ACE_ES_Consumer_Module::shutdown (void)
{
  Consumers copy;

  {
    ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_);
    if (ace_mon.locked () == 0)
      goto DONE;

    if (all_consumers_.size () == 0)
      goto DONE;

    // Make a copy so that the consumers can disconnect without the
    // lock being held.
    copy = all_consumers_;
  }

  // This scope is just to thwart the compiler.  It was complaining
  // about the above goto's bypassing variable initializations.  Yadda
  // yadda.
  {
    Consumer_Iterator iter (copy);

    ACE_DECLARE_NEW_CORBA_ENV;
    ACE_TRY
      {
        for (ACE_Push_Consumer_Proxy **proxy = 0;
             iter.next (proxy) != 0;
             iter.advance ())
          {
            (*proxy)->shutdown ();
            // @@ Cannnot use CORBA::release (*proxy), since it is a
            // servant.
            // Deactivate the proxy...
            PortableServer::POA_var poa =
              (*proxy)->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
            ACE_TRY_CHECK;
            PortableServer::ObjectId_var id =
              poa->servant_to_id (*proxy ACE_ENV_ARG_PARAMETER);
            ACE_TRY_CHECK;
            poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
            ACE_TRY_CHECK;

            // Remove the consumer from our list.
            {
              ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_);
              if (ace_mon.locked () == 0)
                ACE_ERROR ((LM_ERROR, "%p Failed to acquire lock.\n", "ACE_ES_Consumer_Module::shutdown"));

              if (all_consumers_.remove (*proxy) == -1)
                ACE_ERROR ((LM_ERROR, "%p Failed to remove consumer.\n", "ACE_ES_Consumer_Module::shutdown"));
            }

            // No need to delete it, owned by the POA
            // delete *proxy;
          }
      }
    ACE_CATCHANY
      {
        // Ignore the exceptions...
      }
    ACE_ENDTRY;
  }

DONE:
  channel_->shutdown ();
}

void
ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer
                                       ACE_ENV_ARG_DECL)
{
  {
    ACE_GUARD_THROW_EX (
        ACE_ES_MUTEX, ace_mon, this->lock_,
        RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
    ACE_CHECK;

    if (all_consumers_.remove (consumer) == -1)
      ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR());
  }

  // Tell everyone else that the consumer is disconnecting.  This
  // allows them to remove the consumer from any subscription lists
  // etc.  However, messages may still be queued in the ReactorEx or
  // in the Dispatching Module for this consumer, so no queues or
  // proxies can be deleted just yet.
  down_->disconnecting (consumer ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Send a shutdown message through the system.  When this is
  // dispatched, the consumer proxy will be deleted.  <request> is
  // queued in the Priority_Timer at <priority> level.  It will be
  // scheduled for dispatching in 1 nanosecond.  This gives components
  // a hook into the first queueing point in the channel.

  // Create a shutdown message.  When this is dispatched, it will
  // delete the proxy.
  RtecScheduler::Scheduler_var scheduler =
    this->channel_->scheduler ();
  Shutdown_Consumer *sc =
    new Shutdown_Consumer (this, consumer, scheduler.in ());
  if (sc == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // Create a wrapper around the dispatch request.
  Flush_Queue_ACT *act =
    new Flush_Queue_ACT (sc, channel_->dispatching_module_);
  if (act == 0)
    ACE_THROW (CORBA::NO_MEMORY ());

  // ACE_DEBUG ((LM_DEBUG, "EC (%t) initiating consumer disconnect.\n"));

  // Set a 100ns timer.
  TimeBase::TimeT ns100;
  ORBSVCS_Time::hrtime_to_TimeT (ns100, 100);
  if (this->channel_->schedule_timer (0, // no rt_info
                                      act,
                                      ACE_Scheduler_MIN_PREEMPTION_PRIORITY,
                                      ns100,
                                      ORBSVCS_Time::zero ()) == -1)
    {
      ACE_ERROR ((LM_ERROR, "%p queue_request failed.\n", "ACE_ES_Consumer_Module"));
      delete sc;
      delete act;
    }
}

// This method executes in the same thread of control that will hand
// the event set to the consumer (or it's proxy).  A network proxy may
// copy the event set to the network buffer.  An active client may
// copy the event set to be queued.  Or a same address-space consumer
// can read the set we allocated off the stack.
void
ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request
                              ACE_ENV_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG, "EC (%t) Consumer_Module::push\n"));

  ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_ES_CONSUMER_MODULE_PUSH);
  // We'll create a temporary event set with the size of the incoming
  // request.
  RtecEventComm::EventSet event_set;
  request->make_copy (event_set);

  // Forward the event set.
#if !defined(TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS)
  ACE_hrtime_t ec_send = ACE_OS::gethrtime ();
  for (CORBA::ULong i = 0; i < event_set.length (); ++i)
    {
      RtecEventComm::Event& ev = event_set[i];
      ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_send_time, ec_send);
    }
#endif /* TAO_LACKS_EVENT_CHANNEL_TIMESTAMPS */
  request->consumer ()->push (event_set ACE_ENV_ARG_PARAMETER);
}

RtecEventChannelAdmin::ProxyPushSupplier_ptr
ACE_ES_Consumer_Module::obtain_push_supplier (
    ACE_ENV_SINGLE_ARG_DECL)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  RtecEventChannelAdmin::ProxyPushSupplier_ptr proxy =
    RtecEventChannelAdmin::ProxyPushSupplier::_nil ();

  auto_ptr<ACE_Push_Consumer_Proxy> new_consumer (new ACE_Push_Consumer_Proxy (this));

  // Get a new supplier proxy object.
  if (new_consumer.get () == 0)
    {
      ACE_ERROR ((LM_ERROR, "ACE_EventChannel"
                  "::obtain_push_supplier failed.\n"));
      ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy);
    }

  {
    ACE_GUARD_THROW_EX (
        ACE_ES_MUTEX, ace_mon, this->lock_,
        CORBA::INTERNAL ());
    // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
    ACE_CHECK_RETURN (proxy);

    if (all_consumers_.insert (new_consumer.get ()) == -1)
      ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Module insert failed.\n"));
  }

  proxy = new_consumer->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (proxy);

  // Give away ownership to the POA....
  new_consumer.release ()->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (proxy);

  return proxy;
}

void
ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos)
{
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

  c_qos.is_gateway = 1;

  int count = 0;
  {
    for (Consumer_Iterator i = this->all_consumers_.begin ();
         i != this->all_consumers_.end ();
         ++i)
      {
        ACE_Push_Consumer_Proxy *c = *i;

        if (c->qos ().is_gateway)
          continue;

        count += c->qos ().dependencies.length ();
      }
  }

  RtecEventChannelAdmin::DependencySet& dep = c_qos.dependencies;

  dep.length (count + 1);

  CORBA::ULong cc = 0;
  dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
  dep[cc].event.header.source = 0;
  dep[cc].event.header.creation_time = ORBSVCS_Time::zero ();
  dep[cc].rt_info = 0;
  cc++;

  for (Consumer_Iterator i = this->all_consumers_.begin ();
       i != this->all_consumers_.end ();
       ++i)
    {
      ACE_Push_Consumer_Proxy *c = *i;

      // ACE_DEBUG ((LM_DEBUG, "EC (%t) fill_qos "));
      // ACE_ConsumerQOS_Factory::debug (c->qos ());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -