📄 rtp.c
字号:
rtcp_rr_wrapper *cur, *start;
start = &session->rr[ssrc_hash(reporter_ssrc)][ssrc_hash(rr->ssrc)];
cur = start->next;
while (cur != start) {
if (cur->reporter_ssrc == reporter_ssrc && cur->rr->ssrc == rr->ssrc) {
/* Replace existing entry in the database */
xfree(cur->rr);
xfree(cur->ts);
cur->rr = rr;
cur->ts = (struct timeval *) xmalloc(sizeof(struct timeval));
memcpy(cur->ts, ts, sizeof(struct timeval));
return;
}
cur = cur->next;
}
/* No entry in the database so create one now. */
cur = (rtcp_rr_wrapper*)xmalloc(sizeof(rtcp_rr_wrapper));
cur->reporter_ssrc = reporter_ssrc;
cur->rr = rr;
cur->ts = (struct timeval *) xmalloc(sizeof(struct timeval));
memcpy(cur->ts, ts, sizeof(struct timeval));
/* Fix links */
cur->next = start->next;
cur->next->prev = cur;
cur->prev = start;
cur->prev->next = cur;
rtp_message(LOG_INFO, "Created new rr entry for 0x%08x from source 0x%08x", rr->ssrc, reporter_ssrc);
return;
}
static void remove_rr(struct rtp *session, uint32_t ssrc)
{
/* Remove any RRs from "s" which refer to "ssrc" as either */
/* reporter or reportee. */
rtcp_rr_wrapper *start, *cur, *tmp;
int i;
/* Remove rows, i.e. ssrc == reporter_ssrc */
for(i = 0; i < RTP_DB_SIZE; i++) {
start = &session->rr[ssrc_hash(ssrc)][i];
cur = start->next;
while (cur != start) {
if (cur->reporter_ssrc == ssrc) {
tmp = cur;
cur = cur->prev;
tmp->prev->next = tmp->next;
tmp->next->prev = tmp->prev;
xfree(tmp->ts);
xfree(tmp->rr);
xfree(tmp);
}
cur = cur->next;
}
}
/* Remove columns, i.e. ssrc == reporter_ssrc */
for(i = 0; i < RTP_DB_SIZE; i++) {
start = &session->rr[i][ssrc_hash(ssrc)];
cur = start->next;
while (cur != start) {
if (cur->rr->ssrc == ssrc) {
tmp = cur;
cur = cur->prev;
tmp->prev->next = tmp->next;
tmp->next->prev = tmp->prev;
xfree(tmp->ts);
xfree(tmp->rr);
xfree(tmp);
}
cur = cur->next;
}
}
}
static void timeout_rr(struct rtp *session, struct timeval *curr_ts)
{
/* Timeout any reception reports which have been in the database for more than 3 */
/* times the RTCP reporting interval without refresh. */
rtcp_rr_wrapper *start, *cur, *tmp;
rtp_event event;
int i, j;
for(i = 0; i < RTP_DB_SIZE; i++) {
for(j = 0; j < RTP_DB_SIZE; j++) {
start = &session->rr[i][j];
cur = start->next;
while (cur != start) {
if (tv_diff(*curr_ts, *(cur->ts)) > (session->rtcp_interval * 3)) {
/* Signal the application... */
if (!filter_event(session, cur->reporter_ssrc)) {
event.ssrc = cur->reporter_ssrc;
event.type = RR_TIMEOUT;
event.data = cur->rr;
event.ts = curr_ts;
session->callback(session, &event);
}
/* Delete this reception report... */
tmp = cur;
cur = cur->prev;
tmp->prev->next = tmp->next;
tmp->next->prev = tmp->prev;
xfree(tmp->ts);
xfree(tmp->rr);
xfree(tmp);
}
cur = cur->next;
}
}
}
}
static const rtcp_rr* get_rr(struct rtp *session, uint32_t reporter_ssrc, uint32_t reportee_ssrc)
{
rtcp_rr_wrapper *cur, *start;
start = &session->rr[ssrc_hash(reporter_ssrc)][ssrc_hash(reportee_ssrc)];
cur = start->next;
while (cur != start) {
if (cur->reporter_ssrc == reporter_ssrc &&
cur->rr->ssrc == reportee_ssrc) {
return cur->rr;
}
cur = cur->next;
}
return NULL;
}
static void check_source(source *s)
{
ASSERT(s != NULL);
ASSERT(s->magic == 0xc001feed);
}
static void check_database(struct rtp *session)
{
/* This routine performs a sanity check on the database. */
/* This should not call any of the other routines which */
/* manipulate the database, to avoid common failures. */
#ifdef DEBUG
source *s;
int source_count;
int chain;
#endif
ASSERT(session != NULL);
ASSERT(session->magic == 0xfeedface);
#ifdef DEBUG
/* Check that we have a database entry for our ssrc... */
/* We only do this check if ssrc_count > 0 since it is */
/* performed during initialisation whilst creating the */
/* source entry for my_ssrc. */
if (session->ssrc_count > 0) {
for (s = session->db[ssrc_hash(session->my_ssrc)]; s != NULL; s = s->next) {
if (s->ssrc == session->my_ssrc) {
break;
}
}
ASSERT(s != NULL);
}
source_count = 0;
for (chain = 0; chain < RTP_DB_SIZE; chain++) {
/* Check that the linked lists making up the chains in */
/* the hash table are correctly linked together... */
for (s = session->db[chain]; s != NULL; s = s->next) {
check_source(s);
source_count++;
if (s->prev == NULL) {
ASSERT(s == session->db[chain]);
} else {
ASSERT(s->prev->next == s);
}
if (s->next != NULL) {
ASSERT(s->next->prev == s);
}
/* Check that the SR is for this source... */
if (s->sr != NULL) {
if (s->sr->ssrc != s->ssrc) {
rtp_message(LOG_CRIT, "database error ssrc sr->ssrc is %d should be %d",
s->sr->ssrc, s->ssrc);
ASSERT(s->sr->ssrc == s->ssrc);
}
}
}
}
/* Check that the number of entries in the hash table */
/* matches session->ssrc_count */
ASSERT(source_count == session->ssrc_count);
if (source_count != session->ssrc_count) {
rtp_message(LOG_DEBUG, "source count %d does not equal session count %d", source_count, session->ssrc_count);
}
#endif
}
static source *get_source(struct rtp *session, uint32_t ssrc)
{
source *s;
check_database(session);
for (s = session->db[ssrc_hash(ssrc)]; s != NULL; s = s->next) {
if (s->ssrc == ssrc) {
check_source(s);
return s;
}
}
return NULL;
}
static source *create_source(struct rtp *session, uint32_t ssrc, int probation)
{
/* Create a new source entry, and add it to the database. */
/* The database is a hash table, using the separate chaining */
/* algorithm. */
rtp_event event;
struct timeval event_ts;
source *s = get_source(session, ssrc);
int h;
if (s != NULL) {
/* Source is already in the database... Mark it as */
/* active and exit (this is the common case...) */
gettimeofday(&(s->last_active), NULL);
return s;
}
check_database(session);
/* This is a new source, we have to create it... */
h = ssrc_hash(ssrc);
s = (source *) xmalloc(sizeof(source));
memset(s, 0, sizeof(source));
s->magic = 0xc001feed;
s->next = session->db[h];
s->ssrc = ssrc;
if (probation) {
/* This is a probationary source, which only counts as */
/* valid once several consecutive packets are received */
s->probation = -1;
} else {
s->probation = 0;
}
gettimeofday(&(s->last_active), NULL);
/* Now, add it to the database... */
if (session->db[h] != NULL) {
session->db[h]->prev = s;
}
session->db[ssrc_hash(ssrc)] = s;
session->ssrc_count++;
check_database(session);
rtp_message(LOG_INFO, "Created database entry for ssrc 0x%08x (%d valid sources)", ssrc, session->ssrc_count);
if (ssrc != session->my_ssrc) {
/* Do not send during rtp_init since application cannot map the address */
/* of the rtp session to anything since rtp_init has not returned yet. */
if (!filter_event(session, ssrc)) {
gettimeofday(&event_ts, NULL);
event.ssrc = ssrc;
event.type = SOURCE_CREATED;
event.data = NULL;
event.ts = &event_ts;
session->callback(session, &event);
}
}
return s;
}
static void delete_source(struct rtp *session, uint32_t ssrc)
{
/* Remove a source from the RTP database... */
source *s = get_source(session, ssrc);
int h = ssrc_hash(ssrc);
rtp_event event;
struct timeval event_ts;
ASSERT(s != NULL); /* Deleting a source which doesn't exist is an error... */
gettimeofday(&event_ts, NULL);
check_source(s);
check_database(session);
if (session->db[h] == s) {
/* It's the first entry in this chain... */
session->db[h] = s->next;
if (s->next != NULL) {
s->next->prev = NULL;
}
} else {
ASSERT(s->prev != NULL); /* Else it would be the first in the chain... */
s->prev->next = s->next;
if (s->next != NULL) {
s->next->prev = s->prev;
}
}
/* Free the memory allocated to a source... */
if (s->cname != NULL) xfree(s->cname);
if (s->name != NULL) xfree(s->name);
if (s->email != NULL) xfree(s->email);
if (s->phone != NULL) xfree(s->phone);
if (s->loc != NULL) xfree(s->loc);
if (s->tool != NULL) xfree(s->tool);
if (s->note != NULL) xfree(s->note);
if (s->priv != NULL) xfree(s->priv);
if (s->sr != NULL) xfree(s->sr);
remove_rr(session, ssrc);
/* Reduce our SSRC count, and perform reverse reconsideration on the RTCP */
/* reporting interval (draft-ietf-avt-rtp-new-05.txt, section 6.3.4). To */
/* make the transmission rate of RTCP packets more adaptive to changes in */
/* group membership, the following "reverse reconsideration" algorithm */
/* SHOULD be executed when a BYE packet is received that reduces members */
/* to a value less than pmembers: */
/* o The value for tn is updated according to the following formula: */
/* tn = tc + (members/pmembers)(tn - tc) */
/* o The value for tp is updated according the following formula: */
/* tp = tc - (members/pmembers)(tc - tp). */
/* o The next RTCP packet is rescheduled for transmission at time tn, */
/* which is now earlier. */
/* o The value of pmembers is set equal to members. */
session->ssrc_count--;
if (session->ssrc_count < session->ssrc_count_prev) {
gettimeofday(&(session->next_rtcp_send_time), NULL);
gettimeofday(&(session->last_rtcp_send_time), NULL);
tv_add(&(session->next_rtcp_send_time), (session->ssrc_count / session->ssrc_count_prev)
* tv_diff(session->next_rtcp_send_time, event_ts));
tv_add(&(session->last_rtcp_send_time), - ((session->ssrc_count / session->ssrc_count_prev)
* tv_diff(event_ts, session->last_rtcp_send_time)));
session->ssrc_count_prev = session->ssrc_count;
}
/* Reduce our csrc count... */
if (s->should_advertise_sdes == TRUE) {
session->csrc_count--;
}
if (session->last_advertised_csrc == session->csrc_count) {
session->last_advertised_csrc = 0;
}
/* Signal to the application that this source is dead... */
if (!filter_event(session, ssrc)) {
event.ssrc = ssrc;
event.type = SOURCE_DELETED;
event.data = NULL;
event.ts = &event_ts;
session->callback(session, &event);
}
xfree(s);
check_database(session);
}
static void init_seq(source *s, uint16_t seq)
{
/* Taken from draft-ietf-avt-rtp-new-01.txt */
check_source(s);
s->base_seq = seq;
s->max_seq = seq;
s->bad_seq = RTP_SEQ_MOD + 1;
s->cycles = 0;
s->received = 0;
s->received_prior = 0;
s->expected_prior = 0;
}
static int update_seq(source *s, uint16_t seq)
{
/* Taken from draft-ietf-avt-rtp-new-01.txt */
uint16_t udelta = seq - s->max_seq;
/*
* Source is not valid until MIN_SEQUENTIAL packets with
* sequential sequence numbers have been received.
*/
check_source(s);
if (s->probation) {
/* packet is in sequence */
if (seq == s->max_seq + 1) {
s->probation--;
s->max_seq = seq;
if (s->probation == 0) {
init_seq(s, seq);
s->received++;
return 1;
}
} else {
s->probation = MIN_SEQUENTIAL - 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -