📄 ecg_cdr_message_receiver.h
字号:
/* -*- C++ -*- */
/**
* @file ECG_CDR_Message_Receiver.h
*
* ECG_CDR_Message_Receiver.h,v 1.6 2003/11/04 05:21:32 dhinton Exp
*
* @author Carlos O'Ryan (coryan@cs.wustl.edu)
* @author Marina Spivak (marina@atdesk.com)
*/
#ifndef TAO_ECG_CDR_MESSAGE_RECEIVER_H
#define TAO_ECG_CDR_MESSAGE_RECEIVER_H
#include /**/ "ace/pre.h"
#include "ECG_UDP_Out_Endpoint.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "tao/CDR.h"
#include "tao/Environment.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/INET_Addr.h"
#include "ace/Null_Mutex.h"
/**
* @class TAO_ECG_CDR_Processor
*
* @brief Interface for callback objects used by
* TAO_ECG_CDR_Message_Receiver to propagate received data to
* its callers.
*/
class TAO_ECG_CDR_Processor
{
public:
virtual ~TAO_ECG_CDR_Processor (void);
/// Extracts data from <cdr>. Returns 0 on success, -1 on error.
virtual int decode (TAO_InputCDR &cdr) = 0;
};
// ****************************************************************
/**
* @class TAO_ECG_UDP_Request_Entry
*
* @brief Keeps information about an incomplete request.
*
* When a request arrives in fragments this object is used to
* keep track of the incoming data.
*/
class TAO_ECG_UDP_Request_Entry
{
public:
enum {
ECG_DEFAULT_FRAGMENT_BUFSIZ = 8
};
/// Initialize the fragment, allocating memory, etc.
TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order,
CORBA::ULong request_id,
CORBA::ULong request_size,
CORBA::ULong fragment_count);
~TAO_ECG_UDP_Request_Entry (void);
/// Validate a fragment, it should be rejected if it is invalid..
int validate_fragment (CORBA::Boolean byte_order,
CORBA::ULong request_size,
CORBA::ULong fragment_size,
CORBA::ULong fragment_offset,
CORBA::ULong fragment_id,
CORBA::ULong fragment_count) const;
/// Has @a fragment_id been received?
int test_received (CORBA::ULong fragment_id) const;
/// Mark @a fragment_id as received, reset timeout counter...
void mark_received (CORBA::ULong fragment_id);
/// Is the message complete?
int complete (void) const;
/// Return a buffer for the fragment at offset @a fragment_offset
char* fragment_buffer (CORBA::ULong fragment_offset);
private:
TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs);
TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs);
private:
/// This attributes should remain constant in all the fragments, used
/// for validation....
CORBA::Boolean byte_order_;
CORBA::ULong request_id_;
CORBA::ULong request_size_;
CORBA::ULong fragment_count_;
ACE_Message_Block payload_;
/// This is a bit vector, used to keep track of the received buffers.
CORBA::ULong* received_fragments_;
int own_received_fragments_;
CORBA::ULong received_fragments_size_;
CORBA::ULong default_received_fragments_[ECG_DEFAULT_FRAGMENT_BUFSIZ];
};
// ****************************************************************
/**
* @class TAO_ECG_CDR_Message_Receiver
*
* @brief Receives UDP and Multicast messages.
*
* @todo Update class documentation below.
*
* 5) Make status array size and purge_count configurable.
*
* This class receives UDP and Multicast message fragments, assembles
* them (described in detail below), and passes complete messages
* in the form of cdr streams to the calling classes.
*
* This class is used by various Gateway classes (Senders/Receivers)
* responsible for federating Event Channels with UDP/Mcast.
*
* = REASSEMBLY
* Fragmentation is described in ECG_CDR_Message_Sender.h
* Whenever an incomplete fragment is received (one with
* fragment_count > 1) we allocate an entry for the message in an
* map indexed by (host,port,request_id). The entry contains the
* buffer, a bit vector to keep track of the fragments received
* so far, and a timeout counter. This timeout counter is set to
* 0 on each (new) fragment arrival, and incremented on a regular
* basis. If the counter reaches a maximum value the message is
* dropped.
* Once all the fragments have been received the message is sent
* up to the calling classes, and the memory reclaimed.
*/
class TAO_RTEvent_Export TAO_ECG_CDR_Message_Receiver
{
public:
/// Initialization and termination methods.
//@{
TAO_ECG_CDR_Message_Receiver (CORBA::Boolean check_crc);
~TAO_ECG_CDR_Message_Receiver (void);
/**
* @param ignore_from Endpoint used to remove events generated by
* the same process.
*/
void init (TAO_ECG_Refcounted_Endpoint ignore_from
/* , ACE_Lock *lock = 0 */);
// Shutdown the component: close down the request map, etc.
void shutdown (void);
//@}
/// Main method: read the data from @a dgram and either pass ready data
/// to @a cdr_processor or update the <request_map_> if the request
/// is not yet complete.
/**
* Returns 1 if data was read successfully and accepted by
* <cdr_processor> without errors.
* Returns 0 if there were no errors, but no data has been passed to
* <cdr_processor>, either due to request being incomplete (not all
* fragments received), or it being a duplicate.
* Returns -1 if there were errors.
*/
int handle_input (ACE_SOCK_Dgram& dgram,
TAO_ECG_CDR_Processor *cdr_processor);
/// Represents any request that has been fully received and
/// serviced, to simplify the internal logic.
static TAO_ECG_UDP_Request_Entry Request_Completed_;
private:
enum {
ECG_DEFAULT_MAX_FRAGMENTED_REQUESTS = 1024,
ECG_DEFAULT_FRAGMENTED_REQUESTS_MIN_PURGE_COUNT = 32
};
struct Mcast_Header;
class Requests;
typedef ACE_Hash_Map_Manager<ACE_INET_Addr,
Requests*,
ACE_Null_Mutex> Request_Map;
private:
/// Returns 1 on success, 0 if <request_id> has already been
/// received or is below current request range, and -1 on error.
int mark_received (const ACE_INET_Addr &from,
CORBA::ULong request_id);
/// Returns 1 if complete request is received and <event> is
/// populated, 0 if request has only partially been received or is a
/// duplicate, and -1 on error.
int process_fragment (const ACE_INET_Addr &from,
const Mcast_Header &header,
char * data_buf,
TAO_ECG_CDR_Processor *cdr_processor);
Request_Map::ENTRY* get_source_entry (const ACE_INET_Addr &from);
private:
/// Ignore any events coming from this IP address.
TAO_ECG_Refcounted_Endpoint ignore_from_;
/// The map containing all the incoming requests which have been
/// partially received.
Request_Map request_map_;
/// Serializes use of <request_map_>.
// ACE_Lock* lock_;
/// Size of a fragmented requests array, i.e., max number of
/// partially received requests kept at any given time per source.
size_t max_requests_;
/// Minimum number of requests purged from a fragmented requests
/// array when the range of requests represented there needs to be
/// shifted.
size_t min_purge_count_;
/// Flag to indicate whether CRC should be computed and checked.
CORBA::Boolean check_crc_;
};
// ****************************************************************
/// Helper for decoding, validating and storing mcast header.
struct TAO_ECG_CDR_Message_Receiver::Mcast_Header
{
int byte_order;
CORBA::ULong request_id;
CORBA::ULong request_size;
CORBA::ULong fragment_size;
CORBA::ULong fragment_offset;
CORBA::ULong fragment_id;
CORBA::ULong fragment_count;
CORBA::ULong crc;
int read (char * header,
size_t bytes_received,
CORBA::Boolean checkcrc = 0);
};
// ****************************************************************
/// Once init() has been called:
/// Invariant: id_range_high_- id_range_low_ == size_ - 1
class TAO_ECG_CDR_Message_Receiver::Requests
{
public:
Requests (void);
~Requests (void);
/// Allocates and initializes <fragmented_requests_>.
int init (size_t size, size_t min_purge_count);
/// Returns pointer to a <fragmented_requests_> element
/// representing <request_id>.
/**
* If <request_id> < <id_range_low> return 0.
* If <request_id> > <id_range_high>, shift the range so it
* includes <request_id>, purging incomplete requests as needed.
*/
TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id);
private:
/// Delete any outstanding requests with ids in the range
/// [<purge_first>, <purge_last>] from <fragmented_requests> and
/// and reset their slots.
void purge_requests (CORBA::ULong purge_first,
CORBA::ULong purge_last);
Requests & operator= (const Requests &rhs);
Requests (const Requests &rhs);
private:
/// Array, used in a circular fashion, that stores partially received
/// requests (and info on which requests have been fully received
/// and processed) for a range of request ids.
TAO_ECG_UDP_Request_Entry** fragmented_requests_;
/// Size of <fragmented_requests_> array.
size_t size_;
/// The range of request ids, currently represented in
/// <fragmented_requests>.
//@{
CORBA::ULong id_range_low_;
CORBA::ULong id_range_high_;
//@}
/// Minimum range shifting amount.
size_t min_purge_count_;
};
#if defined(__ACE_INLINE__)
#include "ECG_CDR_Message_Receiver.i"
#endif /* __ACE_INLINE__ */
#include /**/ "ace/post.h"
#endif /* TAO_ECG_CDR_MESSAGE_RECEIVER_H */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -