⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_channel.h

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 H
📖 第 1 页 / 共 4 页
字号:

    // void operator= (const Subscriber_Set &);
    // Copy.

    /// All the consumers that have registered for this event.
    Subscriber_Set consumers_;

    /// Description of the method that generates this event.
    RtecScheduler::Dependency_Info *dependency_info_;
  };

  typedef RtecEventComm::EventType EXT;
  typedef Type_Subscribers *INT;
  typedef ACE_Null_Mutex SYNCH;
  typedef ACE_Map_Manager<EXT, INT, SYNCH> Subscriber_Map;
  typedef ACE_Map_Iterator<EXT, INT, SYNCH> Subscriber_Map_Iterator;
  typedef ACE_Map_Entry<EXT, INT> Subscriber_Map_Entry;

  /// Source-based subscribers.
  Subscriber_Set source_subscribers_;

  /// Type-based subscribers.
  Subscriber_Map type_subscribers_;

  // = These are just typedefs for source-based subscriptions.
  typedef RtecEventComm::EventSourceID sEXT;
  typedef Subscriber_Set *sINT;
  typedef ACE_Map_Manager<sEXT, sINT, SYNCH> SourceID_Map;
  typedef ACE_Map_Iterator<sEXT, sINT, SYNCH> SourceID_Map_Iterator;
  typedef ACE_Map_Entry<sEXT, sINT> SourceID_Map_Entry;

  /// Serializes writes to source_subscribers_ and type_subscribers_.
  ACE_ES_RW_LOCK lock_;

  /**
   * <source_subscribers> contains a mapping of source id to consumer
   * list.  Insert <consumer> into the list of consumers subscribed to
   * <sid>.  Allocate a list for <sid> if necessary.
   */
  static int insert_or_allocate (SourceID_Map &source_subscribers,
                                 ACE_ES_Consumer_Rep *consumer,
                                 RtecEventComm::EventSourceID sid);

  /**
   * Add <consumer> to the set of consumers bound to <type> in
   * <type_subscribers>.  If there is consumer set for <type>, one is
   * allocated.  Returns -1 on failure, 0 otherwise.
   */
  static int insert_or_allocate (Subscriber_Map &type_subscribers,
                                 ACE_ES_Consumer_Rep *consumer,
                                 RtecEventComm::EventType type);

  /**
   * Add <consumer> to the set of consumers bound to <type> in
   * <type_subscribers>.  If there is consumer set for <type>, the
   * operation fails.  Returns -1 on failure, 0 otherwise.
   */
  static int insert_or_fail (Subscriber_Map &type_subscribers,
                             ACE_ES_Consumer_Rep *consumer,
                             RtecEventComm::EventType type,
                             RtecScheduler::Dependency_Info *&dependency);

  /// Remove <consumer> from the consumer set in <type_map> set
  /// corresponding to <type>.
  static int remove (Subscriber_Map &type_map,
                     ACE_ES_Consumer_Rep *consumer,
                     RtecEventComm::EventType type);

  /// Remove <consumer> from the consumer set in the
  /// <source_subscribers> set corresponding to <sid>.
  static int remove (SourceID_Map &source_subscribers,
                     ACE_ES_Consumer_Rep *consumer,
                     RtecEventComm::EventSourceID sid);

  /// Insert all elements of <src> into <dest>.
  static void append_subscribers (Subscriber_Set &dest,
                                  Subscriber_Set &src);
};

// ************************************************************

// Forward declarations.
class ACE_ES_Dispatch_Request;
class ACE_Push_Consumer_Proxy;

/**
 * @class ACE_ES_Consumer_Correlation
 *
 * @brief Event Service Consumer_Correlation
 *
 * There is one Consumer Correlation object per call to
 * connect_push_consumer.  It handles all the consumer's
 * correlation dependencies including timeouts.  This is also a
 * PushSupplier to support event forwarding.
 */
class TAO_RTOLDEvent_Export ACE_ES_Consumer_Correlation : public POA_RtecEventComm::PushSupplier
{
public:
  /// Default construction.
  ACE_ES_Consumer_Correlation (void);

  /// Deletes lock_.
  virtual ~ACE_ES_Consumer_Correlation (void);

  /**
   * Initialization.  <correlation_module> is stored for delegating
   * channel operations.  <consumer> is stored to access the consumers
   * qos and filterin data.  Returns 0 on success, -1 on failure.
   */
  int connected (ACE_Push_Consumer_Proxy *consumer,
                 ACE_ES_Correlation_Module *correlation_module);

  /// Shutdown.
  int disconnecting (void);

  /// Takes <event> and adds it to the correlation.  Returns the
  /// dispatch request that should be forwarded.
  ACE_ES_Dispatch_Request *push (ACE_ES_Consumer_Rep *consumer,
                                 const TAO_EC_Event& event);

  /// Stop forwarding events to the calling consumer.
  void suspend (void);

  /// Resume forwarding events to the calling consumer.
  void resume (void);

  /// Pointer back to the main correlation module.  This is public so
  /// that ACE_ES_Consumer_Rep_Timeout::execute can access it.
  ACE_ES_Correlation_Module *correlation_module_;

private:
  /// Called when the channel disconnects us.
  virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
      ACE_THROW_SPEC ((CORBA::SystemException));

  /// Dynamically allocates structures needed for correlations.  0 on
  /// success, -1 on failure.
  int allocate_correlation_resources (ACE_ES_Dependency_Iterator &iter);

  /// Helper function for this->push.
  ACE_ES_Dispatch_Request * correlate (ACE_ES_Consumer_Rep *cr,
                                       const TAO_EC_Event& event);

  // = Registration helper functions.
  int register_deadline_timeout (RtecEventChannelAdmin::Dependency &dependency,
                                 RtecEventComm::EventType group_type,
                                 int cgindex,
                                 int dgindex,
                                 int &trep_index);
  int register_interval_timeout (RtecEventChannelAdmin::Dependency &dependency,
                                 RtecEventComm::EventType group_type,
                                 int cgindex,
                                 int dgindex,
                                 int &trep_index);
  int register_event (RtecEventChannelAdmin::Dependency &dependency,
                      RtecEventComm::EventType group_type,
                      int cgindex,
                      int dgindex,
                      int &crep_index);

  ACE_ES_Consumer_Rep *get_consumer_rep (RtecEventChannelAdmin::Dependency &dependency,
                                          int &crep_index);
  int new_type_id (void);

  int type_id_index_;

  /// For event forwarding.
  RtecEventChannelAdmin::ProxyPushConsumer_ptr channel_;

  /// Supplier QOS specifications.
  RtecEventChannelAdmin::SupplierQOS qos_;

  // Events waiting to be forwarded.
  TAO_EC_Event_Array *pending_events_;

  // Used to synchronize pending_events_ and by the correlation module.
  /// Used to lock shared state.
  ACE_ES_MUTEX lock_;

  ACE_Push_Consumer_Proxy *consumer_;

  /// A bit is set for each dependency satisfied.
  u_long pending_flags_;

  /// Array of consumer rep pointers.
  ACE_ES_Consumer_Rep **consumer_reps_;
  int n_consumer_reps_;
  ACE_ES_Consumer_Rep_Timeout *timer_reps_;
  int n_timer_reps_;

  ACE_ES_Conjunction_Group *conjunction_groups_;
  int n_conjunction_groups_;
  ACE_ES_Disjunction_Group *disjunction_groups_;
  int n_disjunction_groups_;

  /// True when we're connected to the channel for forwarding.
  int connected_;
};

// ************************************************************

/**
 * @class ACE_ES_ACT
 *
 * @brief Event Service ACT
 *
 */
class TAO_RTOLDEvent_Export ACE_ES_ACT
{
public:
  ACE_ES_ACT (void);
  int has_act_;
  RtecEventComm::Event act_;
};

// ************************************************************

// Forward declarations.
class ACE_ES_Dispatch_Request;

/**
 * @class ACE_ES_Consumer_Module
 *
 * @brief Event Service Consumer Module
 *
 * ProxyPushSupplier factory.
 */
class TAO_RTOLDEvent_Export ACE_ES_Consumer_Module : public POA_RtecEventChannelAdmin::ConsumerAdmin
{
public:
  /// Default construction.
  ACE_ES_Consumer_Module (ACE_EventChannel *channel);

  /// Link to the next module.
  void open (ACE_ES_Dispatching_Module *down);

  /// Factory method for push consumer proxies.
  virtual RtecEventChannelAdmin::ProxyPushSupplier_ptr
      obtain_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
        ACE_THROW_SPEC ((CORBA::SystemException));

  /// Register the consumer with the Event Service.  This handles all
  /// the details regarding Correlation_Module and Subscription_Module.
  void connected (ACE_Push_Consumer_Proxy *consumer
                  ACE_ENV_ARG_DECL_NOT_USED);

  /// Unregister the consumer from the Event Service.
  void disconnecting (ACE_Push_Consumer_Proxy *consumer
                      ACE_ENV_ARG_DECL_NOT_USED);

  virtual void push (const ACE_ES_Dispatch_Request *request
                     ACE_ENV_ARG_DECL_NOT_USED);

  /// Allow transformations to RtecEventChannelAdmin::ConsumerAdmin.
  RtecEventChannelAdmin::ConsumerAdmin_ptr get_ref (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);

  /// This is called by Shutdown_Consumer command objects when a
  /// consumer proxy is ready to be deleted.
  void shutdown_request (ACE_ES_Dispatch_Request *request);

  /// Actively disconnect from all consumers.
  void shutdown (void);

  /**
   * Fill the QoS with the disjuction off all the subscriptions in
   * this EC.
   * It leaves the gateways out of the list.
   */
  void fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos);

private:
  typedef ACE_Unbounded_Set_Ex_Iterator<ACE_Push_Consumer_Proxy *> Consumer_Iterator;
  typedef ACE_Unbounded_Set_Ex<ACE_Push_Consumer_Proxy *> Consumers;

  /// Protects access to all_consumers_.
  ACE_ES_MUTEX lock_;

  Consumers all_consumers_;

  /// Used to test for shutdown.
  ACE_EventChannel *channel_;

  /// Next module down.
  ACE_ES_Dispatching_Module *down_;
};

// ************************************************************

// Forward declaration.
class ACE_ES_Subscription_Module;

/**
 * @class ACE_ES_Correlation_Module
 *
 * @brief Event Service Correlation Module
 *
 */
class TAO_RTOLDEvent_Export ACE_ES_Correlation_Module
{
public:
  /// Default construction.
  ACE_ES_Correlation_Module (ACE_EventChannel *channel);

  /// Link to adjacent modules.
  void open (ACE_ES_Dispatching_Module *up,
             ACE_ES_Subscription_Module *down);

  /// Create the consumers filter object.
  void connected (ACE_Push_Consumer_Proxy *consumer
                  ACE_ENV_ARG_DECL_NOT_USED);

  /// Release the consumers filter object.
  void disconnecting (ACE_Push_Consumer_Proxy *consumer
                      ACE_ENV_ARG_DECL_NOT_USED);

  /**
   * Take in an event and its subscriber.  Apply consumer-specific
   * filters to each event and forward any dispatch requests to the
   * Dispatching Module.
   */
  void push (ACE_ES_Consumer_Rep *consumer,
             const TAO_EC_Event &event
             ACE_ENV_ARG_DECL_NOT_USED);

  // = These are called by ACE_ES_Consumer_Reps.

  /// Forwards to the subscription module.
  int subscribe (ACE_ES_Consumer_Rep *consumer);

  /// Forwards to the subscription module.
  int unsubscribe (ACE_ES_Consumer_Rep *consumer);

  /// Schedule consumer timeout.  Return 0 on success, -1 on failure.
  int schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);

  /// Cancel consumer timeout.  Return 0 on success, -1 on failure.
  int cancel_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);

  /// Reschedule consumer timeout.  Return 0 on success, -1 on failure.
  int reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer);

  /// The master channel.  This is public so that Consumer_Correlation
  /// objects can access it.
  ACE_EventChannel *channel_;

  /// Does nothing.
  void shutdown (void);

private:
  /// Next module up.
  ACE_ES_Dispatching_Module *up_;

  /// Next module down.
  ACE_ES_Subscription_Module *subscription_module_;
};

// ************************************************************

// Forward declaration.
class ACE_ES_Supplier_Module;
class ACE_Push_Supplier_Proxy;

/**
 * @class ACE_ES_Subscription_Module
 *
 * @brief Event Service Subscription Module
 *
 * = SYNCHRONIZATION
 * This is currently implemented with very coarse-grain
 * synchronization.  Basically, there is a single readers/writer
 * lock.  All operations acquire the writer lock to change any
 * subscription record.  All operations acquire a reader lock to
 * read any subscription record.  This is fine for normal
 * operations (which are *all* read operations).  However, the
 * initialization and shutdown periods might benefit from the

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -