📄 event_channel.h
字号:
// void operator= (const Subscriber_Set &);
// Copy.
/// All the consumers that have registered for this event.
Subscriber_Set consumers_;
/// Description of the method that generates this event.
RtecScheduler::Dependency_Info *dependency_info_;
};
// = Typedefs for type-based subscriptions.  The map is keyed by event
// type (EXT) and stores pointers to Type_Subscribers records (INT).
// SYNCH is the lock type handed to the ACE map templates; it is shared
// with the source-based typedefs further down.
// NOTE(review): ACE_Null_Mutex presumably means the maps do no locking
// of their own and rely on lock_ below -- confirm.
typedef RtecEventComm::EventType EXT;
typedef Type_Subscribers *INT;
typedef ACE_Null_Mutex SYNCH;
typedef ACE_Map_Manager<EXT, INT, SYNCH> Subscriber_Map;
typedef ACE_Map_Iterator<EXT, INT, SYNCH> Subscriber_Map_Iterator;
typedef ACE_Map_Entry<EXT, INT> Subscriber_Map_Entry;
/// Source-based subscribers.
/// NOTE(review): this member is a plain Subscriber_Set, whereas the
/// static source-based helpers declared in this class operate on a
/// SourceID_Map -- confirm which container callers are expected to pass.
Subscriber_Set source_subscribers_;
/// Type-based subscribers, keyed by event type.
Subscriber_Map type_subscribers_;
// = Typedefs for source-based subscriptions.  The map is keyed by
// event source id (sEXT) and stores pointers to Subscriber_Set (sINT).
typedef RtecEventComm::EventSourceID sEXT;
typedef Subscriber_Set *sINT;
typedef ACE_Map_Manager<sEXT, sINT, SYNCH> SourceID_Map;
typedef ACE_Map_Iterator<sEXT, sINT, SYNCH> SourceID_Map_Iterator;
typedef ACE_Map_Entry<sEXT, sINT> SourceID_Map_Entry;
/// Serializes writes to source_subscribers_ and type_subscribers_.
ACE_ES_RW_LOCK lock_;
/**
* Insert <consumer> into the list of consumers subscribed to <sid>.
* <source_subscribers> maps each source id to its consumer list; a
* list is allocated for <sid> if necessary.
*
* NOTE(review): the return convention is not stated for this overload.
* The type-based overload documents -1 on failure and 0 otherwise;
* presumably the same applies here -- confirm.
*/
static int insert_or_allocate (SourceID_Map &source_subscribers,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID sid);
/**
* Add <consumer> to the set of consumers bound to <type> in
* <type_subscribers>. If there is no consumer set for <type>, one is
* allocated. Returns -1 on failure, 0 otherwise.
*/
static int insert_or_allocate (Subscriber_Map &type_subscribers,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type);
/**
* Add <consumer> to the set of consumers bound to <type> in
* <type_subscribers>. Unlike insert_or_allocate, nothing is allocated
* on demand: the operation can fail instead. Returns -1 on failure,
* 0 otherwise.
*
* NOTE(review): the earlier wording here was "If there is consumer
* set for <type>, the operation fails". By analogy with
* insert_or_allocate this presumably means the call fails when there
* is NO set for <type> -- confirm against the implementation.
*
* NOTE(review): <dependency> is a reference to a pointer, so it looks
* like an out-parameter that receives the Dependency_Info associated
* with <type> -- confirm.
*/
static int insert_or_fail (Subscriber_Map &type_subscribers,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type,
RtecScheduler::Dependency_Info *&dependency);
/// Remove <consumer> from the consumer set that <type_map> holds for
/// <type>.
/// NOTE(review): the meaning of the int return value is not documented
/// here -- confirm against the implementation.
static int remove (Subscriber_Map &type_map,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type);
/// Remove <consumer> from the consumer set that <source_subscribers>
/// holds for <sid>.
/// NOTE(review): the meaning of the int return value is not documented
/// here -- confirm against the implementation.
static int remove (SourceID_Map &source_subscribers,
ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID sid);
/// Insert all elements of <src> into <dest>.
/// NOTE(review): <src> is taken by non-const reference; whether it is
/// modified cannot be told from this declaration -- confirm.
static void append_subscribers (Subscriber_Set &dest,
Subscriber_Set &src);
};
// ************************************************************
// Forward declarations.
class ACE_ES_Dispatch_Request;
class ACE_Push_Consumer_Proxy;
/**
* @class ACE_ES_Consumer_Correlation
*
* @brief Event Service Consumer_Correlation
*
* There is one Consumer Correlation object per call to
* connect_push_consumer. It handles all of the consumer's
* correlation dependencies, including timeouts. It is also a
* PushSupplier (it derives from POA_RtecEventComm::PushSupplier) so
* that it can support event forwarding.
*/
class TAO_RTOLDEvent_Export ACE_ES_Consumer_Correlation : public POA_RtecEventComm::PushSupplier
{
public:
/// Default construction.
ACE_ES_Consumer_Correlation (void);
/// Destructor.
/// NOTE(review): this was previously described as "Deletes lock_",
/// but lock_ is declared below as a value member (ACE_ES_MUTEX), not
/// a pointer, so that description looks stale -- confirm.
virtual ~ACE_ES_Consumer_Correlation (void);
/**
* Initialization. <correlation_module> is stored for delegating
* channel operations. <consumer> is stored to access the consumer's
* QoS and filtering data. Returns 0 on success, -1 on failure.
*/
int connected (ACE_Push_Consumer_Proxy *consumer,
ACE_ES_Correlation_Module *correlation_module);
/// Shutdown.
/// NOTE(review): the int return convention is not documented here;
/// presumably 0 on success, -1 on failure like connected -- confirm.
int disconnecting (void);
/// Takes <event>, received on behalf of <consumer>, and adds it to
/// the correlation. Returns the dispatch request that should be
/// forwarded.
/// NOTE(review): whether the result can be null (no request to
/// forward) is not stated here -- confirm.
ACE_ES_Dispatch_Request *push (ACE_ES_Consumer_Rep *consumer,
const TAO_EC_Event& event);
/// Stop forwarding events to the calling consumer.
void suspend (void);
/// Resume forwarding events to the calling consumer.
void resume (void);
/// Pointer back to the main correlation module. This is public so
/// that ACE_ES_Consumer_Rep_Timeout::execute can access it.
ACE_ES_Correlation_Module *correlation_module_;
private:
/// Called when the channel disconnects us.
virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// Dynamically allocates structures needed for correlations. 0 on
/// success, -1 on failure.
int allocate_correlation_resources (ACE_ES_Dependency_Iterator &iter);
/// Helper function for this->push.
ACE_ES_Dispatch_Request * correlate (ACE_ES_Consumer_Rep *cr,
const TAO_EC_Event& event);
// = Registration helper functions.
// Each takes one <dependency> plus the group type and the
// conjunction/disjunction group indices (<cgindex>, <dgindex>), and
// has an int reference parameter for a rep index.
// NOTE(review): the return convention and whether the index parameter
// is in, out, or in/out are not documented here -- confirm.
int register_deadline_timeout (RtecEventChannelAdmin::Dependency &dependency,
RtecEventComm::EventType group_type,
int cgindex,
int dgindex,
int &trep_index);
int register_interval_timeout (RtecEventChannelAdmin::Dependency &dependency,
RtecEventComm::EventType group_type,
int cgindex,
int dgindex,
int &trep_index);
int register_event (RtecEventChannelAdmin::Dependency &dependency,
RtecEventComm::EventType group_type,
int cgindex,
int dgindex,
int &crep_index);
/// Look up (or possibly create -- TODO confirm) the consumer rep for
/// <dependency>, using <crep_index> as the index into the reps.
ACE_ES_Consumer_Rep *get_consumer_rep (RtecEventChannelAdmin::Dependency &dependency,
int &crep_index);
/// NOTE(review): undocumented; presumably hands out a fresh type id
/// based on type_id_index_ -- confirm.
int new_type_id (void);
/// NOTE(review): undocumented; presumably the counter behind
/// new_type_id -- confirm.
int type_id_index_;
/// For event forwarding.
RtecEventChannelAdmin::ProxyPushConsumer_ptr channel_;
/// Supplier QOS specifications.
RtecEventChannelAdmin::SupplierQOS qos_;
/// Events waiting to be forwarded.
TAO_EC_Event_Array *pending_events_;
/// Used to lock shared state: it synchronizes pending_events_ and is
/// also used by the correlation module.
ACE_ES_MUTEX lock_;
/// The consumer proxy this correlation serves.
/// NOTE(review): presumably the pointer handed to connected -- confirm.
ACE_Push_Consumer_Proxy *consumer_;
/// A bit is set for each dependency satisfied.
u_long pending_flags_;
/// Array of consumer rep pointers.
ACE_ES_Consumer_Rep **consumer_reps_;
/// Presumably the number of entries in consumer_reps_ -- confirm.
int n_consumer_reps_;
/// Timeout reps. Presumably an array of n_timer_reps_ elements -- confirm.
ACE_ES_Consumer_Rep_Timeout *timer_reps_;
int n_timer_reps_;
/// Conjunction groups. Presumably an array of n_conjunction_groups_
/// elements -- confirm.
ACE_ES_Conjunction_Group *conjunction_groups_;
int n_conjunction_groups_;
/// Disjunction groups. Presumably an array of n_disjunction_groups_
/// elements -- confirm.
ACE_ES_Disjunction_Group *disjunction_groups_;
int n_disjunction_groups_;
/// True (non-zero) when we're connected to the channel for forwarding.
int connected_;
};
// ************************************************************
/**
* @class ACE_ES_ACT
*
* @brief Event Service ACT
*
* Pairs an event (act_) with a flag (has_act_). Both members are
* public.
* NOTE(review): "ACT" presumably stands for asynchronous completion
* token -- confirm.
*/
class TAO_RTOLDEvent_Export ACE_ES_ACT
{
public:
/// Default construction.
ACE_ES_ACT (void);
/// Presumably non-zero when act_ holds a meaningful event -- confirm.
int has_act_;
/// The event carried as the ACT.
RtecEventComm::Event act_;
};
// ************************************************************
// Forward declarations.
class ACE_ES_Dispatch_Request;
/**
* @class ACE_ES_Consumer_Module
*
* @brief Event Service Consumer Module
*
* ProxyPushSupplier factory. Implements the
* RtecEventChannelAdmin::ConsumerAdmin servant interface
* (POA_RtecEventChannelAdmin::ConsumerAdmin).
*/
class TAO_RTOLDEvent_Export ACE_ES_Consumer_Module : public POA_RtecEventChannelAdmin::ConsumerAdmin
{
public:
/// Construct the module for <channel>.
/// NOTE(review): presumably <channel> is stored in channel_ -- confirm.
ACE_ES_Consumer_Module (ACE_EventChannel *channel);
/// Link to the next module (<down>).
void open (ACE_ES_Dispatching_Module *down);
/// Factory method for push consumer proxies.
virtual RtecEventChannelAdmin::ProxyPushSupplier_ptr
obtain_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// Register the consumer with the Event Service. This handles all
/// the details regarding Correlation_Module and Subscription_Module.
void connected (ACE_Push_Consumer_Proxy *consumer
ACE_ENV_ARG_DECL_NOT_USED);
/// Unregister the consumer from the Event Service.
void disconnecting (ACE_Push_Consumer_Proxy *consumer
ACE_ENV_ARG_DECL_NOT_USED);
/// NOTE(review): undocumented. Presumably delivers the event carried
/// by <request> to its consumer -- confirm against the implementation.
virtual void push (const ACE_ES_Dispatch_Request *request
ACE_ENV_ARG_DECL_NOT_USED);
/// Allow transformations to RtecEventChannelAdmin::ConsumerAdmin.
RtecEventChannelAdmin::ConsumerAdmin_ptr get_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);
/// This is called by Shutdown_Consumer command objects when a
/// consumer proxy is ready to be deleted.
void shutdown_request (ACE_ES_Dispatch_Request *request);
/// Actively disconnect from all consumers.
void shutdown (void);
/**
* Fill <c_qos> with the disjunction of all the subscriptions in
* this EC.
* It leaves the gateways out of the list.
*/
void fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos);
private:
// = Container types for the set of connected consumer proxies.
typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *> Consumer_Iterator;
typedef ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *> Consumers;
/// Protects access to all_consumers_.
ACE_ES_MUTEX lock_;
/// The consumer proxies known to this module; guarded by lock_.
Consumers all_consumers_;
/// Used to test for shutdown.
ACE_EventChannel *channel_;
/// Next module down.
ACE_ES_Dispatching_Module *down_;
};
// ************************************************************
// Forward declaration.
class ACE_ES_Subscription_Module;
/**
* @class ACE_ES_Correlation_Module
*
* @brief Event Service Correlation Module
*
* Sits between a dispatching module (up) and the subscription module
* (down): it applies consumer-specific filters to incoming events and
* forwards subscribe/unsubscribe requests downward.
*/
class TAO_RTOLDEvent_Export ACE_ES_Correlation_Module
{
public:
/// Construct the module for <channel>.
/// NOTE(review): presumably <channel> is stored in channel_ -- confirm.
ACE_ES_Correlation_Module (ACE_EventChannel *channel);
/// Link to adjacent modules.
void open (ACE_ES_Dispatching_Module *up,
ACE_ES_Subscription_Module *down);
/// Create the consumer's filter object.
void connected (ACE_Push_Consumer_Proxy *consumer
ACE_ENV_ARG_DECL_NOT_USED);
/// Release the consumer's filter object.
void disconnecting (ACE_Push_Consumer_Proxy *consumer
ACE_ENV_ARG_DECL_NOT_USED);
/**
* Take in an event and its subscriber. Apply consumer-specific
* filters to each event and forward any dispatch requests to the
* Dispatching Module.
*/
void push (ACE_ES_Consumer_Rep *consumer,
const TAO_EC_Event &event
ACE_ENV_ARG_DECL_NOT_USED);
// = These are called by ACE_ES_Consumer_Reps.
/// Forwards to the subscription module.
int subscribe (ACE_ES_Consumer_Rep *consumer);
/// Forwards to the subscription module.
int unsubscribe (ACE_ES_Consumer_Rep *consumer);
/// Schedule consumer timeout. Return 0 on success, -1 on failure.
int schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);
/// Cancel consumer timeout. Return 0 on success, -1 on failure.
int cancel_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);
/// Reschedule consumer timeout. Return 0 on success, -1 on failure.
int reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);
/// The master channel. This is public so that Consumer_Correlation
/// objects can access it.
ACE_EventChannel *channel_;
/// Does nothing.
void shutdown (void);
private:
/// Next module up.
ACE_ES_Dispatching_Module *up_;
/// Next module down.
ACE_ES_Subscription_Module *subscription_module_;
};
// ************************************************************
// Forward declaration.
class ACE_ES_Supplier_Module;
class ACE_Push_Supplier_Proxy;
/**
* @class ACE_ES_Subscription_Module
*
* @brief Event Service Subscription Module
*
* = SYNCHRONIZATION
* This is currently implemented with very coarse-grain
* synchronization. Basically, there is a single readers/writer
* lock. All operations acquire the writer lock to change any
* subscription record. All operations acquire a reader lock to
* read any subscription record. This is fine for normal
* operations (which are *all* read operations). However, the
* initialization and shutdown periods might benefit from the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -