⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信需求,可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 5 页
字号:
      if (c->qos ().is_gateway)
        continue;

      CORBA::ULong count = c->qos ().dependencies.length ();
      for (CORBA::ULong j = 0; j < count; ++j)
        {
          RtecEventComm::Event& event =
            c->qos ().dependencies[j].event;

          RtecEventComm::EventType type = event.header.type;

          // Only type and source dependencies are relevant, notice
          // that we turn conjunctions into disjunctions because
          // correlations could be satisfied by events coming from
          // several remote ECs.
          // Notice that <0> is *not* skipped, otherwise source-only
          // filtering does not work.
          if (1 <= type && type <= ACE_ES_EVENT_UNDEFINED)
            continue;

          // If the dependency is already there we don't add it.
          CORBA::ULong k;
          for (k = 0; k < cc; ++k)
            {
              if (dep[k].event.header.type == event.header.type
                  && dep[k].event.header.source == event.header.source)
                break;
            }
          if (k == cc)
            {
              dep[cc].event.header.type = event.header.type;
              dep[cc].event.header.source = event.header.source;
              dep[cc].event.header.creation_time = ORBSVCS_Time::zero ();
              // The RT_Info is filled up later.
              dep[cc].rt_info = 0;
              cc++;
            }
        }
    }
  dep.length (cc);

  // ACE_DEBUG ((LM_DEBUG, "EC (%t) Consumer::fill_qos - %d\n", cc));
}

// Construct a correlation module bound to <channel>.  The upstream
// dispatching module and the subscription module start out null; both
// are assigned by open ().
ACE_ES_Correlation_Module::ACE_ES_Correlation_Module (ACE_EventChannel *channel)
  : channel_ (channel),
    up_ (0),
    subscription_module_ (0)
{
}

// Attach this module to its neighbours: <up> is the dispatching module
// that push () and shutdown () forward to, <sm> is the subscription
// module that subscribe () and unsubscribe () delegate to.
void
ACE_ES_Correlation_Module::open (ACE_ES_Dispatching_Module *up,
                                 ACE_ES_Subscription_Module *sm)
{
  this->subscription_module_ = sm;
  this->up_ = up;
}

// Notification that <consumer> has connected.  The consumer's
// correlation object is asked to set itself up against this module; a
// -1 result is reported to the caller as CORRELATION_ERROR.
void
ACE_ES_Correlation_Module::connected (ACE_Push_Consumer_Proxy *consumer
                                      ACE_ENV_ARG_DECL)
{
  const int status = consumer->correlation ().connected (consumer, this);

  if (status == -1)
    {
      ACE_THROW (RtecEventChannelAdmin::EventChannel::CORRELATION_ERROR());
    }
}

// Notification that <consumer> is about to disconnect.  A failure in
// the consumer's correlation object is only logged; nothing is raised
// to the caller.
void
ACE_ES_Correlation_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer
                                          ACE_ENV_ARG_DECL_NOT_USED)
{
  const int status = consumer->correlation ().disconnecting ();
  if (status != -1)
    return;

  ACE_ERROR ((LM_ERROR,
              "ACE_ES_Correlation_Module::disconnecting failed.\n"));
}

// Forward a subscription request for <consumer> to the subscription
// module and hand its result straight back.
int
ACE_ES_Correlation_Module::subscribe (ACE_ES_Consumer_Rep *consumer)
{
  const int result = this->subscription_module_->subscribe (consumer);
  return result;
}

// Forward an unsubscribe request for <cr> to the subscription module
// and hand its result straight back.
int
ACE_ES_Correlation_Module::unsubscribe (ACE_ES_Consumer_Rep *cr)
{
  const int result = this->subscription_module_->unsubscribe (cr);
  return result;
}

// Feed <event> to the correlation object of <consumer>.  When the
// correlation yields a dispatch request it is forwarded upstream
// immediately; a null result means the event was held back for later.
void
ACE_ES_Correlation_Module::push (ACE_ES_Consumer_Rep *consumer,
                                 const TAO_EC_Event& event
                                 ACE_ENV_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG, "EC (%t) Correlation_Module::push\n"));

  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_ACE_ES_CORRELATION_MODULE_PUSH);
  ACE_ES_Dispatch_Request *ready =
    consumer->correlation ()->push (consumer, event);
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSHED_TO_CORRELATION_MODULE);

  // A non-null request is dispatched right away; null means "queued".
  if (0 != ready)
    {
      this->up_->push (ready ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE_DISPATCH_MODULE_ENQUEUING);
}

// Precondition: the caller has checked consumer->qos ().use_timeout ().
// Registers a channel timer for <consumer>, recording on the consumer
// both the preemption priority derived from the interval and the timer
// id, so that the timer can be cancelled later.
// Returns 0 on success, -1 if the channel could not schedule the timer.
int
ACE_ES_Correlation_Module::schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  // The repeat period and the initial delay are both read from the
  // creation_time field of the dependency's event header.
  RtecEventComm::Time &period =
    consumer->dependency ()->event.header.creation_time;
  RtecEventComm::Time &first_delay =
    consumer->dependency ()->event.header.creation_time;

  // Remember the preemption priority used for this timer: priorities
  // may change during the process lifetime (e.g., once the scheduler
  // has run), and cancellation needs the value used here.
  consumer->preemption_priority (::IntervalToPriority (period));

  // ACE_DEBUG ((LM_DEBUG,
  // "EC (%t) Adding timer at preemption %d, rate = (%d,%d)\n",
  // consumer->preemption_priority (),
  // interval.low, interval.high));

  // Hand the timer to the channel.
  int new_id =
    this->channel_->schedule_timer (consumer->dependency ()->rt_info,
                                    consumer,
                                    consumer->preemption_priority (),
                                    first_delay, period);

  // The id is recorded even when it is -1.
  consumer->timer_id (new_id);

  if (new_id == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "%p schedule timer failed.\n",
                         "ACE_ES_Correlation_Module::schedule_timeout"), -1);
    }

  return 0;
}

// Must check consumer->qos ().timeout_ before calling this.
//
// Cancels the channel timer identified by the preemption priority and
// timer id stored on <consumer>.  Always returns 0; the result of the
// channel's cancel_timer () call is not inspected.
int
ACE_ES_Correlation_Module::cancel_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  // Cancel the timer from the Priority Timer.  <act> starts out null so
  // that, should cancel_timer () leave it untouched, the assertion
  // below compares against a well-defined value instead of reading an
  // indeterminate pointer.
  ACE_Command_Base *act = 0;
  this->channel_->cancel_timer (consumer->preemption_priority (),
                                consumer->timer_id (),
                                act);

  // The ACT handed back is expected to be the consumer rep itself.
  ACE_ASSERT (consumer == act);

  // The ACT is deliberately not deleted here.
  //  delete act;

  return 0;
}


// Cancel the timer currently registered for <consumer> and register a
// new one using the creation_time of the dependency's event header as
// both delay and interval.  Unlike schedule_timeout (), no RT_Info is
// passed to the channel.
// Returns 0 on success, -1 if cancelling or scheduling fails.
int
ACE_ES_Correlation_Module::reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  if (this->cancel_timeout (consumer) == -1)
    // Report the failure under this function's own name so the log
    // points at the right place.
    ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                       "ACE_ES_Correlation_Module::reschedule_timeout"), -1);
  else
    {
      RtecEventComm::Time &interval =
        consumer->dependency ()->event.header.creation_time;
      RtecEventComm::Time &delay =
        consumer->dependency ()->event.header.creation_time;

      // Store the preemption priority so we can cancel the correct timer.
      // The priority values may change during the process lifetime (e.g.,
      // after the scheduler has been run).
      consumer->preemption_priority (::IntervalToPriority (interval));

      // Register the timer.
      int id =
        this->channel_->schedule_timer (0, // Do not pass an RT_Info.
                                        consumer,
                                        consumer->preemption_priority (),
                                        delay, interval);

      // Store the timer id for canceling (recorded even when it is -1).
      consumer->timer_id (id);

      if (id == -1)
        ACE_ERROR_RETURN ((LM_ERROR, "%p schedule timer failed.\n",
                           "ACE_ES_Correlation_Module::reschedule_timeout"), -1);

      return 0;
    }
}

void
ACE_ES_Correlation_Module::shutdown (void)
{
  // Perhaps this should call disconnecting on all the consumers?
  // We'll opt to just forward this message for now.
  up_->shutdown ();
}

// Construct an unconnected correlation object: every pointer member is
// null, every counter is zero, and qos_ / lock_ are default
// constructed.  The arrays are allocated later by
// allocate_correlation_resources ().
ACE_ES_Consumer_Correlation::ACE_ES_Consumer_Correlation (void) :
  correlation_module_ (0),
  type_id_index_ (0),
  channel_ (0),
  qos_ (),
  pending_events_ (0),
  lock_ (),
  consumer_ (0),
  pending_flags_ (0),
  consumer_reps_ (0),
  n_consumer_reps_ (0),
  timer_reps_ (0),
  n_timer_reps_ (0),
  conjunction_groups_ (0),
  n_conjunction_groups_ (0),
  disjunction_groups_ (0),
  n_disjunction_groups_ (0),
  connected_ (0)
{
}

// Tear down everything this object allocated.  The timer reps go
// first; each non-null consumer rep is then unsubscribed through the
// correlation module and released before the arrays are freed.
ACE_ES_Consumer_Correlation::~ACE_ES_Consumer_Correlation (void)
{
  delete [] this->timer_reps_;

  int slot = 0;
  while (slot < this->n_consumer_reps_)
    {
      ACE_ES_Consumer_Rep *rep = this->consumer_reps_[slot++];
      if (rep == 0)
        continue;

      // Consumer reps are released individually rather than deleted.
      this->correlation_module_->unsubscribe (rep);
      rep->_release ();
    }

  delete [] this->consumer_reps_;
  delete [] this->conjunction_groups_;
  delete [] this->disjunction_groups_;
  delete [] this->pending_events_;
}

// Supplier-side disconnect callback: simply clears the connected flag.
void
ACE_ES_Consumer_Correlation::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException))
{
  connected_ = 0;
}

// Allocate the per-consumer correlation storage sized from the counts
// reported by <iter>: conjunction groups, disjunction groups, consumer
// reps, timer reps and the pending-event arrays.  Each group is bound
// to correlation_module_ as it is created.
// Returns 0 on success and -1 when an allocation is reported as failed.
int
ACE_ES_Consumer_Correlation::allocate_correlation_resources (ACE_ES_Dependency_Iterator &iter)
{
  n_conjunction_groups_ = iter.n_conjunctions ();
  if (n_conjunction_groups_ > 0)
    {
      conjunction_groups_ = new ACE_ES_Conjunction_Group[n_conjunction_groups_];
      if (conjunction_groups_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
      for (int n=0; n < n_conjunction_groups_; n++)
        conjunction_groups_[n].set_correlation_module (correlation_module_);
    }

  n_disjunction_groups_ = iter.n_disjunctions ();
  if (n_disjunction_groups_ > 0)
    {
      disjunction_groups_ = new ACE_ES_Disjunction_Group[n_disjunction_groups_];
      if (disjunction_groups_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
      for (int n=0; n < n_disjunction_groups_; n++)
        disjunction_groups_[n].set_correlation_module (correlation_module_);
    }

  n_consumer_reps_ = iter.n_events ();
  if (n_consumer_reps_ > 0)
    {
      // This allocates more than is needed if there are repeats:
      // (A+B)|(B+C).  We allocate these individually so that they can
      // be deleted individually.

      typedef ACE_ES_Consumer_Rep *reparray;
      consumer_reps_ = new reparray[n_consumer_reps_];
      if (consumer_reps_ == 0)
        {
          // Without the array there is nothing for the destructor to
          // walk, so the count must not claim otherwise.
          n_consumer_reps_ = 0;
          ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                             "ACE_ES_Consumer_Correlation::"
                             "allocate_correlation_resources"), -1);
        }

      // Null every slot before populating any of them.  The destructor
      // visits all n_consumer_reps_ entries and skips null ones, so a
      // failure part-way through the loop below must not leave
      // indeterminate pointers behind.
      for (int slot = 0; slot < n_consumer_reps_; slot++)
        consumer_reps_[slot] = 0;

      for (int cr = 0; cr < n_consumer_reps_; cr++)
        {
          consumer_reps_[cr] = new ACE_ES_Consumer_Rep;
          if (consumer_reps_[cr] == 0)
            ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                               "ACE_ES_Consumer_Correlation::"
                               "allocate_correlation_resources"), -1);
        }
    }

  n_timer_reps_ = iter.n_timeouts ();
  if (n_timer_reps_ > 0)
    {
      timer_reps_ = new ACE_ES_Consumer_Rep_Timeout[n_timer_reps_];
      if (timer_reps_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
    }

  // This allocates more than is needed.
  // @@ throw an exception.
  ACE_NEW_RETURN (this->pending_events_,
                  TAO_EC_Event_Array[n_consumer_reps_ + n_timer_reps_],
                  -1);

  return 0;
}

// We don't need synchronization until after we've been connected and
// subscribed to events.
int
ACE_ES_Consumer_Correlation::connected (ACE_Push_Consumer_Proxy *consumer,
                                        ACE_ES_Correlation_Module *correlation_module)
{
  correlation_module_ = correlation_module;
  consumer_ = consumer;

  //  for (CORBA_Types::ULong index=0; index < consumer->qos ().dependencies_.length (); index++)
  //    consumer->qos ().dependencies_[index].event.dump ();

  ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies);
  iter.parse ();
  if (this->allocate_correlation_resources (iter) == -1)
    return -1;


  int cgroup_index = -1;
  int dgroup_index = -1;
  int crep_index = 0;
  int trep_index = 0;
  RtecEventComm::EventType group_type = 0;

  while (iter.advance_dependency () == 0)
    {
      // Keep track of how many conjunction and disjunction groups are
      // registered.  Update the index pointers so that the helper
      // functions can update the appropriate group objects.
      switch ((*iter).event.header.type)
        {
        case ACE_ES_CONJUNCTION_DESIGNATOR:
          cgroup_index++;
          ACE_ASSERT (cgroup_index < n_conjunction_groups_);
          group_type = ACE_ES_CONJUNCTION_DESIGNATOR;
          continue;

        case ACE_ES_DISJUNCTION_DESIGNATOR:
          dgroup_index++;
          ACE_ASSERT (dgroup_index < n_disjunction_groups_);
          group_type = ACE_ES_DISJUNCTION_DESIGNATOR;
          continue;

        case ACE_ES_GLOBAL_DESIGNATOR:
          group_type = ACE_ES_GLOBAL_DESIGNATOR;
          continue;

          // These Delegate to the appropriate registration method.
#if 0
          // @@ TODO rt_info_ is a handle_t now, does checking against
          // 0  still make sense?
          // Check for a null rt_info.
          if ((*iter).rt_info_ == 0)
            {
              ACE_ERROR ((LM_ERROR, "Found a ConsumerQOS::dependencies[].rt_info_ == 0.\n"));
              continue;
            }
#endif /* 0 */

        case ACE_ES_EVENT_TIMEOUT:
          // For backwards compatibility.
        case ACE_ES_EVENT_DEADLINE_TI

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -