⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.h

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 H
📖 第 1 页 / 共 4 页
字号:
 * potential increase in concurrency if we used finer grain locks
 * (e.g., lock-per-source).
 */
class TAO_RTOLDEvent_Export ACE_ES_Subscription_Module
{
public:
  /// Default construction.
  ACE_ES_Subscription_Module (ACE_EventChannel *channel);

  /// Link to the adjacent modules.
  void open (ACE_ES_Correlation_Module *up,
             ACE_ES_Supplier_Module *down);

  /// Deletes the lock_.
  ~ACE_ES_Subscription_Module (void);

  /// Register a new consumer.  Calls into <consumer> to figure out the
  /// subscription options.  Returns 0 on success, -1 on failure.
  int subscribe (ACE_ES_Consumer_Rep *consumer);

  /// Removes the -consumer- from any subscription lists.
  int unsubscribe (ACE_ES_Consumer_Rep *consumer);

  void connected (ACE_Push_Supplier_Proxy *supplier
                  ACE_ENV_ARG_DECL_NOT_USED);
  void disconnecting (ACE_Push_Supplier_Proxy *supplier
                      ACE_ENV_ARG_DECL_NOT_USED);

  /// Takes in an event and pushes subscriber sets to the
  /// Correlation Module.
  void push (ACE_Push_Supplier_Proxy *source,
             const TAO_EC_Event &event
             ACE_ENV_ARG_DECL_NOT_USED);

  /// Unsubscribes all consumers from the suppliers.
  void shutdown (void);

private:
  /// Reregister any consumers that registered for <source_id> before
  /// it actually connected to the channel.
  void reregister_consumers (RtecEventComm::EventSourceID source_id);

  /// The channel of all channels.
  ACE_EventChannel *channel_;

  /// Source-only subscribers.
  /*
  typedef ACE_ES_Subscription_Info::Subscriber_Set INT;
  typedef ACE_Null_Mutex SYNCH;
  typedef ACE_Map_Manager<EXT, INT, SYNCH> Source_Collection;
  typedef ACE_Map_Iterator<EXT, INT, SYNCH> Source_Collection_Iterator;
  typedef ACE_Map_Entry<EXT, INT> Source_Collection_Entry;
  Source_Collection source_subscription_info_;
  */

  // = Subscribe helper methods.  Returns 0 on success, -1 on failure.

  int subscribe_all (ACE_ES_Consumer_Rep *consumer);

  int subscribe_type (ACE_ES_Consumer_Rep *consumer,
                      RtecEventComm::EventType type);

  int subscribe_source (ACE_ES_Consumer_Rep *consumer,
                        RtecEventComm::EventSourceID source);

  int subscribe_source_type (ACE_ES_Consumer_Rep *consumer,
                             RtecEventComm::EventSourceID source,
                             RtecEventComm::EventType type);

  int unsubscribe_all (ACE_ES_Consumer_Rep *consumer);

  int unsubscribe_type (ACE_ES_Consumer_Rep *consumer,
                        RtecEventComm::EventType type);

  int unsubscribe_source (ACE_ES_Consumer_Rep *consumer,
                          RtecEventComm::EventSourceID source);

  int unsubscribe_source_type (ACE_ES_Consumer_Rep *consumer,
                               RtecEventComm::EventSourceID source,
                               RtecEventComm::EventType type);

  // = Push helper methods.

  /// Push <event> to all consumers subscribed to all events from
  /// <source>.  Returns 0 on success, -1 on failure.
  int push_source (ACE_Push_Supplier_Proxy *source,
                   const TAO_EC_Event &event
                   ACE_ENV_ARG_DECL);

  /// Push <event> to all consumers subscribed to <event>.type_ from
  /// <source>.  Returns 0 on success, -1 on failure.
  int push_source_type (ACE_Push_Supplier_Proxy *source,
                        const TAO_EC_Event &event
                        ACE_ENV_ARG_DECL);

  /// Push <event> to all_suppliers_.
  void push_all (const TAO_EC_Event &event
                 ACE_ENV_ARG_DECL_NOT_USED);

  /// Next module up stream.
  ACE_ES_Correlation_Module *up_;

  /// Next module down stream.
  ACE_ES_Supplier_Module *down_;

  typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *> Supplier_Iterator;
  typedef ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *> Suppliers;

  /// All suppliers.
  Suppliers all_suppliers_;

  /// Type-based subscribers.
  ACE_ES_Subscription_Info::Subscriber_Map type_subscribers_;

  /// Source-based subscribers.
  ACE_ES_Subscription_Info::SourceID_Map source_subscribers_;

  /// Protects access to all_suppliers_ and type_suppliers_;
  ACE_ES_RW_LOCK lock_;

  /// The scheduler;
  RtecScheduler::Scheduler_ptr scheduler_;
};

// ************************************************************

/**
 * @class ACE_ES_Supplier_Module
 *
 * @brief Event Service Supplier Proxy Module
 *
 * ProxyPushConsumer factory.
 */
class TAO_RTOLDEvent_Export ACE_ES_Supplier_Module : public POA_RtecEventChannelAdmin::SupplierAdmin
{
public:
  /// Default construction.
  ACE_ES_Supplier_Module (ACE_EventChannel *channel);

  /// Associate the module to a channel.
  void open (ACE_ES_Subscription_Module *up);

  /// Factory method for push supplier proxies.
  virtual RtecEventChannelAdmin::ProxyPushConsumer_ptr
      obtain_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
        ACE_THROW_SPEC ((CORBA::SystemException));

  /// The supplier module acts on behalf of the supplier proxy to
  /// forward events through the channel.
  virtual void push (ACE_Push_Supplier_Proxy *proxy,
                     RtecEventComm::EventSet &event
                     ACE_ENV_ARG_DECL_NOT_USED);

  /// Register the consumer with the Event Service.  This handles all
  /// the details regarding Correlation_Module and Subscription_Module.
  void connected (ACE_Push_Supplier_Proxy *supplier
                  ACE_ENV_ARG_DECL_NOT_USED);

  /// Unregister the consumer from the Event Service.
  void disconnecting (ACE_Push_Supplier_Proxy *supplier
                      ACE_ENV_ARG_DECL_NOT_USED);

  /// Allow transformations to RtecEventComm::PushConsumer.
  RtecEventChannelAdmin::SupplierAdmin_ptr get_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);

  /// Actively disconnect from all suppliers.
  void shutdown (void);

  /**
   * Fill the QoS with the disjuction off all the publications in
   * this EC.
   * It leaves the gateways out of the list.
   */
  void fill_qos (RtecEventChannelAdmin::SupplierQOS& s_qos);

private:
  typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Supplier_Proxy *> Supplier_Iterator;
  typedef ACE_Unbounded_Set_Ex<ACE_Push_Supplier_Proxy *> Suppliers;

  /// All suppliers.
  Suppliers all_suppliers_;

  /// Protects access to all_suppliers_ and type_suppliers_;
  ACE_ES_MUTEX lock_;

  ACE_ES_Subscription_Module *up_;

  /// Used to test for shutdown.
  ACE_EventChannel *channel_;
};

// ************************************************************

// Forward declarations.
class ACE_EventChannel;

// = Event Channel interfaces.

/**
 * @class ACE_Push_Supplier_Proxy
 *
 * @brief Push Supplier Proxy.
 *
 * To the channel, this is a proxy to suppliers.  To suppliers, it
 * exports a PushConsumer interface.  It is a
 * RtecEventChannelAdmin::ProxyPushConsumer.  Suppliers use this
 * interface to connect to the channel, push events to consumers,
 * and to disconnect from the channel.
 */
class TAO_RTOLDEvent_Export ACE_Push_Supplier_Proxy : public POA_RtecEventChannelAdmin::ProxyPushConsumer, public PortableServer::RefCountServantBase
{
public:
  /// Must be created with an owning supplier admin.
  ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *supplier_module);

  // = Operations public to suppliers.

  /**
   * Suppliers connect via this interface.  <push_supplier> is a
   * reference to the supplier.  <qos> represents the publish types of
   * the supplier.
   */
  virtual void connect_push_supplier (
      RtecEventComm::PushSupplier_ptr push_supplier,
      const RtecEventChannelAdmin::SupplierQOS& qos
      ACE_ENV_ARG_DECL_NOT_USED)
        ACE_THROW_SPEC ((CORBA::SystemException,
                         RtecEventChannelAdmin::AlreadyConnected));

  /// Data arriving from a PushSupplier that must be sent to
  /// consumers.  This is the entry point of all events.
  virtual void push (const RtecEventComm::EventSet &event
                     ACE_ENV_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  /// Disconnect the supplier from the channel.
  virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  // = Operations for the Event Channel.

  /// Returns 1 if the proxy has been connected to a "remote" client.
  int connected (void);

  /// Actively disconnect from the supplier.
  void shutdown (void);

  // This is a hook so that the Subscription Module can associate
  // state with supplier proxies.
  ACE_ES_Subscription_Info &subscription_info (void);

  /// Filtering criteria.
  RtecEventChannelAdmin::SupplierQOS &qos (void);

  /// Is this object a proxy for -rhs-.  Simple pointer comparison for now.
  int operator== (const RtecEventComm::EventSourceID rhs);

  /// Returns underlying supplier object ref.
  RtecEventComm::EventSourceID source_id (void);

  /// The QoS for this supplier
  const RtecEventChannelAdmin::SupplierQOS& qos (void) const;

private:
  void time_stamp (RtecEventComm::EventSet &event);

private:
  /// Reference to the supplier's qos params.
  RtecEventChannelAdmin::SupplierQOS qos_;

  ACE_ES_Subscription_Info subscription_info_;

  ACE_ES_Supplier_Module *supplier_module_;

  /// We keep a proxy of the Supplier source_id_;
  RtecEventComm::EventSourceID source_id_;

  /// CORBA reference to remote push supplier.
  RtecEventComm::PushSupplier_ptr push_supplier_;
};

// ************************************************************

/**
 * @class ACE_Push_Consumer_Proxy
 *
 * @brief Push Consumer Proxy.
 *
 * This is the channels proxy to a push consumer.  It implements
 * the RtecEventChannelAdmin::ProxyPushSupplier IDL interface.
 * Consumers use this interface to connect and disconnect from the
 * channel.
 */
class TAO_RTOLDEvent_Export ACE_Push_Consumer_Proxy : public POA_RtecEventChannelAdmin::ProxyPushSupplier, public PortableServer::RefCountServantBase
{
public:
  /// Must be created with an consumer admin.
  ACE_Push_Consumer_Proxy (ACE_ES_Consumer_Module *cm);

  /// Default destruction
  virtual ~ACE_Push_Consumer_Proxy (void);

  // = Interfaces exported to consumers.

  /// A push consumer is connecting.  <push_consumer> is a reference to
  /// the consumer.  <qos> is the subscription types for the consumer.
  virtual void connect_push_consumer (
       RtecEventComm::PushConsumer_ptr push_consumer,
       const RtecEventChannelAdmin::ConsumerQOS& qos
       ACE_ENV_ARG_DECL_NOT_USED)
    ACE_THROW_SPEC ((CORBA::SystemException,
                     RtecEventChannelAdmin::AlreadyConnected,
                     RtecEventChannelAdmin::TypeError));

  /// The consumer is disconnecting.
  virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  /// Stop forwarding events to the calling consumer.
  virtual void suspend_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  /// Resume forwarding events to the calling consumer.
  virtual void resume_connection (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  // = Event Channel operations.

  /// Push <events> to push_consumer_.
  void push (const RtecEventComm::EventSet &events
             ACE_ENV_ARG_DECL_NOT_USED);

  /// Returns 1 if the proxy has been connected to a "remote" client.
  int connected (void);

  /// Actively disconnect from the consumer.
  void shutdown (void);

  /// Access the consumer-specific Consumer_Correlation.
  ACE_ES_Consumer_Correlation &correlation (void);

  /// Filtering criteria.
  RtecEventChannelAdmin::ConsumerQOS &qos (void);

private:
  /// A reference to the consumers Quality of Service parameters.
  RtecEventChannelAdmin::ConsumerQOS qos_;

  /// A hook so that the Correlation Module can associate correlation
  /// information with the consumer.
  ACE_ES_Consumer_Correlation correlation_;

  /// Reference to our push consumer.
  RtecEventComm::PushConsumer_var push_consumer_;

  /// TODO: Maybe this should be a _var or _duplicate/_release should
  /// be used
  ACE_ES_Consumer_Module *consumer_module_;
};

#if defined (__ACE_INLINE__)
#include "Event_Channel.i"
#endif /* __ACE_INLINE__ */

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#pragma warning(pop)
#endif /* _MSC_VER */

#include /**/ "ace/post.h"

#endif /* ACE_EVENT_CHANNEL_H */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -