📄 iqueue.h
字号:
// List of sources, ordered from older to newer SyncSourceLink* first, * last;};/** * @class IncomingDataQueue * @short Queue for incoming RTP data packets in an RTP session. * * @author Federico Montesino Pouzols <fedemp@altern.org> **/class __EXPORT IncomingDataQueue: public IncomingDataQueueBase, protected MembershipBookkeeping{public: /** * @class SyncSourcesIterator * @short iterator through the list of synchronizations * sources in this session **/ class SyncSourcesIterator { public: typedef std::forward_iterator_tag iterator_category; typedef SyncSource value_type; typedef ptrdiff_t difference_type; typedef const SyncSource* pointer; typedef const SyncSource& reference; SyncSourcesIterator(SyncSourceLink* l = NULL) : link(l) { } SyncSourcesIterator(const SyncSourcesIterator& si) : link(si.link) { } reference operator*() const { return *(link->getSource()); } pointer operator->() const { return link->getSource(); } SyncSourcesIterator& operator++() { link = link->getNext(); return *this; } SyncSourcesIterator operator++(int) { SyncSourcesIterator result(*this); ++(*this); return result; } friend bool operator==(const SyncSourcesIterator& l, const SyncSourcesIterator& r) { return l.link == r.link; } friend bool operator!=(const SyncSourcesIterator& l, const SyncSourcesIterator& r) { return l.link != r.link; } private: SyncSourceLink *link; }; SyncSourcesIterator begin() { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); } SyncSourcesIterator end() { return SyncSourcesIterator(NULL); } /** * Retreive data from a specific timestamped packet if such a * packet is currently available in the receive buffer. * * @param stamp Data unit timestamp. * @param src Optional synchronization source selector. * @return data retrieved from the reception buffer. **/ const AppDataUnit* getData(uint32 stamp, const SyncSource* src = NULL); /** * Determine if packets are waiting in the reception queue. * * @param src Optional synchronization source selector. * @return True if packets are waiting. */ bool isWaiting(const SyncSource* src = NULL) const; /** * Get timestamp of first packet waiting in the queue. * * @param src optional source selector. * @return timestamp of first arrival packet. **/ uint32 getFirstTimestamp(const SyncSource* src = NULL) const; /** * When receiving packets from a new source, it may be * convenient to reject a first few packets before we are * really sure the source is valid. This method sets how many * data packets must be received in sequence before the source * is considered valid and the stack starts to accept its * packets. * * @note the default (see defaultMinValidPacketSequence()) * value for this parameter is 0, so that no packets are * rejected (data packets are accepted from the first one). * * @note this validation is performed after the generic header * validation and the additional validation done in * onRTPPacketRecv(). * * @note if any valid RTCP packet is received from this * source, it will be immediatly considered valid regardless * of the number of sequential data packets received. * * @param packets number of sequential packet required **/ void setMinValidPacketSequence(uint8 packets) { minValidPacketSequence = packets; } uint8 getDefaultMinValidPacketSequence() const { return defaultMinValidPacketSequence; } /** * Get the minimun number of consecutive packets that must be * received from a source before accepting its data packets. **/ uint8 getMinValidPacketSequence() const { return minValidPacketSequence; } void setMaxPacketMisorder(uint16 packets) { maxPacketMisorder = packets; } uint16 getDefaultMaxPacketMisorder() const { return defaultMaxPacketMisorder; } uint16 getMaxPacketMisorder() const { return maxPacketMisorder; } /** * * It also prevents packets sent after a restart of the source * being immediately accepted. **/ void setMaxPacketDropout(uint16 packets) // default: 3000. { maxPacketDropout = packets; } uint16 getDefaultMaxPacketDropout() const { return defaultMaxPacketDropout; } uint16 getMaxPacketDropout() const { return maxPacketDropout; } // default value for constructors that allow to specify // members table s\ize inline static size_t getDefaultMembersSize() { return defaultMembersSize; } /** * Set input queue CryptoContext. * * The endQueue method (provided by RTPQueue) deletes all * registered CryptoContexts. * * @param cc Pointer to initialized CryptoContext. */ void setInQueueCryptoContext(CryptoContext* cc); /** * Remove input queue CryptoContext. * * The endQueue method (provided by RTPQueue) also deletes all * registered CryptoContexts. * * @param cc * Pointer to initialized CryptoContext to remove. If pointer * if <code>NULL</code> then delete the whole queue */ void removeInQueueCryptoContext(CryptoContext* cc); /** * Get an input queue CryptoContext identified by SSRC * * @param ssrc Request CryptoContext for this incoming SSRC * @return Pointer to CryptoContext of the SSRC of NULL if no context * available for this SSRC. */ CryptoContext* getInQueueCryptoContext(uint32 ssrc);protected: /** * @param size initial size of the membership table. **/ IncomingDataQueue(uint32 size); virtual ~IncomingDataQueue() { } /** * Apply collision and loop detection and correction algorithm * when receiving RTP data packets. Follows section 8.2 in * draft-ietf-avt-rtp-new. * * @param sourceLink link to the source object. * @param is_new whether the source has been just recorded. * @param na data packet network address. * @param tp data packet source transport port. * * @return whether the packet must not be discarded. **/ bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink, bool is_new, InetAddress& na, tpport_t tp); /** * Set the number of RTCP intervals that the stack will wait * to change the state of a source from stateActive to * stateInactive, or to delete the source after being in * stateInactive. * * Note that this value should be uniform accross all * participants and SHOULD be fixed for a particular profile. * * @param intervals number of RTCP report intervals * * @note If RTCP is not being used, the RTCP interval is * assumed to be the default: 5 seconds. * @note The default for this value is, as RECOMMENDED, 5. **/ void setSourceExpirationPeriod(uint8 intervals) { sourceExpirationPeriod = intervals; } /** * This function is used by the service thread to process * the next incoming packet and place it in the receive list. * * @return number of payload bytes received. <0 if error. */ virtual size_t takeInDataPacket(); void renewLocalSSRC(); /** * This is used to fetch a packet in the receive queue and to * expire packets older than the current timestamp. * * @return packet buffer object for current timestamp if found. * @param timestamp timestamp requested. * @param src optional source selector * @note if found, the packet is removed from the reception queue **/ IncomingDataQueue::IncomingRTPPktLink* getWaiting(uint32 timestamp, const SyncSource *src = NULL); /** * Log reception of a new RTP packet from this source. Usually * updates data such as the packet counter, the expected * sequence number for the next packet and the time the last * packet was received at. * * @param srcLink Link structure for the synchronization * source of this packet. * @param pkt Packet just created and to be logged. * @param recvtime Reception time. * * @return whether, according to the source state and * statistics, the packet is considered valid and must be * inserted in the incoming packets queue. **/ bool recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt, const timeval recvtime); /** * Log extraction of a packet from this source from the * scheduled reception queue. * * @param pkt Packet extracted from the queue. **/ void recordExtraction(const IncomingRTPPkt& pkt); void purgeIncomingQueue(); /** * Virtual called when a new synchronization source has joined * the session. * * @param - new synchronization source **/ inline virtual void onNewSyncSource(const SyncSource&) { }protected: /** * A virtual function to support parsing of arriving packets * to determine if they should be kept in the queue and to * dispatch events. * * A generic header validity check (as specified in RFC 1889) * is performed on every incoming packet. If the generic check * completes succesfully, this method is called before the * packet is actually inserted into the reception queue. * * May be used to perform additional validity checks or to do * some application specific processing. * * @param - packet just received. * @return true if packet is kept in the incoming packets queue. **/ inline virtual bool onRTPPacketRecv(IncomingRTPPkt&) { return true; } /** * A hook to filter packets in the receive queue that are being * expired. This hook may be used to do some application * specific processing on expired packets before they are * deleted. * * @param - packet expired from the recv queue. **/ inline virtual void onExpireRecv(IncomingRTPPkt&) { return; } /** * A hook that gets called if the decoding of an incoming SRTP was erroneous * * @param pkt * The SRTP packet with error. * @param errorCode * The error code: -1 - SRTP authentication failure, -2 - replay * check failed * @return * True: put the packet in incoming queue for further processing * by the applications; false: dismiss packet. The default * implementation returns false. **/ inline virtual bool onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode) { return false; } inline virtual bool end2EndDelayed(IncomingRTPPktLink&) { return false; } /** * Insert a just received packet in the queue (both general * and source specific queues). If the packet was already in * the queue (same SSRC and sequence number), it is not * inserted but deleted. * * @param packetLink link to a packet just received and * generally validated and processed by onRTPPacketRecv. * * @return whether the packet was successfully inserted. * @retval false when the packet is duplicated (there is * already a packet from the same source with the same * timestamp). * @retval true when the packet is not duplicated. **/ bool insertRecvPacket(IncomingRTPPktLink* packetLink); /** * This function performs the physical I/O for reading a * packet from the source. It is a virtual that is * overriden in the derived class. * * @return number of bytes read. * @param buffer of read packet. * @param length of data to read. * @param host address of source. * @param port number of source. **/ virtual size_t recvData(unsigned char* buffer, size_t length, InetHostAddress& host, tpport_t& port) = 0; virtual size_t getNextDataPacketSize() const = 0; mutable ThreadLock recvLock; // reception queue IncomingRTPPktLink* recvFirst, * recvLast; // values for packet validation. static const uint8 defaultMinValidPacketSequence; static const uint16 defaultMaxPacketMisorder; static const uint16 defaultMaxPacketDropout; uint8 minValidPacketSequence; uint16 maxPacketMisorder; uint16 maxPacketDropout; static const size_t defaultMembersSize; uint8 sourceExpirationPeriod; mutable Mutex cryptoMutex; std::list<CryptoContext *> cryptoContexts;};/** @}*/ // iqueue#ifdef CCXX_NAMESPACES}#endif#endif //CCXX_RTP_IQUEUE_H_/** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 8 * End: */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -