⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.cpp

📁 这是一个被广泛使用的开源通信项目,完全能够胜任大容量、高并发的通信需求,可广泛用于网络游戏、医学图像、网关等对 QoS 要求较高的场景。更详细的内容可阅读相应的材料。
💻 CPP
📖 第 1 页 / 共 5 页
字号:
  this->remove_observer (gw->observer_handle () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  gw->observer_handle (0);
}

// Send the current consumer QoS to every registered observer.
//
// The observer map is copied into a local map while <lock_> is held;
// the observers are then called from that copy once the guard has
// gone out of scope.  Nothing is done when no observer is registered
// or when the channel state is SHUTDOWN.
void
ACE_EventChannel::update_consumer_gwys (ACE_ENV_SINGLE_ARG_DECL)
{
  Observer_Map snapshot;
  {
    ACE_GUARD_THROW_EX (
         ACE_ES_MUTEX, guard, this->lock_,
         RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
    ACE_CHECK;

    // Nobody to notify.
    if (this->observers_.current_size () == 0)
      return;

    // The channel is going away.
    if (this->state_ == ACE_EventChannel::SHUTDOWN)
      return;

    snapshot.open (this->observers_.current_size ());

    Observer_Map_Iterator src = this->observers_.begin ();
    while (src != this->observers_.end ())
      {
        snapshot.bind ((*src).ext_id_, (*src).int_id_);
        ++src;
      }
  }

  // ACE_DEBUG ((LM_DEBUG,
  //              "EC (%t) Event_Channel::update_consumer_gwys\n"));

  RtecEventChannelAdmin::ConsumerQOS c_qos;
  this->consumer_module_->fill_qos (c_qos);

  Observer_Map_Iterator target = snapshot.begin ();
  while (target != snapshot.end ())
    {
      (*target).int_id_.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      ++target;
    }
}

// Send the current supplier QoS to every registered observer.
//
// The observer map is copied into a local map while <lock_> is held;
// the observers are then called from that copy once the guard has
// gone out of scope.  Nothing is done when no observer is registered
// or when the channel state is SHUTDOWN.
void
ACE_EventChannel::update_supplier_gwys (ACE_ENV_SINGLE_ARG_DECL)
{
  Observer_Map snapshot;
  {
    ACE_GUARD_THROW_EX (ACE_ES_MUTEX, guard, this->lock_,
        RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
    ACE_CHECK;

    // Nobody to notify.
    if (this->observers_.current_size () == 0)
      return;

    // The channel is going away.
    if (this->state_ == ACE_EventChannel::SHUTDOWN)
      return;

    snapshot.open (this->observers_.current_size ());

    Observer_Map_Iterator src = this->observers_.begin ();
    while (src != this->observers_.end ())
      {
        snapshot.bind ((*src).ext_id_, (*src).int_id_);
        ++src;
      }
  }

  // ACE_DEBUG ((LM_DEBUG,
  //            "EC (%t) Event_Channel::update_supplier_gwys\n"));

  RtecEventChannelAdmin::SupplierQOS s_qos;
  this->supplier_module_->fill_qos (s_qos);

  Observer_Map_Iterator target = snapshot.begin ();
  while (target != snapshot.end ())
    {
      (*target).int_id_.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      ++target;
    }
}

// Register <obs> with the channel and return the handle assigned to it.
//
// With <lock_> held, the next value of <handle_generator_> becomes the
// handle, a duplicated reference to <obs> is bound in <observers_>
// under that handle, and <obs> is then sent the current consumer QoS
// followed by the current supplier QoS.
//
// Raises SYNCHRONIZATION_ERROR when the guard cannot be acquired and
// CANT_APPEND_OBSERVER when the bind reports failure (-1).  Every
// *_RETURN macro below is given 0 as its return value.
RtecEventChannelAdmin::Observer_Handle
ACE_EventChannel::append_observer (RtecEventChannelAdmin::Observer_ptr obs
                                   ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((
        CORBA::SystemException,
        RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
        RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
{
  ACE_GUARD_THROW_EX (ACE_ES_MUTEX, guard, this->lock_,
      RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
  ACE_CHECK_RETURN (0);

  // Allocate the handle for this observer.
  ++this->handle_generator_;
  Observer_Entry new_entry (this->handle_generator_,
                            RtecEventChannelAdmin::Observer::_duplicate (obs));

  if (this->observers_.bind (new_entry.handle, new_entry) == -1)
    {
      ACE_THROW_RETURN (
          RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
          0);
    }

  // Bring the new observer up to date: consumer side first...
  RtecEventChannelAdmin::ConsumerQOS consumer_qos;
  this->consumer_module_->fill_qos (consumer_qos);
  obs->update_consumer (consumer_qos ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  // ...then the supplier side.
  RtecEventChannelAdmin::SupplierQOS supplier_qos;
  this->supplier_module_->fill_qos (supplier_qos);
  obs->update_supplier (supplier_qos ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  return new_entry.handle;
}

// Unbind the observer registered under handle <h>.
//
// Raises SYNCHRONIZATION_ERROR when the guard on <lock_> cannot be
// acquired and CANT_REMOVE_OBSERVER when the unbind reports failure
// (-1).
void
ACE_EventChannel::remove_observer (RtecEventChannelAdmin::Observer_Handle h
                                   ACE_ENV_ARG_DECL)
    ACE_THROW_SPEC ((
        CORBA::SystemException,
        RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
        RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
{
  ACE_GUARD_THROW_EX (ACE_ES_MUTEX, guard, this->lock_,
      RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
  ACE_CHECK;

  // Anything other than -1 means the entry was removed.
  if (this->observers_.unbind (h) != -1)
    return;

  ACE_THROW (
      RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
}

void
ACE_EventChannel::cleanup_observers (void)
{
  ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);

  // @@ TODO report back any errors here...
  this->observers_.close ();
}

// Schedule a timer on the timer module for the command <act>.
//
// When <rt_info> is non-zero, a ONE_WAY_CALL dependency from <rt_info>
// on the timer module's rt_info for <preemption_priority> is first
// registered with the scheduler.  A failure there is only printed; the
// timer is scheduled regardless.
//
// <delta> and <interval> are converted to ACE_Time_Value before being
// handed to the timer module.  The return value is whatever the timer
// module's schedule_timer() returns.
int
ACE_EventChannel::schedule_timer (RtecScheduler::handle_t rt_info,
                                  const ACE_Command_Base *act,
                                  RtecScheduler::Preemption_Priority_t preemption_priority,
                                  const RtecScheduler::Time &delta,
                                  const RtecScheduler::Time &interval)
{
  if (rt_info != 0)
    {
      // Add the timer to the task's dependency list.
      RtecScheduler::handle_t timer_rtinfo =
        this->timer_module ()->rt_info (preemption_priority);

      ACE_DECLARE_NEW_CORBA_ENV;
      ACE_TRY
        {
          // The active branch uses this channel's own scheduler
          // reference; the disabled branch used the scheduler factory.
#if 1
          this->scheduler_->add_dependency (rt_info,
                                            timer_rtinfo,
                                            1,
                                            RtecBase::ONE_WAY_CALL
                                             ACE_ENV_ARG_PARAMETER);
#else
          ACE_Scheduler_Factory::server()->add_dependency
            (rt_info,
             timer_rtinfo,
             1,
             RtecBase::ONE_WAY_CALL
              ACE_ENV_ARG_PARAMETER);
#endif
          ACE_TRY_CHECK;
        }
      ACE_CATCHANY
        {
          // Best effort: report the failure and carry on with
          // scheduling the timer below.
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "add dependency failed");
        }
      ACE_ENDTRY;
    }

  // @@ We're losing resolution here.
  ACE_Time_Value tv_delta;
  ORBSVCS_Time::TimeT_to_Time_Value (tv_delta, delta);

  ACE_Time_Value tv_interval;
  ORBSVCS_Time::TimeT_to_Time_Value (tv_interval, interval);

  // The timer module takes a non-const command pointer, so the
  // constness of <act> is cast away here.
  return this->timer_module ()->schedule_timer (preemption_priority,
                                                ACE_const_cast(ACE_Command_Base*,act),
                                                tv_delta,
                                                tv_interval);
}

// Default constructor: the handle starts at 0; <observer> is left to
// its own default construction.
ACE_EventChannel::Observer_Entry::Observer_Entry (void)
  : handle (0)
{
  // Nothing else to do.
}

// Build an entry that associates handle <h> with observer reference
// <o>.
// NOTE(review): presumably <observer> takes over the reference passed
// in <o> -- confirm against the member's declared type.
ACE_EventChannel::Observer_Entry::Observer_Entry (RtecEventChannelAdmin::Observer_Handle h,
                                                  RtecEventChannelAdmin::Observer_ptr o)
  : handle (h)
  , observer (o)
{
  // Nothing else to do.
}

// Out-of-line destructor; there is nothing to release explicitly here.
ACE_ES_Disjunction_Group::~ACE_ES_Disjunction_Group (void)
{
  // Intentionally empty.
}

// Out-of-line destructor; there is nothing to release explicitly here.
ACE_ES_Conjunction_Group::~ACE_ES_Conjunction_Group (void)
{
  // Intentionally empty.
}

// Destructor: deletes the object stored as the value of every entry in
// <type_subscribers_>.
ACE_ES_Subscription_Info::~ACE_ES_Subscription_Info (void)
{
  Subscriber_Map_Iterator cursor (type_subscribers_);
  Subscriber_Map_Entry *entry = 0;

  // Walk the map and free each type collection in turn.
  while (cursor.next (entry) != 0)
    {
      delete entry->int_id_;
      cursor.advance ();
    }
}

/*
void
ACE_ES_Subscription_Info::Type_Subscribers::operator=
(const ACE_ES_Subscription_Info::Type_Subscribers &rhs)
{
  ACE_ES_Subscription_Info::Subscriber_Set_Iterator iter (rhs.subscribers_);

  for (ACE_ES_Consumer_Rep **consumer = 0;
       iter.next (consumer) != 0;
       iter.advance ())
    {
      if (subscribers_.insert (consumer) != 0)
        ACE_ERROR ((LM_ERROR, "%p insert failed.\n",
                    "ACE_ES_Subscription_Info::Type_Subscribers::operator="));
    }

  // Pointer copy.
  dependency_info_ = rhs.dependency_info_;
}
*/

// Remove <consumer> from the consumer set in <type_map> that
// corresponds to <type>.
//
// Returns 0 on success.  Returns -1 when <type_map> has no entry for
// <type> (logged at debug level) or when <consumer> cannot be removed
// from that entry's consumer set (logged as an error).  The entry
// itself is never removed from <type_map>, even if its set becomes
// empty; see the disabled block below for the reason.
int
ACE_ES_Subscription_Info::remove (Subscriber_Map &type_map,
                                  ACE_ES_Consumer_Rep *consumer,
                                  RtecEventComm::EventType type)
{
  // Initialized so the pointer never holds an indeterminate value,
  // whatever find() does with it on failure.
  Type_Subscribers *subscribers = 0;

  // Find the type set within the type collection.
  if (type_map.find (type, subscribers) == -1)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "EC (%t) Info::remove - not found %d\n", type));
      // type_map does not contain the type.
      return -1;
    }

  // Remove the consumer from the type set.  The message names the
  // class this method actually belongs to.
  if (subscribers->consumers_.remove (consumer) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p remove failed.\n",
                       "ACE_ES_Subscription_Info::remove"), -1);
  // @@ Should probably remove the supplier from the consumers caller
  // list.

  // @@ Should we release here? consumer->_release ();

#if 0
  // If the set is empty, remove it from the type collection.
  // NOT!!!! In some cases the map is initialized to the types that a
  // certain supplier export; removing an entry from the map renders
  // that supplier unable to send that event type.
  // Before changing this ask me (coryan).
  if (subscribers->consumers_.size () == 0)
    {
      Type_Subscribers *removed_subscribers;
      if (type_map.unbind (type, removed_subscribers) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, "%p unbind failed.\n",
                           "ACE_ES_Subscription_Info::remove"), -1);

      // Sanity check.
      if (removed_subscribers != subscribers)
        ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Subscription_Info::remove: "
                           "removed wrong set!\n"), -1);

      // Free up the set removed.
      delete removed_subscribers;
    }
#endif /* 0 */

  return 0;
}


// Remove <consumer> from the subscriber set that <source_subscribers>
// holds for source <sid>.
//
// Returns 0 on success.  Returns -1 when <source_subscribers> has no
// entry for <sid>, or when <consumer> cannot be removed from that
// entry's set (logged as an error).  The entry itself is never removed
// from <source_subscribers>, even if its set becomes empty; see the
// disabled block below for the reason.
int
ACE_ES_Subscription_Info::remove (SourceID_Map &source_subscribers,
                                  ACE_ES_Consumer_Rep *consumer,
                                  RtecEventComm::EventSourceID sid)
{
  // Initialized so the pointer never holds an indeterminate value,
  // whatever find() does with it on failure.
  Subscriber_Set *subscribers = 0;

  // Find the subscribers of <sid>.
  if (source_subscribers.find (sid, subscribers) == -1)
    // does not contain the <sid>.
    return -1;

  // Remove the consumer from the subscriber set.  The message names
  // the class this method actually belongs to.
  if (subscribers->remove (consumer) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p remove failed.\n",
                       "ACE_ES_Subscription_Info::remove"), -1);

  // @@ Should we release here? consumer->_release ();

  // @@ Should probably remove the supplier from the consumers caller
  // list.

#if 0
  // If the set is empty, remove it from the type collection.
  // NOT!!!! In some cases the map is initialized to the types that a
  // certain supplier export; removing an entry from the map renders
  // that supplier unable to send that event type.
  // Before changing this ask me (coryan).
  if (subscribers->size () == 0)
    {
      Subscriber_Set *removed_subscribers;
      if (source_subscribers.unbind (sid, removed_subscribers) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, "%p unbind failed.\n",
                           "ACE_ES_Subscription_Info::remove"), -1);

      // Sanity check.
      if (removed_subscribers != subscribers)
        ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Subscription_Info::remove: "
                           "removed wrong set!\n"), -1);

      // Free up the set removed.
      delete removed_subscribers;
    }
#endif /* 0 */

  return 0;
}

// Insert every element of <src> into <dest>.  An insert that reports
// -1 is logged as an error and the copy continues with the next
// element.
void
ACE_ES_Subscription_Info::append_subscribers (Subscriber_Set &dest,
                                              Subscriber_Set &src)
{
  Subscriber_Set_Iterator cursor (src);
  ACE_ES_Consumer_Rep **current = 0;

  // Visit each source proxy and add it to the destination set.
  while (cursor.next (current) != 0)
    {
      if (dest.insert (*current) == -1)
        {
          ACE_ERROR ((LM_ERROR, "%p: insert failed.\n", "append_subscribers"));
        }

      cursor.advance ();
    }
}

int
ACE_ES_Subscription_Info::insert_or_allocate (SourceID_Map &sid_map,
                                              ACE_ES_Consumer_Rep *consumer,
                                              RtecEventComm::EventSourceID sid)
{
  Subscriber_Set *subscribers;

  if (sid_map.find (sid, subscribers) == -1)
    {
      // If the correct type set does not exist, make one with a null
      // dependency info (since there is no supplier of this event).
      subscribers = new Subscriber_Set;

      if (sid_map.bind (sid, subscribers) == -1)
        {
          ACE_ERROR ((LM_ERROR, "%p bind failed.\n",
                      "ACE_ES_Subscription_Info::insert_or_allocate"));
          delete subscribers;
          return -1;
        }
    }

  // 0 and 1 are success for insert.
  if (subscribers->insert (consumer) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p insert failed.\n",
                       "ACE_ES_Subscription_Info::insert_or_allocate"),

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -