📄 event_channel.cpp
字号:
this->remove_observer (gw->observer_handle () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
gw->observer_handle (0);
}
void
ACE_EventChannel::update_consumer_gwys (ACE_ENV_SINGLE_ARG_DECL)
{
Observer_Map observers;
{
ACE_GUARD_THROW_EX (
ACE_ES_MUTEX, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_CHECK;
if (this->observers_.current_size () == 0
|| this->state_ == ACE_EventChannel::SHUTDOWN)
return;
observers.open (this->observers_.current_size ());
for (Observer_Map_Iterator i = this->observers_.begin ();
i != this->observers_.end ();
++i)
{
observers.bind ((*i).ext_id_, (*i).int_id_);
}
}
// ACE_DEBUG ((LM_DEBUG,
// "EC (%t) Event_Channel::update_consumer_gwys\n"));
RtecEventChannelAdmin::ConsumerQOS c_qos;
this->consumer_module_->fill_qos (c_qos);
for (Observer_Map_Iterator i = observers.begin ();
i != observers.end ();
++i)
{
(*i).int_id_.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
void
ACE_EventChannel::update_supplier_gwys (ACE_ENV_SINGLE_ARG_DECL)
{
Observer_Map observers;
{
ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_CHECK;
if (this->observers_.current_size () == 0
|| this->state_ == ACE_EventChannel::SHUTDOWN)
return;
observers.open (this->observers_.current_size ());
for (Observer_Map_Iterator i = this->observers_.begin ();
i != this->observers_.end ();
++i)
{
observers.bind ((*i).ext_id_, (*i).int_id_);
}
}
// ACE_DEBUG ((LM_DEBUG,
// "EC (%t) Event_Channel::update_supplier_gwys\n"));
RtecEventChannelAdmin::SupplierQOS s_qos;
this->supplier_module_->fill_qos (s_qos);
for (Observer_Map_Iterator i = observers.begin ();
i != observers.end ();
++i)
{
(*i).int_id_.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
RtecEventChannelAdmin::Observer_Handle
ACE_EventChannel::append_observer (RtecEventChannelAdmin::Observer_ptr obs
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER))
{
ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_CHECK_RETURN (0);
this->handle_generator_++;
Observer_Entry entry (this->handle_generator_,
RtecEventChannelAdmin::Observer::_duplicate (obs));
if (this->observers_.bind (entry.handle, entry) == -1)
ACE_THROW_RETURN (
RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
0);
RtecEventChannelAdmin::ConsumerQOS c_qos;
this->consumer_module_->fill_qos (c_qos);
obs->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
RtecEventChannelAdmin::SupplierQOS s_qos;
this->supplier_module_->fill_qos (s_qos);
obs->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
return entry.handle;
}
void
ACE_EventChannel::remove_observer (RtecEventChannelAdmin::Observer_Handle h
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR,
RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER))
{
ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
ACE_CHECK;
if (this->observers_.unbind (h) == -1)
ACE_THROW (
RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER());
}
void
ACE_EventChannel::cleanup_observers (void)
{
ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_);
// @@ TODO report back any errors here...
this->observers_.close ();
}
int
ACE_EventChannel::schedule_timer (RtecScheduler::handle_t rt_info,
const ACE_Command_Base *act,
RtecScheduler::Preemption_Priority_t preemption_priority,
const RtecScheduler::Time &delta,
const RtecScheduler::Time &interval)
{
if (rt_info != 0)
{
// Add the timer to the task's dependency list.
RtecScheduler::handle_t timer_rtinfo =
this->timer_module ()->rt_info (preemption_priority);
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
#if 1
this->scheduler_->add_dependency (rt_info,
timer_rtinfo,
1,
RtecBase::ONE_WAY_CALL
ACE_ENV_ARG_PARAMETER);
#else
ACE_Scheduler_Factory::server()->add_dependency
(rt_info,
timer_rtinfo,
1,
RtecBase::ONE_WAY_CALL
ACE_ENV_ARG_PARAMETER);
#endif
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"add dependency failed");
}
ACE_ENDTRY;
}
// @@ We're losing resolution here.
ACE_Time_Value tv_delta;
ORBSVCS_Time::TimeT_to_Time_Value (tv_delta, delta);
ACE_Time_Value tv_interval;
ORBSVCS_Time::TimeT_to_Time_Value (tv_interval, interval);
return this->timer_module ()->schedule_timer (preemption_priority,
ACE_const_cast(ACE_Command_Base*,act),
tv_delta,
tv_interval);
}
ACE_EventChannel::Observer_Entry::Observer_Entry (void)
: handle (0)
{
}
ACE_EventChannel::Observer_Entry::Observer_Entry (RtecEventChannelAdmin::Observer_Handle h,
RtecEventChannelAdmin::Observer_ptr o)
: handle (h),
observer (o)
{
}
ACE_ES_Disjunction_Group::~ACE_ES_Disjunction_Group (void)
{
}
ACE_ES_Conjunction_Group::~ACE_ES_Conjunction_Group (void)
{
}
ACE_ES_Subscription_Info::~ACE_ES_Subscription_Info (void)
{
Subscriber_Map_Iterator iter (type_subscribers_);
// Delete all type collections.
for (Subscriber_Map_Entry *temp = 0;
iter.next (temp) != 0;
iter.advance ())
{
delete temp->int_id_;
}
}
/*
void
ACE_ES_Subscription_Info::Type_Subscribers::operator=
(const ACE_ES_Subscription_Info::Type_Subscribers &rhs)
{
ACE_ES_Subscription_Info::Subscriber_Set_Iterator iter (rhs.subscribers_);
for (ACE_ES_Consumer_Rep **consumer = 0;
iter.next (consumer) != 0;
iter.advance ())
{
if (subscribers_.insert (consumer) != 0)
ACE_ERROR ((LM_ERROR, "%p insert failed.\n",
"ACE_ES_Subscription_Info::Type_Subscribers::operator="));
}
// Pointer copy.
dependency_info_ = rhs.dependency_info_;
}
*/
// Remove <consumer> from the consumer set in <type_map> set
// corresponding to <type>.
int
ACE_ES_Subscription_Info::remove (Subscriber_Map &type_map,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type)
{
Type_Subscribers *subscribers;
// Find the type set within the type collection.
if (type_map.find (type, subscribers) == -1)
{
ACE_DEBUG ((LM_DEBUG,
"EC (%t) Info::remove - not found %d\n", type));
// type_map does not contain the type.
return -1;
}
// Remove the consumer from the type set.
if (subscribers->consumers_.remove (consumer) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p remove failed.\n",
"ACE_ES_Subscriber_Info::remove"), -1);
// @@ Should probably remove the supplier from the consumers caller
// list.
// @@ Should we release here? consumer->_release ();
#if 0
// If the set is empty, remove it from the type collection.
// NOT!!!! In some cases the map is initialized to the types that a
// certain supplier export; removing an entry from the map renders
// that supplier unable to send that event type.
// Before changing this ask me (coryan).
if (subscribers->consumers_.size () == 0)
{
Type_Subscribers *removed_subscribers;
if (type_map.unbind (type, removed_subscribers) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p unbind failed.\n",
"ACE_ES_Subscriber_Info::remove"), -1);
// Sanity check.
if (removed_subscribers != subscribers)
ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Subscriber_Info::remove: "
"removed wrong set!\n"), -1);
// Free up the set removed.
delete removed_subscribers;
}
#endif /* 0 */
return 0;
}
int
ACE_ES_Subscription_Info::remove (SourceID_Map &source_subscribers,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID sid)
{
Subscriber_Set *subscribers;
// Find the subscribers of <sid>.
if (source_subscribers.find (sid, subscribers) == -1)
// does not contain the <sid>.
return -1;
// Remove the consumer from the subscriber set.
if (subscribers->remove (consumer) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p remove failed.\n",
"ACE_ES_Subscriber_Info::remove"), -1);
// @@ Should we release here? consumer->_release ();
// @@ Should probably remove the supplier from the consumers caller
// list.
#if 0
// If the set is empty, remove it from the type collection.
// NOT!!!! In some cases the map is initialized to the types that a
// certain supplier export; removing an entry from the map renders
// that supplier unable to send that event type.
// Before changing this ask me (coryan).
if (subscribers->size () == 0)
{
Subscriber_Set *removed_subscribers;
if (source_subscribers.unbind (sid, removed_subscribers) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p unbind failed.\n",
"ACE_ES_Subscriber_Info::remove"), -1);
// Sanity check.
if (removed_subscribers != subscribers)
ACE_ERROR_RETURN ((LM_ERROR, "ACE_ES_Subscriber_Info::remove: "
"removed wrong set!\n"), -1);
// Free up the set removed.
delete removed_subscribers;
}
#endif /* 0 */
return 0;
}
void
ACE_ES_Subscription_Info::append_subscribers (Subscriber_Set &dest,
Subscriber_Set &src)
{
Subscriber_Set_Iterator src_iter (src);
// Iterate through the source set. Add each source proxy to the
// destination set.
for (ACE_ES_Consumer_Rep **proxy = 0;
src_iter.next (proxy) != 0;
src_iter.advance ())
{
if (dest.insert (*proxy) == -1)
ACE_ERROR ((LM_ERROR, "%p: insert failed.\n", "append_subscribers"));
}
}
int
ACE_ES_Subscription_Info::insert_or_allocate (SourceID_Map &sid_map,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID sid)
{
Subscriber_Set *subscribers;
if (sid_map.find (sid, subscribers) == -1)
{
// If the correct type set does not exist, make one with a null
// dependency info (since there is no supplier of this event).
subscribers = new Subscriber_Set;
if (sid_map.bind (sid, subscribers) == -1)
{
ACE_ERROR ((LM_ERROR, "%p bind failed.\n",
"ACE_ES_Subscription_Info::insert_or_allocate"));
delete subscribers;
return -1;
}
}
// 0 and 1 are success for insert.
if (subscribers->insert (consumer) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p insert failed.\n",
"ACE_ES_Subscription_Info::insert_or_allocate"),
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -