rtpsession.c

来自「linphone源码-1.3.5.tar.gz,linphone源码-1.3.5」· C语言 代码 · 共 1,957 行 · 第 1/4 页

C
1,957
字号
/** *rtp_session_create_packet_with_data: *@session:		a rtp session. *@payload		: the data to be sent with this packet *@payload_size	: size of data *@freefn		: a function that will be called when the payload buffer is no more needed. * *	Creates a new rtp packet using the given payload buffer (no copy). The header will be allocated separetely. *  In the header, ssrc and payload_type according to the session's *	context. Timestamp and seq number are not set, there will be set when the packet is going to be *	sent with rtp_session_sendm_with_ts(). *	oRTP will send this packet using libc's sendmsg() (if this function is availlable!) so that there will be no *	packet concatenation involving copies to be done in user-space. *  @freefn can be NULL, in that case payload will be kept untouched. * *Returns: a rtp packet in a mblk_t (message block) structure.**/mblk_t * rtp_session_create_packet_with_data(RtpSession *session, char *payload, int payload_size, void (*freefn)(void*)){	mblk_t *mp,*mpayload;	int header_size=RTP_FIXED_HEADER_SIZE; /* revisit when support for csrc is done */	rtp_header_t *rtp;		mp=allocb(header_size,BPRI_MED);	rtp=(rtp_header_t*)mp->b_rptr;	rtp->version = 2;	rtp->padbit = 0;	rtp->extbit = 0;	rtp->markbit= 0;	rtp->cc = 0;	rtp->paytype = session->send_pt;	rtp->ssrc = session->send_ssrc;	rtp->timestamp = 0;	/* set later, when packet is sended */	rtp->seq_number = 0; /*set later, when packet is sended */	mp->b_wptr+=header_size;	/* create a mblk_t around the user supplied payload buffer */	mpayload=allocb_with_buf(payload,payload_size,BPRI_MED,freefn);	mpayload->b_wptr+=payload_size;	/* link it with the header */	mp->b_cont=mpayload;	return mp;}/** *rtp_session_create_packet_in_place: *@session:		a rtp session. *@buffer:	a buffer that contains first just enough place to write a RTP header, then the data to send. *@size		: the size of the buffer *@freefn : a function that will be called once the buffer is no more needed (the data has been sent). * *	Creates a new rtp packet using the buffer given in arguments (no copy).  *  In the header, ssrc and payload_type according to the session's *	context. Timestamp and seq number are not set, there will be set when the packet is going to be *	sent with rtp_session_sendm_with_ts(). *  @freefn can be NULL, in that case payload will be kept untouched. * *Returns: a rtp packet in a mblk_t (message block) structure.**/mblk_t * rtp_session_create_packet_in_place(RtpSession *session,char *buffer, int size, void (*freefn)(void*) ){	mblk_t *mp;	rtp_header_t *rtp;		mp=allocb_with_buf(buffer,size,BPRI_MED,freefn);	rtp=(rtp_header_t*)mp->b_rptr;	rtp->version = 2;	rtp->padbit = 0;	rtp->extbit = 0;	rtp->markbit= 0;	rtp->cc = 0;	rtp->paytype = session->send_pt;	rtp->ssrc = session->send_ssrc;	rtp->timestamp = 0;	/* set later, when packet is sended */	rtp->seq_number = 0; /*set later, when packet is sended */	return mp;}static int rtcp_recv (RtpSession * session);/** *rtp_session_sendm_with_ts: *@session	: a rtp session. *@mp		:	a rtp packet presented as a mblk_t. *@timestamp:	the timestamp of the data to be sent. Refer to the rfc to know what it is. * *	Send the rtp datagram @mp to the destination set by rtp_session_set_remote_addr()  *	with timestamp @timestamp. For audio data, the timestamp is the number *	of the first sample resulting of the data transmitted. See rfc1889 for details. *  The packet (@mp) is freed once it is sended. * *Returns: the number of bytes sent over the network.**/intrtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, uint32_t timestamp){	rtp_header_t *rtp;	uint32_t packet_time;	int error = 0;	int packsize;	RtpScheduler *sched=session->sched;	RtpStream *stream=&session->rtp;	if (session->flags & RTP_SESSION_SEND_NOT_STARTED)	{		session->rtp.snd_ts_offset = timestamp;        /* Set initial last_rcv_time to first send time. */        if ((session->flags & RTP_SESSION_RECV_NOT_STARTED)            || session->mode == RTP_SESSION_SENDONLY)        {            gettimeofday(&session->last_recv_time, NULL);        }		if (session->flags & RTP_SESSION_SCHEDULED)		{			session->rtp.snd_time_offset = sched->time_;		}		rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED);	}	/* if we are in blocking mode, then suspend the process until the scheduler it's time to send  the	 * next packet */	/* if the timestamp of the packet queued is older than current time, then you we must	 * not block */	if (session->flags & RTP_SESSION_SCHEDULED)	{		packet_time =			rtp_session_ts_to_time (session,				     timestamp -				     session->rtp.snd_ts_offset) +					session->rtp.snd_time_offset;		/*ortp_message("rtp_session_send_with_ts: packet_time=%i time=%i",packet_time,sched->time_);*/		wait_point_lock(&session->send_wp);		if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_))		{			wait_point_wakeup_at(&session->send_wp,packet_time,(session->flags & RTP_SESSION_BLOCKING_MODE)!=0);				session_set_clr(&sched->w_sessions,session);	/* the session has written */		}		else session_set_set(&sched->w_sessions,session);	/*to indicate select to return immediately */		wait_point_unlock(&session->send_wp);	}		rtp=(rtp_header_t*)mp->b_rptr;		packsize = msgdsize(mp) ;	rtp_session_lock (session);		/* set a seq number */	rtp->seq_number=session->rtp.snd_seq;	rtp->timestamp=timestamp;	session->rtp.snd_seq++;	session->rtp.snd_last_ts = timestamp;	ortp_global_stats.sent += packsize;	stream->stats.sent += packsize;	ortp_global_stats.packet_sent++;	stream->stats.packet_sent++;	error = ortp_rtp_send (session, mp);	rtcp_recv(session);	rtp_session_rtcp_process(session);	rtp_session_unlock (session);		return error;}/** *rtp_session_send_with_ts: *@session: a rtp session. *@buffer:	a buffer containing the data to be sent in a rtp packet. *@len:		the length of the data buffer, in bytes. *@userts:	the timestamp of the data to be sent. Refer to the rfc to know what it is. * *	Send a rtp datagram to the destination set by rtp_session_set_remote_addr() containing *	the data from @buffer with timestamp @userts. This is a high level function that uses *	rtp_session_create_packet() and rtp_session_sendm_with_ts() to send the data. * * *Returns: the number of bytes sent over the network.**/intrtp_session_send_with_ts (RtpSession * session, const char * buffer, int len,			  uint32_t userts){	mblk_t *m;	int err;#ifdef USE_SENDMSG	m=rtp_session_create_packet_with_data(session,(char*)buffer,len,NULL);#else	m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,(char*)buffer,len);#endif	err=rtp_session_sendm_with_ts(session,m,userts);	return err;}static intrtp_recv (RtpSession * session, uint32_t user_ts){	int error;	struct sockaddr remaddr;	socklen_t addrlen = sizeof (remaddr);	mblk_t *mp;	RtpStream *stream=&session->rtp;	if (session->rtp.socket<0) return -1;  /*session has no sockets for the moment*/		while (1)	{		int bufsz;		if (session->rtp.cached_mp==NULL)			 session->rtp.cached_mp = allocb (session->recv_buf_size, 0);		mp=session->rtp.cached_mp;		bufsz=(int) (mp->b_datap->db_lim - mp->b_datap->db_base);		if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){			error=recv(session->rtp.socket,mp->b_wptr,bufsz,0);		}else error = recvfrom (session->rtp.socket, mp->b_wptr,				  bufsz, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);		if (error > 0)		{			if (error<RTP_FIXED_HEADER_SIZE){				ortp_warning("Packet too small to be a rtp packet (%i)!",error);				stream->stats.bad++;				ortp_global_stats.bad++;				/* don't free, it will be reused next time */			}else{				/* then parse the message and put on queue */				mp->b_wptr+=error;				rtp_parse (session, mp, user_ts + session->rtp.hwrcv_diff_ts);				session->rtp.cached_mp=NULL;				if (session->symmetric_rtp==1 && addrlen>0){				  /* store the sender rtp address to do symmetric RTP */				  memcpy(&session->rtp.rem_addr,&remaddr,addrlen);				  session->rtp.rem_addrlen=addrlen;				}			}		}		else		{		 	int errnum=getSocketErrorCode();			if (error == 0)			{				ortp_warning					("rtp_recv: strange... recv() returned zero.");			}			else if (!is_would_block_error(errnum))			{				if (session->on_network_error.count>0){					rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTP packet",INT_TO_POINTER(getSocketErrorCode()));				}else ortp_warning("Error receiving RTP packet: %s.",getSocketError());			}			/* don't free the cached_mp, it will be reused next time */			return -1;	/* avoids an infinite loop ! */		}	}	return error;}extern void rtcp_parse(RtpSession *session, mblk_t *mp);static intrtcp_recv (RtpSession * session){	int error;#ifdef ORTP_INET6	struct sockaddr_storage remaddr;#else	struct sockaddr remaddr;#endif	socklen_t addrlen=0;	mblk_t *mp;		if (session->rtcp.socket<0) return -1;  /*session has no rtcp sockets for the moment*/		while (1)	{		if (session->rtcp.cached_mp==NULL)			 session->rtcp.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0);				mp=session->rtcp.cached_mp;		if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){			error=recv(session->rtcp.socket,mp->b_wptr,RTCP_MAX_RECV_BUFSIZE,0);		}else {			addrlen=sizeof (remaddr);			error=recvfrom (session->rtcp.socket, mp->b_wptr,				  RTCP_MAX_RECV_BUFSIZE, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);		}		if (error > 0)		{			mp->b_wptr += error;			/* then parse the message */			rtcp_parse (session, mp);			freemsg(mp);			session->rtcp.cached_mp=NULL;			if (session->symmetric_rtp==1 && addrlen>0){				/* store the sender rtcp address to send him receiver reports */				memcpy(&session->rtcp.rem_addr,&remaddr,addrlen);				session->rtcp.rem_addrlen=addrlen;			} else if (session->rtcp.rem_addrlen==0 && addrlen>0){				/* store the sender rtcp address to send him receiver reports */				memcpy(&session->rtcp.rem_addr,&remaddr,addrlen);				session->rtcp.rem_addrlen=addrlen;			}		}		else		{			int errnum=getSocketErrorCode();			if (error == 0)			{				ortp_warning					("rtcp_recv: strange... recv() returned zero.");			}			else if (!is_would_block_error(errnum))			{				if (session->on_network_error.count>0){					rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTCP packet",INT_TO_POINTER(errnum));				}else ortp_warning("Error receiving RTCP packet: %s.",getSocketError());			}			/* don't free the cached_mp, it will be reused next time */			return -1;	/* avoids an infinite loop ! */		}	}	return error;}static void payload_type_changed_notify(RtpSession *session, int paytype){	session->recv_pt = paytype;	rtp_signal_table_emit (&session->on_payload_type_changed);	}/** *rtp_session_recvm_with_ts: *@session: a rtp session. *@user_ts:	a timestamp. * *	Try to get a rtp packet presented as a mblk_t structure from the rtp session. *	The @user_ts parameter is relative to the first timestamp of the incoming stream. In other *	words, the application does not have to know the first timestamp of the stream, it can *	simply call for the first time this function with @user_ts=0, and then incrementing it *	as it want. The RtpSession takes care of synchronisation between the stream timestamp *	and the user timestamp given here. * *Returns: a rtp packet presented as a mblk_t.**/mblk_t *rtp_session_recvm_with_ts (RtpSession * session, uint32_t user_ts){	mblk_t *mp = NULL;	rtp_header_t *rtp;	uint32_t ts;	uint32_t packet_time;	RtpScheduler *sched=session->sched;	RtpStream *stream=&session->rtp;	int rejected=0;	/* if we are scheduled, remember the scheduler time at which the application has	 * asked for its first timestamp */	if (session->flags & RTP_SESSION_RECV_NOT_STARTED)	{		session->rtp.rcv_query_ts_offset = user_ts;        /* Set initial last_rcv_time to first recv time. */        if ((session->flags & RTP_SESSION_SEND_NOT_STARTED)            || session->mode == RTP_SESSION_RECVONLY)        {            gettimeofday(&session->last_recv_time, NULL);        }		if (session->flags & RTP_SESSION_SCHEDULED)		{			session->rtp.rcv_time_offset = sched->time_;			//ortp_message("setting snd_time_offset=%i",session->rtp.snd_time_offset);		}		rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED);	}	session->rtp.rcv_last_app_ts = user_ts;	rtp_recv (session, user_ts);	rtcp_recv(session);	/* check for telephone event first */	/* first lock the session */	rtp_session_lock (session);	mp=getq(&session->rtp.tev_rq);	if (mp!=NULL){		rtp_signal_table_emit2(&session->on_telephone_event_packet,(long)mp);		if (session->on_telephone_event.count>0){			rtp_session_check_telephone_events(session,mp);		}		freemsg(mp);		mp=NULL;	}		/* then now try to return a media packet, if possible */	/* first condition: if the session is starting, don't return anything	 * until the queue size reaches jitt_comp */		if (session->flags & RTP_SESSION_RECV_SYNC)	{		rtp_header_t *oldest, *newest;		queue_t *q = &session->rtp.rq;		if (qempty(q))		{			ortp_debug ("Queue is empty.");			goto end;		}		oldest = (rtp_header_t *) qfirst(q)->b_rptr;		newest = (rtp_header_t *) qlast(q)->b_rptr;		if ((uint32_t) (newest->timestamp - oldest->timestamp) <		    (uint32_t) session->rtp.jittctl.jitt_comp_ts)		{			ortp_debug("Not enough packet bufferised.");			goto end;		}		/* enough packet bufferised */		mp = getq (&session->rtp.rq);		rtp = (rtp_header_t *) mp->b_rptr;		session->rtp.rcv_ts_offset = rtp->timestamp;		/* remember the timestamp offset between the stream timestamp (random)		 * and the user timestamp, that very often starts at zero */		session->rtp.rcv_diff_ts = rtp->timestamp - user_ts;		/* remember the difference between the last received on the socket timestamp and the user timestamp */		session->rtp.hwrcv_diff_ts=session->rtp.rcv_diff_ts + session->rtp.jittctl.jitt_comp_ts;		session->rtp.rcv_last_ret_ts = user_ts;	/* just to have an init value */		session->rtp.rcv_last_ts = rtp->timestamp;		session->recv_ssrc = rtp->ssrc;		/* delete the recv synchronisation flag */		rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC);		ortp_debug("Returning FIRST packet with ts=%i, hwrcv_diff_ts=%i, rcv_diff_ts=%i", rtp->timestamp,					session->rtp.hwrcv_diff_ts,session->rtp.rcv_diff_ts);		goto end;	}	/* else this the normal case */	/*calculate the stream timestamp from the user timestamp */	ts = user_ts + session->rtp.rcv_diff_ts;	session->rtp.rcv_last_ts = ts;	mp = rtp_getq (&session->rtp.rq, ts,&rejected);		stream->stats.skipped+=rejected;	ortp_global_stats.skipped+=rejected;	goto end;      end:	if (mp != NULL)	{		int msgsize = msgdsize (mp);	/* evaluate how much bytes (including header) is received by app */		uint32_t packet_ts;		ortp_global_stats.recv += msgsize;		stream->stats.recv += msgsize;		rtp = (rtp_header_t *) mp->b_rptr;		packet_ts=rtp->timestamp;		ortp_debug("Returning mp with ts=%i", packet_ts);		/* check for payload type changes */		if (session->recv_pt != rtp->paytype)		{			payload_type_changed_notify(session, rtp->paytype);		}		/* patch the packet so that it has a timestamp compensated by the 		adaptive jitter buffer mechanism */		if (session->rtp.jittctl.adaptive){			rtp->timestamp-=session->rtp.jittctl.corrective_slide;			/*printf("Returned packet has timestamp %u, with clock slide compensated it is %u\n",packet_ts,rtp->timestamp);*/		}	}	else	{		ortp_debug ("No mp for timestamp queried");		stream->stats.unavaillable++;		ortp_global_stats.unavaillable++;	}	rtp_session_rtcp_process(session);	rtp_session_unlock (session);		if (session->flags & RTP_SESSION_SCHEDULED)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?