📄 event_channel.cpp
字号:
if (c->qos ().is_gateway)
continue;
CORBA::ULong count = c->qos ().dependencies.length ();
for (CORBA::ULong j = 0; j < count; ++j)
{
RtecEventComm::Event& event =
c->qos ().dependencies[j].event;
RtecEventComm::EventType type = event.header.type;
// Only type and source dependencies are relevant, notice
// that we turn conjunctions into disjunctions because
// correlations could be satisfied by events coming from
// several remote ECs.
// Notice that <0> is *not* skipped, otherwise source-only
// filtering does not work.
if (1 <= type && type <= ACE_ES_EVENT_UNDEFINED)
continue;
// If the dependency is already there we don't add it.
CORBA::ULong k;
for (k = 0; k < cc; ++k)
{
if (dep[k].event.header.type == event.header.type
&& dep[k].event.header.source == event.header.source)
break;
}
if (k == cc)
{
dep[cc].event.header.type = event.header.type;
dep[cc].event.header.source = event.header.source;
dep[cc].event.header.creation_time = ORBSVCS_Time::zero ();
// The RT_Info is filled up later.
dep[cc].rt_info = 0;
cc++;
}
}
}
dep.length (cc);
// ACE_DEBUG ((LM_DEBUG, "EC (%t) Consumer::fill_qos - %d\n", cc));
}
// Construct a correlation module bound to <channel>.  The upstream
// dispatching module and the subscription module start out null; they
// are supplied later through open ().
ACE_ES_Correlation_Module::ACE_ES_Correlation_Module (ACE_EventChannel *channel)
  : channel_ (channel)
  , up_ (0)
  , subscription_module_ (0)
{
  // Nothing else to do: all state is set in the initializer list.
}
// Record the two neighbouring modules: <up> is the dispatching module
// that push () and shutdown () forward to, and <sm> is the subscription
// module that subscribe () / unsubscribe () delegate to.
void
ACE_ES_Correlation_Module::open (ACE_ES_Dispatching_Module *up,
                                 ACE_ES_Subscription_Module *sm)
{
  this->subscription_module_ = sm;
  this->up_ = up;
}
// Called when <consumer> connects.  Initializes the consumer's
// correlation filter and raises CORRELATION_ERROR if that
// initialization reports failure (-1).
void
ACE_ES_Correlation_Module::connected (ACE_Push_Consumer_Proxy *consumer
                                      ACE_ENV_ARG_DECL)
{
  // Let the consumer's correlation object set itself up against this
  // module.
  const int status = consumer->correlation ().connected (consumer, this);

  if (status == -1)
    {
      ACE_THROW (RtecEventChannelAdmin::EventChannel::CORRELATION_ERROR());
    }
}
// Called when <consumer> is disconnecting.  Notifies the consumer's
// correlation object; a failure (-1) is only logged, it is not
// propagated to the caller.
void
ACE_ES_Correlation_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer
                                          ACE_ENV_ARG_DECL_NOT_USED)
{
  if (consumer->correlation ().disconnecting () == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "ACE_ES_Correlation_Module::disconnecting failed.\n"));
    }
}
// Forward the subscription of <consumer> to the subscription module
// and hand its result straight back to the caller.
int
ACE_ES_Correlation_Module::subscribe (ACE_ES_Consumer_Rep *consumer)
{
  return this->subscription_module_->subscribe (consumer);
}
// Forward the removal of <cr> to the subscription module and hand its
// result straight back to the caller.
int
ACE_ES_Correlation_Module::unsubscribe (ACE_ES_Consumer_Rep *cr)
{
  return this->subscription_module_->unsubscribe (cr);
}
// Deliver <event> to the correlation object of <consumer>.  When the
// correlation produces a dispatch request it is forwarded upstream
// immediately; a null request means the event was queued for later.
void
ACE_ES_Correlation_Module::push (ACE_ES_Consumer_Rep *consumer,
                                 const TAO_EC_Event& event
                                 ACE_ENV_ARG_DECL)
{
  // ACE_DEBUG ((LM_DEBUG, "EC (%t) Correlation_Module::push\n"));
  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_ENTER_ACE_ES_CORRELATION_MODULE_PUSH);

  ACE_ES_Dispatch_Request *pending =
    consumer->correlation ()->push (consumer, event);

  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSHED_TO_CORRELATION_MODULE);

  // Only a non-null request has to be dispatched right now.
  if (pending != 0)
    {
      this->up_->push (pending ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE_DISPATCH_MODULE_ENQUEUING);
}
// Callers must check consumer->qos ().use_timeout () before calling
// this.  This method is supposed to properly schedule a timer with
// respect to the consumer's priority AND the correlation that should
// receive the timeout event.
//
// Returns 0 on success and -1 (after logging) when the channel fails
// to schedule the timer.  The timer id is stored on <consumer> in
// either case.
int
ACE_ES_Correlation_Module::schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  // Both the initial delay and the repeat interval are taken from the
  // creation_time field of the dependency's event header.
  RtecEventComm::Time &delay =
    consumer->dependency ()->event.header.creation_time;
  RtecEventComm::Time &interval =
    consumer->dependency ()->event.header.creation_time;

  // Remember the preemption priority so that the matching timer can be
  // cancelled later.  The priority values may change during the process
  // lifetime (e.g., after the scheduler has been run).
  consumer->preemption_priority (::IntervalToPriority (interval));

  // ACE_DEBUG ((LM_DEBUG,
  // "EC (%t) Adding timer at preemption %d, rate = (%d,%d)\n",
  // consumer->preemption_priority (),
  // interval.low, interval.high));

  // Register the timer with the channel.
  int handle =
    this->channel_->schedule_timer (consumer->dependency ()->rt_info,
                                    consumer,
                                    consumer->preemption_priority (),
                                    delay,
                                    interval);

  // Keep the id (even a failing one) so a later cancel can refer to it.
  consumer->timer_id (handle);

  if (handle == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "%p schedule timer failed.\n",
                         "ACE_ES_Correlation_Module::schedule_timeout"), -1);
    }

  return 0;
}
// Callers must check consumer->qos ().timeout_ before calling this.
//
// Cancels the timer previously registered for <consumer>, using the
// preemption priority and timer id stored on it.  Always returns 0;
// the result of cancel_timer () is not inspected here.
// NOTE(review): callers such as reschedule_timeout () test for -1,
// which this function never returns -- confirm whether the result of
// cancel_timer () should be propagated.
int
ACE_ES_Correlation_Module::cancel_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  // Cancel the timer from the Priority Timer.  <act> is an
  // out-parameter; start it at null so that the assertion below never
  // reads an indeterminate pointer if cancel_timer () leaves it
  // untouched.
  ACE_Command_Base *act = 0;
  this->channel_->cancel_timer (consumer->preemption_priority (),
                                consumer->timer_id (),
                                act);

  // The ACT handed back must be the consumer that was registered.
  ACE_ASSERT (consumer == act);

  // Free up the Timer ACT.
  // delete act;
  return 0;
}
// Cancel the timer currently associated with <consumer> and register
// a fresh one.  Unlike schedule_timeout (), no RT_Info is passed to
// the channel when re-registering.
//
// Returns 0 on success and -1 (after logging) when either the
// cancellation or the new registration reports failure.  The new
// timer id is stored on <consumer> even when registration fails.
int
ACE_ES_Correlation_Module::reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer)
{
  // The message names this function; it previously named an unrelated
  // one, which made the log misleading.
  if (this->cancel_timeout (consumer) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                       "ACE_ES_Correlation_Module::reschedule_timeout"), -1);

  // Both the initial delay and the repeat interval are taken from the
  // creation_time field of the dependency's event header.
  RtecEventComm::Time &interval =
    consumer->dependency ()->event.header.creation_time;
  RtecEventComm::Time &delay =
    consumer->dependency ()->event.header.creation_time;

  // Store the preemption priority so we can cancel the correct timer.
  // The priority values may change during the process lifetime (e.g.,
  // after the scheduler has been run).
  consumer->preemption_priority (::IntervalToPriority (interval));

  // Register the timer.
  int id =
    this->channel_->schedule_timer (0, // Do not pass an RT_Info.
                                    consumer,
                                    consumer->preemption_priority (),
                                    delay, interval);

  // Store the timer id for canceling.
  consumer->timer_id (id);

  if (id == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p schedule timer failed.\n",
                       "ACE_ES_Correlation_Module::reschedule_timeout"), -1);

  return 0;
}
void
ACE_ES_Correlation_Module::shutdown (void)
{
// Perhaps this should call disconnecting on all the consumers?
// We'll opt to just forward this message for now.
up_->shutdown ();
}
// Default constructor.  Every pointer and counter starts at zero and
// the object is marked as not connected; qos_ and lock_ are
// default-constructed.
ACE_ES_Consumer_Correlation::ACE_ES_Consumer_Correlation (void)
  : correlation_module_ (0)
  , type_id_index_ (0)
  , channel_ (0)
  , qos_ ()
  , pending_events_ (0)
  , lock_ ()
  , consumer_ (0)
  , pending_flags_ (0)
  , consumer_reps_ (0)
  , n_consumer_reps_ (0)
  , timer_reps_ (0)
  , n_timer_reps_ (0)
  , conjunction_groups_ (0)
  , n_conjunction_groups_ (0)
  , disjunction_groups_ (0)
  , n_disjunction_groups_ (0)
  , connected_ (0)
{
  // Nothing else to do: all state is set in the initializer list.
}
// Destructor.  Releases the timer reps, unsubscribes and releases
// every non-null consumer rep, and then frees the remaining arrays.
ACE_ES_Consumer_Correlation::~ACE_ES_Consumer_Correlation (void)
{
  delete [] this->timer_reps_;

  for (int slot = 0; slot < this->n_consumer_reps_; ++slot)
    {
      ACE_ES_Consumer_Rep *rep = this->consumer_reps_[slot];

      // Empty slots have nothing to unsubscribe or release.
      if (rep == 0)
        continue;

      this->correlation_module_->unsubscribe (rep);
      rep->_release ();
    }

  delete [] this->consumer_reps_;
  delete [] this->conjunction_groups_;
  delete [] this->disjunction_groups_;
  delete [] this->pending_events_;
}
// Supplier-side disconnect notification: simply clears the
// connected flag.
void
ACE_ES_Consumer_Correlation::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  connected_ = 0;
}
// Allocate the arrays this correlation needs, sized from the counts
// reported by <iter>: conjunction groups, disjunction groups, consumer
// reps, timer reps and the pending-event arrays.  Each newly created
// group is pointed at correlation_module_.
//
// Returns 0 on success and -1 when an allocation reports failure.
int
ACE_ES_Consumer_Correlation::allocate_correlation_resources (ACE_ES_Dependency_Iterator &iter)
{
  n_conjunction_groups_ = iter.n_conjunctions ();
  if (n_conjunction_groups_ > 0)
    {
      conjunction_groups_ = new ACE_ES_Conjunction_Group[n_conjunction_groups_];
      if (conjunction_groups_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
      for (int n = 0; n < n_conjunction_groups_; n++)
        conjunction_groups_[n].set_correlation_module (correlation_module_);
    }

  n_disjunction_groups_ = iter.n_disjunctions ();
  if (n_disjunction_groups_ > 0)
    {
      disjunction_groups_ = new ACE_ES_Disjunction_Group[n_disjunction_groups_];
      if (disjunction_groups_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
      for (int n = 0; n < n_disjunction_groups_; n++)
        disjunction_groups_[n].set_correlation_module (correlation_module_);
    }

  n_consumer_reps_ = iter.n_events ();
  if (n_consumer_reps_ > 0)
    {
      // This allocates more than is needed if there are repeats:
      // (A+B)|(B+C).  We allocate these individually so that they can
      // be deleted individually.
      typedef ACE_ES_Consumer_Rep *reparray;
      consumer_reps_ = new reparray[n_consumer_reps_];
      if (consumer_reps_ == 0)
        {
          // Without the array there are no slots to walk; reset the
          // count so the destructor's loop does not index a null array.
          n_consumer_reps_ = 0;
          ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                             "ACE_ES_Consumer_Correlation::"
                             "allocate_correlation_resources"), -1);
        }

      // Null every slot first.  The destructor visits all
      // n_consumer_reps_ slots and skips null ones, so a failure part
      // way through the loop below must not leave indeterminate
      // pointers behind.
      for (int cr = 0; cr < n_consumer_reps_; cr++)
        consumer_reps_[cr] = 0;

      for (int cr = 0; cr < n_consumer_reps_; cr++)
        {
          consumer_reps_[cr] = new ACE_ES_Consumer_Rep;
          if (consumer_reps_[cr] == 0)
            ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                               "ACE_ES_Consumer_Correlation::"
                               "allocate_correlation_resources"), -1);
        }
    }

  n_timer_reps_ = iter.n_timeouts ();
  if (n_timer_reps_ > 0)
    {
      timer_reps_ = new ACE_ES_Consumer_Rep_Timeout[n_timer_reps_];
      if (timer_reps_ == 0)
        ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
                           "ACE_ES_Consumer_Correlation::"
                           "allocate_correlation_resources"), -1);
    }

  // This allocates more than is needed.
  // @@ throw an exception.
  ACE_NEW_RETURN (this->pending_events_,
                  TAO_EC_Event_Array[n_consumer_reps_ + n_timer_reps_],
                  -1);
  return 0;
}
// We don't need synchronization until after we've been connected and
// subscribed to events.
int
ACE_ES_Consumer_Correlation::connected (ACE_Push_Consumer_Proxy *consumer,
ACE_ES_Correlation_Module *correlation_module)
{
correlation_module_ = correlation_module;
consumer_ = consumer;
// for (CORBA_Types::ULong index=0; index < consumer->qos ().dependencies_.length (); index++)
// consumer->qos ().dependencies_[index].event.dump ();
ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies);
iter.parse ();
if (this->allocate_correlation_resources (iter) == -1)
return -1;
int cgroup_index = -1;
int dgroup_index = -1;
int crep_index = 0;
int trep_index = 0;
RtecEventComm::EventType group_type = 0;
while (iter.advance_dependency () == 0)
{
// Keep track of how many conjunction and disjunction groups are
// registered. Update the index pointers so that the helper
// functions can update the appropriate group objects.
switch ((*iter).event.header.type)
{
case ACE_ES_CONJUNCTION_DESIGNATOR:
cgroup_index++;
ACE_ASSERT (cgroup_index < n_conjunction_groups_);
group_type = ACE_ES_CONJUNCTION_DESIGNATOR;
continue;
case ACE_ES_DISJUNCTION_DESIGNATOR:
dgroup_index++;
ACE_ASSERT (dgroup_index < n_disjunction_groups_);
group_type = ACE_ES_DISJUNCTION_DESIGNATOR;
continue;
case ACE_ES_GLOBAL_DESIGNATOR:
group_type = ACE_ES_GLOBAL_DESIGNATOR;
continue;
// These Delegate to the appropriate registration method.
#if 0
// @@ TODO rt_info_ is a handle_t now, does checking against
// 0 still make sense?
// Check for a null rt_info.
if ((*iter).rt_info_ == 0)
{
ACE_ERROR ((LM_ERROR, "Found a ConsumerQOS::dependencies[].rt_info_ == 0.\n"));
continue;
}
#endif /* 0 */
case ACE_ES_EVENT_TIMEOUT:
// For backwards compatibility.
case ACE_ES_EVENT_DEADLINE_TI
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -