📄 event_channel.h
字号:
* potential increase in concurrency if we used finer grain locks
* (e.g., lock-per-source).
*/
// Subscription Module of the event channel. Declares the operations
// for subscribing and unsubscribing consumers, for being told when
// supplier proxies connect or disconnect, and for pushing events
// toward the Correlation Module (up_).
class TAO_RTOLDEvent_Export ACE_ES_Subscription_Module
{
public:
/// Constructor. Requires the event channel this module belongs to
/// (this is not a default constructor: <channel> is mandatory).
ACE_ES_Subscription_Module (ACE_EventChannel *channel);
/// Link to the adjacent modules: <up> is the Correlation Module and
/// <down> is the Supplier Module.
void open (ACE_ES_Correlation_Module *up,
ACE_ES_Supplier_Module *down);
/// Destructor. The original note said "Deletes the lock_".
/// NOTE(review): lock_ is declared by value below, so there is
/// presumably nothing to delete explicitly -- confirm against the
/// implementation.
~ACE_ES_Subscription_Module (void);
/// Register a new consumer. Calls into <consumer> to figure out the
/// subscription options. Returns 0 on success, -1 on failure.
int subscribe (ACE_ES_Consumer_Rep *consumer);
/// Removes <consumer> from any subscription lists.
/// NOTE(review): return convention is not stated here; presumably
/// 0 on success, -1 on failure like subscribe() -- confirm.
int unsubscribe (ACE_ES_Consumer_Rep *consumer);
/// Notification that the proxy <supplier> has connected.
/// NOTE(review): presumably records the proxy in all_suppliers_ and
/// triggers reregister_consumers() -- confirm in the implementation.
void connected (ACE_Push_Supplier_Proxy *supplier
ACE_ENV_ARG_DECL_NOT_USED);
/// Notification that the proxy <supplier> is about to disconnect.
/// NOTE(review): presumably removes the proxy from all_suppliers_
/// -- confirm in the implementation.
void disconnecting (ACE_Push_Supplier_Proxy *supplier
ACE_ENV_ARG_DECL_NOT_USED);
/// Takes in an event and pushes subscriber sets to the
/// Correlation Module. <source> is the supplier proxy the event
/// arrived through; <event> is the event itself.
void push (ACE_Push_Supplier_Proxy *source,
const TAO_EC_Event &event
ACE_ENV_ARG_DECL_NOT_USED);
/// Unsubscribes all consumers from the suppliers.
void shutdown (void);
private:
/// Reregister any consumers that registered for <source_id> before
/// it actually connected to the channel.
void reregister_consumers (RtecEventComm::EventSourceID source_id);
/// The event channel this module was constructed with.
ACE_EventChannel *channel_;
// Source-only subscribers. The declarations below are disabled
// (commented out) and kept only for reference.
/*
typedef ACE_ES_Subscription_Info::Subscriber_Set INT;
typedef ACE_Null_Mutex SYNCH;
typedef ACE_Map_Manager<EXT, INT, SYNCH> Source_Collection;
typedef ACE_Map_Iterator<EXT, INT, SYNCH> Source_Collection_Iterator;
typedef ACE_Map_Entry<EXT, INT> Source_Collection_Entry;
Source_Collection source_subscription_info_;
*/
// = Subscribe helper methods. Returns 0 on success, -1 on failure.
// One helper per subscription flavor: every event (_all), by event
// type (_type), by source (_source), or by source and type
// (_source_type).
int subscribe_all (ACE_ES_Consumer_Rep *consumer);
int subscribe_type (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type);
int subscribe_source (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID source);
int subscribe_source_type (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID source,
RtecEventComm::EventType type);
// = Unsubscribe helper methods, mirroring the subscribe helpers
// above one for one.
// NOTE(review): presumably the same 0 / -1 return convention --
// confirm in the implementation.
int unsubscribe_all (ACE_ES_Consumer_Rep *consumer);
int unsubscribe_type (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventType type);
int unsubscribe_source (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID source);
int unsubscribe_source_type (ACE_ES_Consumer_Rep *consumer,
RtecEventComm::EventSourceID source,
RtecEventComm::EventType type);
// = Push helper methods.
/// Push <event> to all consumers subscribed to all events from
/// <source>. Returns 0 on success, -1 on failure.
int push_source (ACE_Push_Supplier_Proxy *source,
const TAO_EC_Event &event
ACE_ENV_ARG_DECL);
/// Push <event> to all consumers subscribed to <event>.type_ from
/// <source>. Returns 0 on success, -1 on failure.
int push_source_type (ACE_Push_Supplier_Proxy *source,
const TAO_EC_Event &event
ACE_ENV_ARG_DECL);
/// Push <event> to all_suppliers_. Unlike the two helpers above,
/// this one reports no status (void).
void push_all (const TAO_EC_Event &event
ACE_ENV_ARG_DECL_NOT_USED);
/// Next module up stream.
ACE_ES_Correlation_Module *up_;
/// Next module down stream.
ACE_ES_Supplier_Module *down_;
// Container and iterator types used for the set of supplier proxies.
typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *> Supplier_Iterator;
typedef ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *> Suppliers;
/// All suppliers.
Suppliers all_suppliers_;
/// Type-based subscribers.
ACE_ES_Subscription_Info::Subscriber_Map type_subscribers_;
/// Source-based subscribers.
ACE_ES_Subscription_Info::SourceID_Map source_subscribers_;
/// Protects access to all_suppliers_ and, per the original comment,
/// "type_suppliers_". No member of that name exists in this class;
/// NOTE(review): presumably type_subscribers_ was meant -- confirm.
ACE_ES_RW_LOCK lock_;
/// The scheduler.
RtecScheduler::Scheduler_ptr scheduler_;
};
// ************************************************************
/**
* @class ACE_ES_Supplier_Module
*
* @brief Event Service Supplier Proxy Module
*
* ProxyPushConsumer factory.
*/
// Supplier-side module of the event channel. It is the SupplierAdmin
// servant (derives from POA_RtecEventChannelAdmin::SupplierAdmin),
// hands out ProxyPushConsumer objects, and forwards what they receive
// to the Subscription Module (up_).
class TAO_RTOLDEvent_Export ACE_ES_Supplier_Module : public POA_RtecEventChannelAdmin::SupplierAdmin
{
public:
/// Constructor. Requires the event channel this module belongs to
/// (this is not a default constructor: <channel> is mandatory).
ACE_ES_Supplier_Module (ACE_EventChannel *channel);
/// Link to the adjacent module: <up> is the Subscription Module.
/// (The original comment said "associate the module to a channel",
/// but the argument is the upstream module, not the channel.)
void open (ACE_ES_Subscription_Module *up);
/// Factory method for push supplier proxies. Returns a
/// ProxyPushConsumer reference; declared to throw only
/// CORBA::SystemException.
virtual RtecEventChannelAdmin::ProxyPushConsumer_ptr
obtain_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// The supplier module acts on behalf of the supplier proxy to
/// forward events through the channel. <proxy> is the originating
/// supplier proxy; <event> is the event set, taken by non-const
/// reference.
virtual void push (ACE_Push_Supplier_Proxy *proxy,
RtecEventComm::EventSet &event
ACE_ENV_ARG_DECL_NOT_USED);
/// Register <supplier> with the Event Service. This handles all
/// the details regarding Correlation_Module and Subscription_Module.
/// (The original comment said "consumer"; the argument is a supplier
/// proxy.)
void connected (ACE_Push_Supplier_Proxy *supplier
ACE_ENV_ARG_DECL_NOT_USED);
/// Unregister <supplier> from the Event Service.
/// (The original comment said "consumer"; the argument is a supplier
/// proxy.)
void disconnecting (ACE_Push_Supplier_Proxy *supplier
ACE_ENV_ARG_DECL_NOT_USED);
/// Returns this module as a RtecEventChannelAdmin::SupplierAdmin
/// object reference. (The original comment mentioned
/// RtecEventComm::PushConsumer, which does not match the declared
/// return type.)
RtecEventChannelAdmin::SupplierAdmin_ptr get_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);
/// Actively disconnect from all suppliers.
void shutdown (void);
/**
* Fill the QoS with the disjunction of all the publications in
* this EC.
* It leaves the gateways out of the list.
* The result is written into <s_qos>.
*/
void fill_qos (RtecEventChannelAdmin::SupplierQOS& s_qos);
private:
// Container and iterator types used for the set of supplier proxies.
typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *> Supplier_Iterator;
typedef ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *> Suppliers;
/// All suppliers.
Suppliers all_suppliers_;
/// Protects access to all_suppliers_. (The original comment also
/// named "type_suppliers_", but this class declares no such member.)
ACE_ES_MUTEX lock_;
/// Next module up stream (the Subscription Module passed to open()).
ACE_ES_Subscription_Module *up_;
/// Used to test for shutdown.
ACE_EventChannel *channel_;
};
// ************************************************************
// Forward declarations.
class ACE_EventChannel;
// = Event Channel interfaces.
/**
* @class ACE_Push_Supplier_Proxy
*
* @brief Push Supplier Proxy.
*
* To the channel, this is a proxy to suppliers. To suppliers, it
* exports a PushConsumer interface. It is a
* RtecEventChannelAdmin::ProxyPushConsumer. Suppliers use this
* interface to connect to the channel, push events to consumers,
* and to disconnect from the channel.
*/
// Servant for RtecEventChannelAdmin::ProxyPushConsumer: the object a
// supplier talks to in order to connect, push events, and disconnect.
// It is reference counted via PortableServer::RefCountServantBase.
class TAO_RTOLDEvent_Export ACE_Push_Supplier_Proxy : public POA_RtecEventChannelAdmin::ProxyPushConsumer, public PortableServer::RefCountServantBase
{
public:
/// Must be created with an owning supplier admin, i.e. the
/// ACE_ES_Supplier_Module passed as <supplier_module>.
ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *supplier_module);
// = Operations public to suppliers.
/**
* Suppliers connect via this interface. <push_supplier> is a
* reference to the supplier. <qos> represents the publish types of
* the supplier.
* Declared to throw CORBA::SystemException and
* RtecEventChannelAdmin::AlreadyConnected.
*/
virtual void connect_push_supplier (
RtecEventComm::PushSupplier_ptr push_supplier,
const RtecEventChannelAdmin::SupplierQOS& qos
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
RtecEventChannelAdmin::AlreadyConnected));
/// Data arriving from a PushSupplier that must be sent to
/// consumers. This is the entry point of all events.
virtual void push (const RtecEventComm::EventSet &event
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// Disconnect the supplier from the channel.
virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
// = Operations for the Event Channel.
/// Returns 1 if the proxy has been connected to a "remote" client.
int connected (void);
/// Actively disconnect from the supplier.
void shutdown (void);
/// This is a hook so that the Subscription Module can associate
/// state with supplier proxies. Returns a mutable reference to the
/// subscription_info_ member's type.
ACE_ES_Subscription_Info &subscription_info (void);
/// Filtering criteria (mutable access to the supplier QoS).
RtecEventChannelAdmin::SupplierQOS &qos (void);
/// Is this object a proxy for <rhs>. Simple pointer comparison for now.
/// Returns an int rather than bool.
int operator== (const RtecEventComm::EventSourceID rhs);
/// Returns the source id associated with this proxy.
/// (The original comment said "underlying supplier object ref", but
/// the declared return type is an EventSourceID, not an object
/// reference.)
RtecEventComm::EventSourceID source_id (void);
/// The QoS for this supplier (read-only overload of qos()).
const RtecEventChannelAdmin::SupplierQOS& qos (void) const;
private:
/// NOTE(review): presumably stamps the events in <event> with a
/// time value before they are forwarded -- confirm in the
/// implementation.
void time_stamp (RtecEventComm::EventSet &event);
private:
/// The supplier's QoS parameters. Stored by value, not as a
/// reference.
RtecEventChannelAdmin::SupplierQOS qos_;
/// Per-proxy state for the Subscription Module; see
/// subscription_info().
ACE_ES_Subscription_Info subscription_info_;
/// The supplier admin module this proxy was created with.
ACE_ES_Supplier_Module *supplier_module_;
/// The supplier's source id, kept locally (the original comment
/// called this "a proxy of the Supplier source_id_").
RtecEventComm::EventSourceID source_id_;
/// CORBA reference to remote push supplier.
/// NOTE(review): this is a plain _ptr, not a _var, so it is not
/// released automatically -- confirm the implementation releases it.
RtecEventComm::PushSupplier_ptr push_supplier_;
};
// ************************************************************
/**
* @class ACE_Push_Consumer_Proxy
*
* @brief Push Consumer Proxy.
*
* This is the channel's proxy to a push consumer. It implements
* the RtecEventChannelAdmin::ProxyPushSupplier IDL interface.
* Consumers use this interface to connect and disconnect from the
* channel.
*/
// Servant for RtecEventChannelAdmin::ProxyPushSupplier: the object a
// consumer talks to in order to connect, suspend/resume delivery, and
// disconnect. It is reference counted via
// PortableServer::RefCountServantBase.
class TAO_RTOLDEvent_Export ACE_Push_Consumer_Proxy : public POA_RtecEventChannelAdmin::ProxyPushSupplier, public PortableServer::RefCountServantBase
{
public:
/// Must be created with a consumer admin, i.e. the
/// ACE_ES_Consumer_Module passed as <cm>.
ACE_Push_Consumer_Proxy (ACE_ES_Consumer_Module *cm);
/// Virtual destructor.
virtual ~ACE_Push_Consumer_Proxy (void);
// = Interfaces exported to consumers.
/// A push consumer is connecting. <push_consumer> is a reference to
/// the consumer. <qos> is the subscription types for the consumer.
/// Declared to throw CORBA::SystemException,
/// RtecEventChannelAdmin::AlreadyConnected and
/// RtecEventChannelAdmin::TypeError.
virtual void connect_push_consumer (
RtecEventComm::PushConsumer_ptr push_consumer,
const RtecEventChannelAdmin::ConsumerQOS& qos
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException,
RtecEventChannelAdmin::AlreadyConnected,
RtecEventChannelAdmin::TypeError));
/// The consumer is disconnecting.
virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// Stop forwarding events to the calling consumer.
virtual void suspend_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
/// Resume forwarding events to the calling consumer.
virtual void resume_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
// = Event Channel operations.
/// Push <events> to push_consumer_.
void push (const RtecEventComm::EventSet &events
ACE_ENV_ARG_DECL_NOT_USED);
/// Returns 1 if the proxy has been connected to a "remote" client.
int connected (void);
/// Actively disconnect from the consumer.
void shutdown (void);
/// Access the consumer-specific Consumer_Correlation.
ACE_ES_Consumer_Correlation &correlation (void);
/// Filtering criteria (mutable access to the consumer QoS).
RtecEventChannelAdmin::ConsumerQOS &qos (void);
private:
/// The consumer's Quality of Service parameters. Stored by value,
/// not as a reference.
RtecEventChannelAdmin::ConsumerQOS qos_;
/// A hook so that the Correlation Module can associate correlation
/// information with the consumer.
ACE_ES_Consumer_Correlation correlation_;
/// Reference to our push consumer (held in a _var).
RtecEventComm::PushConsumer_var push_consumer_;
/// The consumer admin module this proxy was created with.
/// TODO: Maybe this should be a _var or _duplicate/_release should
/// be used.
/// NOTE(review): the TODO sits on this raw pointer, while
/// push_consumer_ above is already a _var -- confirm which member
/// the TODO was meant for.
ACE_ES_Consumer_Module *consumer_module_;
};
#if defined (__ACE_INLINE__)
#include "Event_Channel.i"
#endif /* __ACE_INLINE__ */
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#pragma warning(pop)
#endif /* _MSC_VER */
#include /**/ "ace/post.h"
#endif /* ACE_EVENT_CHANNEL_H */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -