rtpsession.c
来自「linphone源码-1.3.5.tar.gz,linphone源码-1.3.5」· C语言 代码 · 共 1,957 行 · 第 1/4 页
C
1,957 行
/**
 * rtp_session_create_packet_with_data:
 * @session: a rtp session.
 * @payload: the data to be sent with this packet
 * @payload_size: size of data
 * @freefn: a function that will be called when the payload buffer is no longer needed.
 *
 * Creates a new rtp packet using the given payload buffer (no copy). The header is
 * allocated separately and the payload block is chained behind it through b_cont.
 * In the header, ssrc and payload_type are set according to the session's
 * context. Timestamp and seq number are left at zero; they will be set when the packet
 * is going to be sent with rtp_session_sendm_with_ts().
 * oRTP will send this packet using libc's sendmsg() (if this function is available!) so
 * that there will be no packet concatenation involving copies to be done in user-space.
 * @freefn can be NULL, in that case payload will be kept untouched.
 *
 * Returns: a rtp packet in a mblk_t (message block) structure.
 **/
mblk_t *
rtp_session_create_packet_with_data(RtpSession *session, char *payload, int payload_size, void (*freefn)(void*))
{
	const int hdr_len = RTP_FIXED_HEADER_SIZE;	/* revisit when support for csrc is done */
	mblk_t *hdr_blk;
	mblk_t *data_blk;
	rtp_header_t *hdr;

	/* first block: holds only the fixed RTP header */
	hdr_blk = allocb(hdr_len, BPRI_MED);
	hdr = (rtp_header_t *) hdr_blk->b_rptr;
	hdr->version = 2;
	hdr->padbit = 0;
	hdr->extbit = 0;
	hdr->markbit = 0;
	hdr->cc = 0;
	hdr->paytype = session->send_pt;
	hdr->ssrc = session->send_ssrc;
	hdr->timestamp = 0;	/* set later, when the packet is sent */
	hdr->seq_number = 0;	/* set later, when the packet is sent */
	hdr_blk->b_wptr += hdr_len;

	/* second block: wraps the user supplied payload buffer, no copy */
	data_blk = allocb_with_buf(payload, payload_size, BPRI_MED, freefn);
	data_blk->b_wptr += payload_size;

	/* chain the payload behind the header */
	hdr_blk->b_cont = data_blk;
	return hdr_blk;
}

/**
 * rtp_session_create_packet_in_place:
 * @session: a rtp session.
 * @buffer: a buffer that contains first just enough place to write a RTP header, then the data to send.
 * @size: the size of the buffer
 * @freefn: a function that will be called once the buffer is no more needed (the data has been sent).
* * Creates a new rtp packet using the buffer given in arguments (no copy). * In the header, ssrc and payload_type according to the session's * context. Timestamp and seq number are not set, there will be set when the packet is going to be * sent with rtp_session_sendm_with_ts(). * @freefn can be NULL, in that case payload will be kept untouched. * *Returns: a rtp packet in a mblk_t (message block) structure.**/mblk_t * rtp_session_create_packet_in_place(RtpSession *session,char *buffer, int size, void (*freefn)(void*) ){ mblk_t *mp; rtp_header_t *rtp; mp=allocb_with_buf(buffer,size,BPRI_MED,freefn); rtp=(rtp_header_t*)mp->b_rptr; rtp->version = 2; rtp->padbit = 0; rtp->extbit = 0; rtp->markbit= 0; rtp->cc = 0; rtp->paytype = session->send_pt; rtp->ssrc = session->send_ssrc; rtp->timestamp = 0; /* set later, when packet is sended */ rtp->seq_number = 0; /*set later, when packet is sended */ return mp;}static int rtcp_recv (RtpSession * session);/** *rtp_session_sendm_with_ts: *@session : a rtp session. *@mp : a rtp packet presented as a mblk_t. *@timestamp: the timestamp of the data to be sent. Refer to the rfc to know what it is. * * Send the rtp datagram @mp to the destination set by rtp_session_set_remote_addr() * with timestamp @timestamp. For audio data, the timestamp is the number * of the first sample resulting of the data transmitted. See rfc1889 for details. * The packet (@mp) is freed once it is sended. * *Returns: the number of bytes sent over the network.**/intrtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, uint32_t timestamp){ rtp_header_t *rtp; uint32_t packet_time; int error = 0; int packsize; RtpScheduler *sched=session->sched; RtpStream *stream=&session->rtp; if (session->flags & RTP_SESSION_SEND_NOT_STARTED) { session->rtp.snd_ts_offset = timestamp; /* Set initial last_rcv_time to first send time. 
*/ if ((session->flags & RTP_SESSION_RECV_NOT_STARTED) || session->mode == RTP_SESSION_SENDONLY) { gettimeofday(&session->last_recv_time, NULL); } if (session->flags & RTP_SESSION_SCHEDULED) { session->rtp.snd_time_offset = sched->time_; } rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED); } /* if we are in blocking mode, then suspend the process until the scheduler it's time to send the * next packet */ /* if the timestamp of the packet queued is older than current time, then you we must * not block */ if (session->flags & RTP_SESSION_SCHEDULED) { packet_time = rtp_session_ts_to_time (session, timestamp - session->rtp.snd_ts_offset) + session->rtp.snd_time_offset; /*ortp_message("rtp_session_send_with_ts: packet_time=%i time=%i",packet_time,sched->time_);*/ wait_point_lock(&session->send_wp); if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_)) { wait_point_wakeup_at(&session->send_wp,packet_time,(session->flags & RTP_SESSION_BLOCKING_MODE)!=0); session_set_clr(&sched->w_sessions,session); /* the session has written */ } else session_set_set(&sched->w_sessions,session); /*to indicate select to return immediately */ wait_point_unlock(&session->send_wp); } rtp=(rtp_header_t*)mp->b_rptr; packsize = msgdsize(mp) ; rtp_session_lock (session); /* set a seq number */ rtp->seq_number=session->rtp.snd_seq; rtp->timestamp=timestamp; session->rtp.snd_seq++; session->rtp.snd_last_ts = timestamp; ortp_global_stats.sent += packsize; stream->stats.sent += packsize; ortp_global_stats.packet_sent++; stream->stats.packet_sent++; error = ortp_rtp_send (session, mp); rtcp_recv(session); rtp_session_rtcp_process(session); rtp_session_unlock (session); return error;}/** *rtp_session_send_with_ts: *@session: a rtp session. *@buffer: a buffer containing the data to be sent in a rtp packet. *@len: the length of the data buffer, in bytes. *@userts: the timestamp of the data to be sent. Refer to the rfc to know what it is. 
* * Send a rtp datagram to the destination set by rtp_session_set_remote_addr() containing * the data from @buffer with timestamp @userts. This is a high level function that uses * rtp_session_create_packet() and rtp_session_sendm_with_ts() to send the data. * * *Returns: the number of bytes sent over the network.**/intrtp_session_send_with_ts (RtpSession * session, const char * buffer, int len, uint32_t userts){ mblk_t *m; int err;#ifdef USE_SENDMSG m=rtp_session_create_packet_with_data(session,(char*)buffer,len,NULL);#else m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,(char*)buffer,len);#endif err=rtp_session_sendm_with_ts(session,m,userts); return err;}static intrtp_recv (RtpSession * session, uint32_t user_ts){ int error; struct sockaddr remaddr; socklen_t addrlen = sizeof (remaddr); mblk_t *mp; RtpStream *stream=&session->rtp; if (session->rtp.socket<0) return -1; /*session has no sockets for the moment*/ while (1) { int bufsz; if (session->rtp.cached_mp==NULL) session->rtp.cached_mp = allocb (session->recv_buf_size, 0); mp=session->rtp.cached_mp; bufsz=(int) (mp->b_datap->db_lim - mp->b_datap->db_base); if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ error=recv(session->rtp.socket,mp->b_wptr,bufsz,0); }else error = recvfrom (session->rtp.socket, mp->b_wptr, bufsz, 0, (struct sockaddr *) &remaddr, &addrlen); if (error > 0) { if (error<RTP_FIXED_HEADER_SIZE){ ortp_warning("Packet too small to be a rtp packet (%i)!",error); stream->stats.bad++; ortp_global_stats.bad++; /* don't free, it will be reused next time */ }else{ /* then parse the message and put on queue */ mp->b_wptr+=error; rtp_parse (session, mp, user_ts + session->rtp.hwrcv_diff_ts); session->rtp.cached_mp=NULL; if (session->symmetric_rtp==1 && addrlen>0){ /* store the sender rtp address to do symmetric RTP */ memcpy(&session->rtp.rem_addr,&remaddr,addrlen); session->rtp.rem_addrlen=addrlen; } } } else { int errnum=getSocketErrorCode(); if (error == 0) { ortp_warning ("rtp_recv: 
strange... recv() returned zero."); } else if (!is_would_block_error(errnum)) { if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTP packet",INT_TO_POINTER(getSocketErrorCode())); }else ortp_warning("Error receiving RTP packet: %s.",getSocketError()); } /* don't free the cached_mp, it will be reused next time */ return -1; /* avoids an infinite loop ! */ } } return error;}extern void rtcp_parse(RtpSession *session, mblk_t *mp);static intrtcp_recv (RtpSession * session){ int error;#ifdef ORTP_INET6 struct sockaddr_storage remaddr;#else struct sockaddr remaddr;#endif socklen_t addrlen=0; mblk_t *mp; if (session->rtcp.socket<0) return -1; /*session has no rtcp sockets for the moment*/ while (1) { if (session->rtcp.cached_mp==NULL) session->rtcp.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0); mp=session->rtcp.cached_mp; if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ error=recv(session->rtcp.socket,mp->b_wptr,RTCP_MAX_RECV_BUFSIZE,0); }else { addrlen=sizeof (remaddr); error=recvfrom (session->rtcp.socket, mp->b_wptr, RTCP_MAX_RECV_BUFSIZE, 0, (struct sockaddr *) &remaddr, &addrlen); } if (error > 0) { mp->b_wptr += error; /* then parse the message */ rtcp_parse (session, mp); freemsg(mp); session->rtcp.cached_mp=NULL; if (session->symmetric_rtp==1 && addrlen>0){ /* store the sender rtcp address to send him receiver reports */ memcpy(&session->rtcp.rem_addr,&remaddr,addrlen); session->rtcp.rem_addrlen=addrlen; } else if (session->rtcp.rem_addrlen==0 && addrlen>0){ /* store the sender rtcp address to send him receiver reports */ memcpy(&session->rtcp.rem_addr,&remaddr,addrlen); session->rtcp.rem_addrlen=addrlen; } } else { int errnum=getSocketErrorCode(); if (error == 0) { ortp_warning ("rtcp_recv: strange... 
recv() returned zero."); } else if (!is_would_block_error(errnum)) { if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTCP packet",INT_TO_POINTER(errnum)); }else ortp_warning("Error receiving RTCP packet: %s.",getSocketError()); } /* don't free the cached_mp, it will be reused next time */ return -1; /* avoids an infinite loop ! */ } } return error;}static void payload_type_changed_notify(RtpSession *session, int paytype){ session->recv_pt = paytype; rtp_signal_table_emit (&session->on_payload_type_changed); }/** *rtp_session_recvm_with_ts: *@session: a rtp session. *@user_ts: a timestamp. * * Try to get a rtp packet presented as a mblk_t structure from the rtp session. * The @user_ts parameter is relative to the first timestamp of the incoming stream. In other * words, the application does not have to know the first timestamp of the stream, it can * simply call for the first time this function with @user_ts=0, and then incrementing it * as it want. The RtpSession takes care of synchronisation between the stream timestamp * and the user timestamp given here. * *Returns: a rtp packet presented as a mblk_t.**/mblk_t *rtp_session_recvm_with_ts (RtpSession * session, uint32_t user_ts){ mblk_t *mp = NULL; rtp_header_t *rtp; uint32_t ts; uint32_t packet_time; RtpScheduler *sched=session->sched; RtpStream *stream=&session->rtp; int rejected=0; /* if we are scheduled, remember the scheduler time at which the application has * asked for its first timestamp */ if (session->flags & RTP_SESSION_RECV_NOT_STARTED) { session->rtp.rcv_query_ts_offset = user_ts; /* Set initial last_rcv_time to first recv time. 
*/ if ((session->flags & RTP_SESSION_SEND_NOT_STARTED) || session->mode == RTP_SESSION_RECVONLY) { gettimeofday(&session->last_recv_time, NULL); } if (session->flags & RTP_SESSION_SCHEDULED) { session->rtp.rcv_time_offset = sched->time_; //ortp_message("setting snd_time_offset=%i",session->rtp.snd_time_offset); } rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED); } session->rtp.rcv_last_app_ts = user_ts; rtp_recv (session, user_ts); rtcp_recv(session); /* check for telephone event first */ /* first lock the session */ rtp_session_lock (session); mp=getq(&session->rtp.tev_rq); if (mp!=NULL){ rtp_signal_table_emit2(&session->on_telephone_event_packet,(long)mp); if (session->on_telephone_event.count>0){ rtp_session_check_telephone_events(session,mp); } freemsg(mp); mp=NULL; } /* then now try to return a media packet, if possible */ /* first condition: if the session is starting, don't return anything * until the queue size reaches jitt_comp */ if (session->flags & RTP_SESSION_RECV_SYNC) { rtp_header_t *oldest, *newest; queue_t *q = &session->rtp.rq; if (qempty(q)) { ortp_debug ("Queue is empty."); goto end; } oldest = (rtp_header_t *) qfirst(q)->b_rptr; newest = (rtp_header_t *) qlast(q)->b_rptr; if ((uint32_t) (newest->timestamp - oldest->timestamp) < (uint32_t) session->rtp.jittctl.jitt_comp_ts) { ortp_debug("Not enough packet bufferised."); goto end; } /* enough packet bufferised */ mp = getq (&session->rtp.rq); rtp = (rtp_header_t *) mp->b_rptr; session->rtp.rcv_ts_offset = rtp->timestamp; /* remember the timestamp offset between the stream timestamp (random) * and the user timestamp, that very often starts at zero */ session->rtp.rcv_diff_ts = rtp->timestamp - user_ts; /* remember the difference between the last received on the socket timestamp and the user timestamp */ session->rtp.hwrcv_diff_ts=session->rtp.rcv_diff_ts + session->rtp.jittctl.jitt_comp_ts; session->rtp.rcv_last_ret_ts = user_ts; /* just to have an init value */ 
session->rtp.rcv_last_ts = rtp->timestamp; session->recv_ssrc = rtp->ssrc; /* delete the recv synchronisation flag */ rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC); ortp_debug("Returning FIRST packet with ts=%i, hwrcv_diff_ts=%i, rcv_diff_ts=%i", rtp->timestamp, session->rtp.hwrcv_diff_ts,session->rtp.rcv_diff_ts); goto end; } /* else this the normal case */ /*calculate the stream timestamp from the user timestamp */ ts = user_ts + session->rtp.rcv_diff_ts; session->rtp.rcv_last_ts = ts; mp = rtp_getq (&session->rtp.rq, ts,&rejected); stream->stats.skipped+=rejected; ortp_global_stats.skipped+=rejected; goto end; end: if (mp != NULL) { int msgsize = msgdsize (mp); /* evaluate how much bytes (including header) is received by app */ uint32_t packet_ts; ortp_global_stats.recv += msgsize; stream->stats.recv += msgsize; rtp = (rtp_header_t *) mp->b_rptr; packet_ts=rtp->timestamp; ortp_debug("Returning mp with ts=%i", packet_ts); /* check for payload type changes */ if (session->recv_pt != rtp->paytype) { payload_type_changed_notify(session, rtp->paytype); } /* patch the packet so that it has a timestamp compensated by the adaptive jitter buffer mechanism */ if (session->rtp.jittctl.adaptive){ rtp->timestamp-=session->rtp.jittctl.corrective_slide; /*printf("Returned packet has timestamp %u, with clock slide compensated it is %u\n",packet_ts,rtp->timestamp);*/ } } else { ortp_debug ("No mp for timestamp queried"); stream->stats.unavaillable++; ortp_global_stats.unavaillable++; } rtp_session_rtcp_process(session); rtp_session_unlock (session); if (session->flags & RTP_SESSION_SCHEDULED)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?