⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtpsession_inet.c

📁 mediastreamer2是开源的网络传输媒体流的库
💻 C
📖 第 1 页 / 共 2 页
字号:
#endif	return dest;}/** *rtp_session_set_remote_addr: *@session:		a rtp session freshly created. *@addr:		a local IP address in the xxx.xxx.xxx.xxx form. *@port:		a local port. * *	Sets the remote address of the rtp session, ie the destination address where rtp packet *	are sent. If the session is recv-only or duplex, it also sets the origin of incoming RTP  *	packets. Rtp packets that don't come from addr:port are discarded. * *	Returns: 0 on success.**/intrtp_session_set_remote_addr (RtpSession * session, const char * addr, int port){	return rtp_session_set_remote_addr_full (session, addr, port, port+1);}/** *rtp_session_set_remote_addr_full: *@session:		a rtp session freshly created. *@addr:		a local IP address in the xxx.xxx.xxx.xxx form. *@rtp_port:		a local rtp port. *@rtcp_port:		a local rtcp port. * *	Sets the remote address of the rtp session, ie the destination address where rtp packet *	are sent. If the session is recv-only or duplex, it also sets the origin of incoming RTP  *	packets. Rtp packets that don't come from addr:port are discarded. * *	Returns: 0 on success.**/intrtp_session_set_remote_addr_full (RtpSession * session, const char * addr, int rtp_port, int rtcp_port){	int err;#ifdef ORTP_INET6	struct addrinfo hints, *res0, *res;	char num[8];	memset(&hints, 0, sizeof(hints));	hints.ai_family = PF_UNSPEC;	hints.ai_socktype = SOCK_DGRAM;	snprintf(num, sizeof(num), "%d", rtp_port);	err = getaddrinfo(addr, num, &hints, &res0);	if (err) {		ortp_warning ("Error in socket address: %s", gai_strerror(err));		return -1;	}#endif	if (session->rtp.socket == -1){		/* the session has not its socket bound, do it */		ortp_message ("Setting random local addresses.");#ifdef ORTP_INET6		/* bind to an address type that matches the destination address */		if (res0->ai_addr->sa_family==AF_INET6)			err = rtp_session_set_local_addr (session, "::", -1);		else err=rtp_session_set_local_addr (session, "0.0.0.0", -1);#else		err = rtp_session_set_local_addr (session, "0.0.0.0", -1);#endif		if (err<0) return -1;	}#ifdef ORTP_INET6	err=1;	for (res = res0; res; res = res->ai_next) {		/* set a destination address that has the same type as the local address */		if (res->ai_family==session->rtp.sockfamily ) {			memcpy( &session->rtp.rem_addr, res->ai_addr, res->ai_addrlen);			session->rtp.rem_addrlen=res->ai_addrlen;		  	err=0;		  	break;		}	}	freeaddrinfo(res0);	if (err) {		ortp_warning("Could not set destination for RTP socket to %s:%i.",addr,rtp_port);		return -1;	}		memset(&hints, 0, sizeof(hints));	hints.ai_family = PF_UNSPEC;	hints.ai_socktype = SOCK_DGRAM;	snprintf(num, sizeof(num), "%d", rtcp_port);	err = getaddrinfo(addr, num, &hints, &res0);	if (err) {		ortp_warning ("Error: %s", gai_strerror(err));		return err;	}	err=1;	for (res = res0; res; res = res->ai_next) {		/* set a destination address that has the same type as the local address */		if (res->ai_family==session->rtp.sockfamily ) {		  	err=0;		  	memcpy( &session->rtcp.rem_addr, res->ai_addr, res->ai_addrlen);			session->rtcp.rem_addrlen=res->ai_addrlen;		  	break;		}	}	freeaddrinfo(res0);	if (err) {		ortp_warning("Could not set destination for RCTP socket to %s:%i.",addr,rtcp_port);		return -1;	}#else	session->rtp.rem_addrlen=sizeof(session->rtp.rem_addr);	session->rtp.rem_addr.sin_family = AF_INET;	err = inet_aton (addr, &session->rtp.rem_addr.sin_addr);	if (err < 0)	{		ortp_warning ("Error in socket address:%s.", getSocketError());		return err;	}	session->rtp.rem_addr.sin_port = htons (rtp_port);	memcpy (&session->rtcp.rem_addr, &session->rtp.rem_addr,		sizeof (struct sockaddr_in));	session->rtcp.rem_addr.sin_port = htons (rtcp_port);	session->rtcp.rem_addrlen=sizeof(session->rtcp.rem_addr);#endif	if (can_connect(session)){		if (try_connect(session->rtp.socket,(struct sockaddr*)&session->rtp.rem_addr,session->rtp.rem_addrlen))			session->flags|=RTP_SOCKET_CONNECTED;		if (session->rtcp.socket>=0){			if (try_connect(session->rtcp.socket,(struct sockaddr*)&session->rtcp.rem_addr,session->rtcp.rem_addrlen))				session->flags|=RTCP_SOCKET_CONNECTED;		}	}else if (session->flags & RTP_SOCKET_CONNECTED){		/*must dissolve association done by connect().		See connect(2) manpage*/		struct sockaddr sa;		sa.sa_family=AF_UNSPEC;		if (connect(session->rtp.socket,&sa,sizeof(sa))<0){			ortp_error("Cannot dissolve connect() association for rtp socket: %s", getSocketError());		}		if (connect(session->rtcp.socket,&sa,sizeof(sa))<0){			ortp_error("Cannot dissolve connect() association for rtcp socket: %s", getSocketError());		}		session->flags&=~RTP_SOCKET_CONNECTED;		session->flags&=~RTCP_SOCKET_CONNECTED;	}	return 0;}intrtp_session_set_remote_addr_and_port(RtpSession * session, const char * addr, int rtp_port, int rtcp_port){	return rtp_session_set_remote_addr_full(session,addr,rtp_port,rtcp_port);}void rtp_session_set_sockets(RtpSession *session, int rtpfd, int rtcpfd){	if (rtpfd>=0) set_non_blocking_socket(rtpfd);	if (rtcpfd>=0) set_non_blocking_socket(rtcpfd);	session->rtp.socket=rtpfd;	session->rtcp.socket=rtcpfd;	if (rtpfd>=0 || rtcpfd>=0 )		session->flags|=(RTP_SESSION_USING_EXT_SOCKETS|RTP_SOCKET_CONNECTED|RTCP_SOCKET_CONNECTED);	else session->flags&=~(RTP_SESSION_USING_EXT_SOCKETS|RTP_SOCKET_CONNECTED|RTCP_SOCKET_CONNECTED);}void rtp_session_set_transports(RtpSession *session, struct _RtpTransport *rtptr, struct _RtpTransport *rtcptr){	session->rtp.tr = rtptr;	session->rtcp.tr = rtcptr;	if (rtptr)		rtptr->session=session;	if (rtcptr)		rtcptr->session=session;	if (rtptr || rtcptr )		session->flags|=(RTP_SESSION_USING_TRANSPORT);	else session->flags&=~(RTP_SESSION_USING_TRANSPORT);}/** *rtp_session_flush_sockets: *@session: a rtp session * * Flushes the sockets for all pending incoming packets. * This can be usefull if you did not listen to the stream for a while * and wishes to start to receive again. During the time no receive is made * packets get bufferised into the internal kernel socket structure. ***/void rtp_session_flush_sockets(RtpSession *session){	unsigned char trash[4096];#ifdef ORTP_INET6	struct sockaddr_storage from;#else	struct sockaddr from;#endif	socklen_t fromlen=sizeof(from);	if (rtp_session_using_transport(session, rtp))	  {		mblk_t *trashmp=esballoc(trash,sizeof(trash),0,NULL);			    while (session->rtp.tr->t_recvfrom(session->rtp.tr,trashmp,0,(struct sockaddr *)&from,&fromlen)>0){};	    if (session->rtcp.tr)	      while (session->rtcp.tr->t_recvfrom(session->rtcp.tr,trashmp,0,(struct sockaddr *)&from,&fromlen)>0){};		freemsg(trashmp);	    return;	  }	if (session->rtp.socket>=0){		while (recvfrom(session->rtp.socket,trash,sizeof(trash),0,(struct sockaddr *)&from,&fromlen)>0){};	}	if (session->rtcp.socket>=0){		while (recvfrom(session->rtcp.socket,trash,sizeof(trash),0,(struct sockaddr*)&from,&fromlen)>0){};	}}#ifdef USE_SENDMSG #define MAX_IOV 30static int rtp_sendmsg(int sock,mblk_t *m, struct sockaddr *rem_addr, int addr_len){	int error;	struct msghdr msg;	struct iovec iov[MAX_IOV];	int iovlen;	for(iovlen=0; iovlen<MAX_IOV && m!=NULL; m=m->b_cont,iovlen++){		iov[iovlen].iov_base=m->b_rptr;		iov[iovlen].iov_len=m->b_wptr-m->b_rptr;	}	if (iovlen==MAX_IOV){		ortp_error("Too long msgb, didn't fit into iov, end discarded.");	}	msg.msg_name=(void*)rem_addr;	msg.msg_namelen=addr_len;	msg.msg_iov=&iov[0];	msg.msg_iovlen=iovlen;	msg.msg_control=NULL;	msg.msg_controllen=0;	msg.msg_flags=0;	error=sendmsg(sock,&msg,0);	return error;}#endif	#define IP_UDP_OVERHEAD (20+8)static void update_sent_bytes(RtpSession*s, int nbytes){	if (s->rtp.sent_bytes==0){		gettimeofday(&s->rtp.send_bw_start,NULL);	}	s->rtp.sent_bytes+=nbytes+IP_UDP_OVERHEAD;}static void update_recv_bytes(RtpSession*s, int nbytes){	if (s->rtp.recv_bytes==0){		gettimeofday(&s->rtp.recv_bw_start,NULL);	}	s->rtp.recv_bytes+=nbytes+IP_UDP_OVERHEAD;}intrtp_session_rtp_send (RtpSession * session, mblk_t * m){	int error;	int i;	rtp_header_t *hdr;	struct sockaddr *destaddr=(struct sockaddr*)&session->rtp.rem_addr;	socklen_t destlen=session->rtp.rem_addrlen;	ortp_socket_t sockfd=session->rtp.socket;	hdr = (rtp_header_t *) m->b_rptr;	/* perform host to network conversions */	hdr->ssrc = htonl (hdr->ssrc);	hdr->timestamp = htonl (hdr->timestamp);	hdr->seq_number = htons (hdr->seq_number);	for (i = 0; i < hdr->cc; i++)		hdr->csrc[i] = htonl (hdr->csrc[i]);	if (session->flags & RTP_SOCKET_CONNECTED) {		destaddr=NULL;		destlen=0;	}	if (rtp_session_using_transport(session, rtp)){		error = (session->rtp.tr->t_sendto) (session->rtp.tr,m,0,destaddr,destlen);	}else{#ifdef USE_SENDMSG		error=rtp_sendmsg(sockfd,m,destaddr,destlen);#else		if (m->b_cont!=NULL)			msgpullup(m,-1);		error = sendto (sockfd, m->b_rptr, (int) (m->b_wptr - m->b_rptr),			 0,destaddr,destlen);#endif	}	if (error < 0){		if (session->on_network_error.count>0){			rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode()));		}else ortp_warning ("Error sending rtp packet: %s ; socket=%i", getSocketError(), sockfd);		session->rtp.send_errno=getSocketErrorCode();	}else{		update_sent_bytes(session,error);	}	freemsg (m);	return error;}intrtp_session_rtcp_send (RtpSession * session, mblk_t * m){	int error=0;	ortp_socket_t sockfd=session->rtcp.socket;	struct sockaddr *destaddr=(struct sockaddr*)&session->rtcp.rem_addr;	socklen_t destlen=session->rtcp.rem_addrlen;	bool_t using_connected_socket=(session->flags & RTCP_SOCKET_CONNECTED)!=0;	if (using_connected_socket) {		destaddr=NULL;		destlen=0;	}	if (session->rtcp.enabled &&		( (sockfd>=0 && (session->rtcp.rem_addrlen>0 ||using_connected_socket))			|| rtp_session_using_transport(session, rtcp) ) ){		if (rtp_session_using_transport(session, rtcp)){			error = (session->rtcp.tr->t_sendto) (session->rtcp.tr, m, 0,			destaddr, destlen);		}		else{#ifdef USE_SENDMSG			error=rtp_sendmsg(sockfd,m,destaddr, destlen);#else			if (m->b_cont!=NULL){				msgpullup(m,-1);			}			error = sendto (sockfd, m->b_rptr,			(int) (m->b_wptr - m->b_rptr), 0,			destaddr, destlen);#endif		}		if (error < 0){			char host[65];			if (session->on_network_error.count>0){				rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTCP packet",INT_TO_POINTER(getSocketErrorCode()));			}else ortp_warning ("Error sending rtcp packet: %s ; socket=%i; addr=%s", getSocketError(), session->rtcp.socket, ortp_inet_ntoa((struct sockaddr*)&session->rtcp.rem_addr,session->rtcp.rem_addrlen,host,sizeof(host)) );		}	}else ortp_message("Not sending rtcp report: sockfd=%i, rem_addrlen=%i, connected=%i",sockfd,session->rtcp.rem_addrlen,using_connected_socket);	freemsg (m);	return error;}intrtp_session_rtp_recv (RtpSession * session, uint32_t user_ts){	int error;	ortp_socket_t sockfd=session->rtp.socket;#ifdef ORTP_INET6	struct sockaddr_storage remaddr;#else	struct sockaddr remaddr;#endif	socklen_t addrlen = sizeof (remaddr);	mblk_t *mp;		if ((sockfd<0) && !rtp_session_using_transport(session, rtp)) return -1;  /*session has no sockets for the moment*/	while (1)	{		int bufsz;		bool_t sock_connected=!!(session->flags & RTP_SOCKET_CONNECTED);		if (session->rtp.cached_mp==NULL)			 session->rtp.cached_mp = msgb_allocator_alloc(&session->allocator,session->recv_buf_size);		mp=session->rtp.cached_mp;		bufsz=(int) (mp->b_datap->db_lim - mp->b_datap->db_base);		if (sock_connected){			error=recv(sockfd,mp->b_wptr,bufsz,0);		}else if (rtp_session_using_transport(session, rtp)) 			error = (session->rtp.tr->t_recvfrom)(session->rtp.tr, mp, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);		else error = recvfrom(sockfd, mp->b_wptr,				  bufsz, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);		if (error > 0){			if (session->symmetric_rtp && !sock_connected){				if (session->use_connect){					/* store the sender rtp address to do symmetric RTP */					memcpy(&session->rtp.rem_addr,&remaddr,addrlen);					session->rtp.rem_addrlen=addrlen;					if (try_connect(sockfd,(struct sockaddr*)&remaddr,addrlen))						session->flags|=RTP_SOCKET_CONNECTED;				}			}			/* then parse the message and put on queue */			mp->b_wptr+=error;			rtp_session_rtp_parse (session, mp, user_ts, (struct sockaddr*)&remaddr,addrlen);			session->rtp.cached_mp=NULL;			/*for bandwidth measurements:*/			update_recv_bytes(session,error);		}		else		{		 	int errnum=getSocketErrorCode();			if (error == 0)			{				ortp_warning					("rtp_recv: strange... recv() returned zero.");			}			else if (!is_would_block_error(errnum))			{				if (session->on_network_error.count>0){					rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTP packet",INT_TO_POINTER(getSocketErrorCode()));				}else ortp_warning("Error receiving RTP packet: %s.",getSocketError());			}			/* don't free the cached_mp, it will be reused next time */			return -1;	/* avoids an infinite loop ! */		}	}	return error;}void rtp_session_notify_inc_rtcp(RtpSession *session, mblk_t *m){	if (session->eventqs!=NULL){		OrtpEvent *ev=ortp_event_new(ORTP_EVENT_RTCP_PACKET_RECEIVED);		OrtpEventData *d=ortp_event_get_data(ev);		d->packet=m;		rtp_session_dispatch_event(session,ev);	}	else freemsg(m);  /* avoid memory leak */}intrtp_session_rtcp_recv (RtpSession * session){	int error;#ifdef ORTP_INET6	struct sockaddr_storage remaddr;#else	struct sockaddr remaddr;#endif	socklen_t addrlen=0;	mblk_t *mp;	if (session->rtcp.socket<0 && !rtp_session_using_transport(session, rtcp)) return -1;  /*session has no rtcp sockets for the moment*/		while (1)	{		bool_t sock_connected=!!(session->flags & RTCP_SOCKET_CONNECTED);		if (session->rtcp.cached_mp==NULL)			 session->rtcp.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0);				mp=session->rtcp.cached_mp;		if (sock_connected){			error=recv(session->rtcp.socket,mp->b_wptr,RTCP_MAX_RECV_BUFSIZE,0);		}else {			addrlen=sizeof (remaddr);			if (rtp_session_using_transport(session, rtcp))			  error=(session->rtcp.tr->t_recvfrom)(session->rtcp.tr, mp, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);			else			  error=recvfrom (session->rtcp.socket, mp->b_wptr,				  RTCP_MAX_RECV_BUFSIZE, 0,				  (struct sockaddr *) &remaddr,				  &addrlen);		}		if (error > 0)		{			mp->b_wptr += error;			/* post an event to notify the application*/			{				rtp_session_notify_inc_rtcp(session,mp);			}			session->rtcp.cached_mp=NULL;			if (session->symmetric_rtp && !sock_connected){				/* store the sender rtp address to do symmetric RTP */				memcpy(&session->rtcp.rem_addr,&remaddr,addrlen);				session->rtcp.rem_addrlen=addrlen;				if (session->use_connect){					if (try_connect(session->rtcp.socket,(struct sockaddr*)&remaddr,addrlen))						session->flags|=RTCP_SOCKET_CONNECTED;				}			}		}		else		{			int errnum=getSocketErrorCode();			if (error == 0)			{				ortp_warning					("rtcp_recv: strange... recv() returned zero.");			}			else if (!is_would_block_error(errnum))			{				if (session->on_network_error.count>0){					rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTCP packet",INT_TO_POINTER(errnum));				}else ortp_warning("Error receiving RTCP packet: %s.",getSocketError());				session->rtp.recv_errno=errnum;			}			/* don't free the cached_mp, it will be reused next time */			return -1;	/* avoids an infinite loop ! */		}	}	return error;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -