📄 ec_mcast.h
字号:
//
public:
ECM_Local_Federation (ECM_Federation *federation,
                      ECM_Driver *driver);
// Constructor; it is handed a federation and the driver.

~ECM_Local_Federation ();
// Destructor.
void open (int event_count,
           RtecEventChannelAdmin::EventChannel_ptr event_channel
           ACE_ENV_ARG_DECL);
// Connect both the supplier and the consumer.

void close (ACE_ENV_SINGLE_ARG_DECL);
// Disconnect everybody from the EC.

void activate (RtecEventChannelAdmin::EventChannel_ptr event_channel,
               RtecEventComm::Time interval
               ACE_ENV_ARG_DECL);
// Activate the supplier.

void supplier_timeout (RtecEventComm::PushConsumer_ptr consumer
                       ACE_ENV_ARG_DECL);
// Invoked when the supplier is ready to send a new event.

void consumer_push (ACE_hrtime_t arrival,
                    const RtecEventComm::EventSet &event
                    ACE_ENV_ARG_DECL);
// Invoked when the consumer has just received an event.

const ECM_Federation *federation () const;
// Accessor for the federation description.

void open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec,
                    TAO_ECG_Refcounted_Endpoint ignore_from
                    ACE_ENV_ARG_DECL);
// Connect the UDP receiver to the EC.

void close_receiver (ACE_ENV_SINGLE_ARG_DECL);
// Close the UDP receiver and disconnect it from the EC.

void dump_results () const;
// Report the results back to the user.
void subscribed_bit (int i, CORBA::Boolean x);
CORBA::Boolean subscribed_bit (int i) const;
// Setter and getter for the subscribed bit; these bits define the
// subset of events that we actually publish.

// = Delegate on the federation description

const char *name () const;
CORBA::UShort mcast_port () const;

int supplier_types () const;
const char *supplier_name (CORBA::ULong i) const;
CORBA::ULong supplier_ipaddr (CORBA::ULong i) const;

int consumer_types () const;
const char *consumer_name (CORBA::ULong i) const;
CORBA::ULong consumer_ipaddr (CORBA::ULong i) const;
private:
ECM_Federation *federation_;
// Describes the events we send and receive.

ECM_Driver *driver_;
// The test driver.

ECM_Consumer consumer_;
ECM_Supplier supplier_;
// The supplier and consumer helper classes; other than
// initialization, these classes only forward events to the
// Federation.

// = Collect statistics

CORBA::ULong recv_count_;
// Number of messages received.

CORBA::ULong unfiltered_count_;
// Number of messages received that were not properly filtered.

CORBA::ULong invalid_count_;
// Number of messages that could *not* be destined to this
// federation, yet were received anyway.

CORBA::ULong send_count_;
// Number of messages sent.

int event_count_;
// How many messages we will send before stopping the simulation.

ACE_Time_Value last_publication_change_;
// The last time we changed our publication list; we don't change it
// too often.

ACE_Time_Value last_subscription_change_;
// The last time we changed our subscription list, so we don't change
// it too often.
// NOTE(review): the original comment said "publication" here, which
// looks like a copy-and-paste slip given the member name -- confirm.

TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver_;
// This object reads the events and pushes them into the EC.  Notice
// that it can receive events from multiple Event Handlers.

TAO_ECG_Mcast_EH *mcast_eh_;
// The event handler; it receives callbacks from the reactor whenever
// an event is available in some of the multicast groups, and then
// forwards to the receiver object for processing and dispatching of
// the event.
// NOTE(review): the original comment named that object <mcast_recv_>;
// presumably it is <receiver_> -- confirm.
// @@ TODO Eventually we may need several of these objects to handle
// OS limitations on the number of multicast groups per socket.

ACE_RANDR_TYPE seed_;
// The seed for a random number generator.

CORBA::ULong subscription_change_period_;
// The (average) period between subscription changes, in usecs.

CORBA::ULong publication_change_period_;
// The (average) period between publication changes, in usecs.

CORBA::Boolean *subscription_subset_;
// The events we are actually subscribed to.
};
class ECM_Driver
{
  //
  // = TITLE
  //   Demonstrate the use of the UDP Gateways.
  //
  // = DESCRIPTION
  //   This class is designed to exercise several features of the UDP
  //   Gateways and their companion classes.
  //   We create a set of processes, each running one EC, with
  //   multiple consumers and suppliers colocated with the EC.
  //   The ECs communicate among themselves using multicast.
  //   The test thus shows how to use multicast, how to change the
  //   local ECG_UDP_Receiver and ECG_UDP_Sender QoS specifications
  //   dynamically, how to economically use the OS resources to
  //   receive and send multicast messages, etc.
  //
public:
  ECM_Driver ();
  // Default constructor.

  enum {
    MAX_EVENTS = 1024,
    // Maximum number of events to send on each Federation.

    MAX_LOCAL_FEDERATIONS = 16,
    // Maximum number of federations running on a single process.

    MAX_FEDERATIONS = 128
    // Maximum number of federations in the simulation.
  };

  int run (int argc, char *argv[]);
  // Run the test: read all the configuration files, etc.

  void federation_has_shutdown (ECM_Local_Federation *federation
                                ACE_ENV_ARG_DECL);
  // One of the federations has completed its simulation; once all of
  // them finish, the test exits.

private:
  void open_federations (RtecEventChannelAdmin::EventChannel_ptr ec
                         ACE_ENV_ARG_DECL);
  // Connect the federations to the EC.

  void activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec
                             ACE_ENV_ARG_DECL);
  // Activate all the federations.

  void close_federations (ACE_ENV_SINGLE_ARG_DECL);
  // Close the federations, i.e. disconnect from the EC, deactivate
  // the objects, etc.

  void open_senders (RtecEventChannelAdmin::EventChannel_ptr ec
                     ACE_ENV_ARG_DECL);
  // Connect all the senders, so we can start multicasting events.

  void open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec
                       ACE_ENV_ARG_DECL);
  // Connect all the receivers, so that we accept events arriving
  // through multicast.

  void close_senders (ACE_ENV_SINGLE_ARG_DECL);
  // Close all the senders to clean up resources.

  void close_receivers (ACE_ENV_SINGLE_ARG_DECL);
  // Close all the receivers to clean up resources.

  int shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);
  // NOTE(review): the original comment is truncated ("Called when the
  // main thread."); presumably this shuts the test down -- confirm
  // against the implementation.

  int parse_args (int argc, char *argv[]);
  // Parse the command line arguments.

  int parse_config_file ();
  // Parse the configuration file (presumably the one named by
  // <config_filename_>; the original comment repeated "parse the
  // command line arguments").

  int parse_name_list (FILE *file, int n, char **names,
                       const char *error_msg);
  // Parse one of the lists of names in the federation definition.

  int skip_blanks (FILE *file,
                   const char *error_msg);
  // Skip the blanks in the file.

  void dump_results ();
  // Dump the results to the standard output.

  int event_period_;
  // The events are generated using this interval, in microseconds.

  int event_count_;
  // How many events the suppliers will send.

  char *config_filename_;
  // The name of the file where we read the configuration.

  const char *pid_filename_;
  // The name of a file where the process stores its pid.

  int local_federations_count_;
  // How many federations are running in this process (or, if you
  // prefer, in how many federations this process participates).

  ECM_Local_Federation *local_federations_[MAX_LOCAL_FEDERATIONS];
  // The local federations.

  char *local_names_[MAX_LOCAL_FEDERATIONS];
  // The names of the local federations.

  int all_federations_count_;
  // The total number of federations we belong to.

  ECM_Federation *all_federations_[MAX_FEDERATIONS];
  // All the federations.

  ACE_Atomic_Op<TAO_SYNCH_MUTEX, CORBA::ULong> federations_running_;
  // Keeps track of how many federations are active, so we can shut
  // down once they are all destroyed.

  ACE_hrtime_t test_start_;
  ACE_hrtime_t test_stop_;
  // Measure the test elapsed time as well as mark the beginning of
  // the frames.

  CORBA::ORB_var orb_;
  // The ORB, so we can shut it down.

  TAO_ECG_UDP_Out_Endpoint endpoint_;
  // This socket is shared by all the federations to send the
  // multicast events.
};
#if defined (__ACE_INLINE__)
#include "EC_Mcast.i"
#endif /* __ACE_INLINE__ */
#endif /* EC_MCAST_H */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -