📄 framereadfragmented.c
字号:
/* framereadfragmented.c - framereadfragmented */

#include <syn.h>
#include <util.h>
#include <strings.h>
#include <stdlib.h>
#include <unistd.h>
#include <frame.h>
#include <errno.h>
#include <util.h>
#include <limits.h>

#define DEBUG(x) x
#include <stdio.h>

/*------------------------------------------------------------------------
 * framereadfragmented - reassemble and copy out the frame whose RTP
 * timestamp is ts.  Fragments are pulled from the session queue until the
 * accumulated payload length equals the frame length reported by the
 * stream's frame-parameter callbacks.
 *
 * If SYN_READ_BLOCK is set in the stream's read flags, the function waits
 * on the stream's read condition variable for missing fragments.  If it
 * is not set and a fragment is missing, the fragments collected so far
 * are put back on the session queue and SYN_FRAME_FRAG is returned.
 *
 * The frame's expiration time is derived from fp_frameduration() applied
 * to the first aligned fragment, or from the earliest later timestamp
 * seen at the head of the queue.
 *
 * Parameters:
 *   pssn              - session; pssn->ssn_session is the packet queue
 *   psstm             - stream being read (flags, clock mapping, ssrc)
 *   time              - out: local time of ts on success; on
 *                       SYN_FRAME_FRAG, the expiration time if known
 *   tsout             - out: set to ts on success
 *   buf, buflen       - destination buffer; at most buflen bytes copied
 *   extraheaderbuf    - if non-NULL, receives the bytes that precede the
 *                       frame data in the first copied packet's payload
 *   extraheaderbuflen - in: capacity of extraheaderbuf;
 *                       out: number of extra-header bytes copied
 *   mark              - out: marker bit of the first copied packet (only
 *                       written when extraheaderbuf is non-NULL)
 *   ts                - media timestamp of the frame to reassemble
 *
 * Returns the number of bytes copied into buf, SYN_FRAME_FRAG if the
 * frame is incomplete in non-blocking mode, or ERROR if the stream is a
 * zombie, is rebuffering, the frame expired, the wait failed, or a frame
 * parameter callback failed.
 *
 * NOTE(review): the previous header comment said an expired frame yields
 * the fragment status code; the code returns ERROR in that case and that
 * behavior is kept here - confirm which one callers expect.
 * NOTE(review): pthread_cond_timedwait is called on sstm_readmutex, so
 * the caller presumably holds that mutex - confirm.
 *------------------------------------------------------------------------
 */
int
framereadfragmented(struct synsession *pssn, struct synstream *psstm,
		    struct timespec *time, mediatime_t *tsout,
		    char *buf, int buflen,
		    char *extraheaderbuf, int *extraheaderbuflen,
		    bool *mark, mediatime_t ts)
{
	struct frameparam	*pparam;
	struct timespec		now, exptime, tmptime;
	struct rtpqueue		queue;
	struct rtpln		*pln, *pnext;
	struct rtp		*prtp = NULL;
	mediatime_t		frameduration = 0, endtime;
	bool			block, aligned, newdata;
	int			dataoffset, bytes, framelen, accum, rv;
	int			bytescopied, bytestocopy;
	char			*pframe;

	pparam = (struct frameparam *) psstm->sstm_parameters;
	block = (psstm->sstm_readflags & SYN_READ_BLOCK) != 0;

	/* Frame length and expiration are unknown until fragments arrive. */
	framelen = -1;
	exptime.tv_sec = LONG_MAX;
	exptime.tv_nsec = 0;
	accum = 0;
	rtpqinit(&queue);

	/*
	 * Accumulate fragments.
	 */
	while (framelen != accum) {
		if (psstm->sstm_zombie == TRUE) {
			rtpqdestroy(&queue);
			return ERROR;
		}

		/*
		 * Check for rebuffering.
		 */
		if (psstm->sstm_buffering == TRUE) {
			rtpqdestroy(&queue);
			return ERROR;
		}

		/*
		 * Check for expiration of the frame: give up once the
		 * current time has reached the expiration time.  The
		 * comparison follows the same ordering as the "keep the
		 * earlier deadline" test below (negative result when the
		 * first argument is earlier).
		 */
		clock_gettime(CLOCK_REALTIME, &now);
		if (timecmp(now, exptime) >= 0) {
			rtpqdestroy(&queue);
			return ERROR;
		}

		/*
		 * Get the next packet.
		 */
		pln = rtpqextracthead(pssn->ssn_session, psstm->sstm_ssrc);
		newdata = FALSE;
		if (pln != NULL) {
			prtp = &pln->rln_rtp;
			if (tseq(prtp->rtp_time, ts)) {
				newdata = TRUE;
			} else if (tslt(prtp->rtp_time, ts)) {
				/* Older than the requested frame: discard. */
				bufpoolfreebuf(pln);
				continue;
			} else {
				/*
				 * Infer expiration time of the frame being
				 * reassembled from this later packet, keep
				 * the earlier deadline, and return the
				 * packet to the session queue.
				 */
				endtime = prtp->rtp_time;
				tmptime = timeunflatten(endtime - psstm->sstm_clky,
							psstm->sstm_clkrt);
				tmptime = timeadd(psstm->sstm_clkx, tmptime);
				if (timecmp(tmptime, exptime) < 0)
					exptime = tmptime;
				rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc,
					   pln);
			}
		}

		/*
		 * Check if no new data; if none, condition wait.
		 */
		if (newdata == FALSE) {
			if (block == TRUE) {
				rv = pthread_cond_timedwait(&psstm->sstm_readcond,
							    &psstm->sstm_readmutex,
							    &exptime);
				if (rv != 0 && rv != ETIMEDOUT) {
					rtpqdestroy(&queue);
					return ERROR;
				}
				/* A timeout is caught by the expiry check. */
				continue;
			}

			/*
			 * Put frags back into main queue.  The successor is
			 * saved first because the node is handed to another
			 * queue by rtpqinsert (which presumably relinks
			 * rln_next).
			 */
			for (pln = queue.rq_head; pln != NULL; pln = pnext) {
				pnext = pln->rln_next;
				rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc,
					   pln);
			}
			/* Local queue no longer owns any packets. */
			queue.rq_head = queue.rq_tail = NULL;
			rtpqdestroy(&queue);
			if (exptime.tv_sec != LONG_MAX)
				*time = exptime;
			return SYN_FRAME_FRAG;
		}

		/*
		 * Keep this packet.
		 */
		_rtpqinsert(&queue, pln, NULL);
		dataoffset = pparam->fp_dataoffset(prtp, pln->rln_len);
		if (dataoffset == ERROR) {
			rtpqdestroy(&queue);
			return ERROR;
		}
		bytes = pln->rln_len - RTP_HEADER_LEN(prtp) - dataoffset;
		accum += bytes;

		/*
		 * If we haven't already seen the initial fragment, get the
		 * details we need from it.
		 */
		if (framelen == -1) {
			aligned = pparam->fp_framealigned(prtp, pln->rln_len);
			if (aligned == (bool) ERROR) {
				rtpqdestroy(&queue);
				return ERROR;
			}
			if (aligned == TRUE) {
				pframe = RTP_DATA(prtp) + dataoffset;
				framelen = pparam->fp_framelength(pframe, bytes);
				frameduration = pparam->fp_frameduration(pframe,
									 bytes);
				if (framelen == ERROR || frameduration == ERROR) {
					rtpqdestroy(&queue);
					return ERROR;
				}

				/*
				 * Map the frame's end timestamp to local
				 * time: unflatten the media-clock offset,
				 * then add the local clock base.
				 */
				endtime = ts + frameduration;
				exptime = timeunflatten(endtime - psstm->sstm_clky,
							psstm->sstm_clkrt);
				exptime = timeadd(psstm->sstm_clkx, exptime);
			}
		}
	} /* while (framelen != accum) */

	/*
	 * Reassemble the fragments
	 */
	bytescopied = 0;
	pln = queue.rq_head;
	while (pln != NULL && buflen > 0) {
		prtp = &pln->rln_rtp;
		dataoffset = pparam->fp_dataoffset(prtp, pln->rln_len);
		bytes = pln->rln_len - RTP_HEADER_LEN(prtp) - dataoffset;
		pframe = RTP_DATA(prtp) + dataoffset;

		/*
		 * Copy extra headers only from first fragment's packet.
		 */
		if (bytescopied == 0 && extraheaderbuf != NULL) {
			*extraheaderbuflen = min(dataoffset, *extraheaderbuflen);
			bcopy(RTP_DATA(prtp), extraheaderbuf, *extraheaderbuflen);
			*mark = prtp->rtp_mark;
		}

		/* Truncate to the space remaining in the caller's buffer. */
		bytestocopy = min(buflen, bytes);
		bcopy(pframe, buf, bytestocopy);
		bytescopied += bytestocopy;
		buf += bytestocopy;
		buflen -= bytestocopy;
		pln = pln->rln_next;
	}
	rtpqdestroy(&queue);

	/*
	 * Set *time to local time of ts.
	 */
	tmptime = timeunflatten(ts - psstm->sstm_clky, psstm->sstm_clkrt);
	*time = timeadd(tmptime, psstm->sstm_clkx);
	*tsout = ts;
	psstm->sstm_lastread = ts + frameduration;

	return bytescopied;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -