📄 rtpsession_inet.c
字号:
#endif return dest;}/** *rtp_session_set_remote_addr: *@session: a rtp session freshly created. *@addr: a local IP address in the xxx.xxx.xxx.xxx form. *@port: a local port. * * Sets the remote address of the rtp session, ie the destination address where rtp packet * are sent. If the session is recv-only or duplex, it also sets the origin of incoming RTP * packets. Rtp packets that don't come from addr:port are discarded. * * Returns: 0 on success.**/intrtp_session_set_remote_addr (RtpSession * session, const char * addr, int port){ return rtp_session_set_remote_addr_full (session, addr, port, port+1);}/** *rtp_session_set_remote_addr_full: *@session: a rtp session freshly created. *@addr: a local IP address in the xxx.xxx.xxx.xxx form. *@rtp_port: a local rtp port. *@rtcp_port: a local rtcp port. * * Sets the remote address of the rtp session, ie the destination address where rtp packet * are sent. If the session is recv-only or duplex, it also sets the origin of incoming RTP * packets. Rtp packets that don't come from addr:port are discarded. * * Returns: 0 on success.**/intrtp_session_set_remote_addr_full (RtpSession * session, const char * addr, int rtp_port, int rtcp_port){ int err;#ifdef ORTP_INET6 struct addrinfo hints, *res0, *res; char num[8]; memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_DGRAM; snprintf(num, sizeof(num), "%d", rtp_port); err = getaddrinfo(addr, num, &hints, &res0); if (err) { ortp_warning ("Error in socket address: %s", gai_strerror(err)); return -1; }#endif if (session->rtp.socket == -1){ /* the session has not its socket bound, do it */ ortp_message ("Setting random local addresses.");#ifdef ORTP_INET6 /* bind to an address type that matches the destination address */ if (res0->ai_addr->sa_family==AF_INET6) err = rtp_session_set_local_addr (session, "::", -1); else err=rtp_session_set_local_addr (session, "0.0.0.0", -1);#else err = rtp_session_set_local_addr (session, "0.0.0.0", -1);#endif if (err<0) return -1; }#ifdef ORTP_INET6 err=1; for (res = res0; res; res = res->ai_next) { /* set a destination address that has the same type as the local address */ if (res->ai_family==session->rtp.sockfamily ) { memcpy( &session->rtp.rem_addr, res->ai_addr, res->ai_addrlen); session->rtp.rem_addrlen=res->ai_addrlen; err=0; break; } } freeaddrinfo(res0); if (err) { ortp_warning("Could not set destination for RTP socket to %s:%i.",addr,rtp_port); return -1; } memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_DGRAM; snprintf(num, sizeof(num), "%d", rtcp_port); err = getaddrinfo(addr, num, &hints, &res0); if (err) { ortp_warning ("Error: %s", gai_strerror(err)); return err; } err=1; for (res = res0; res; res = res->ai_next) { /* set a destination address that has the same type as the local address */ if (res->ai_family==session->rtp.sockfamily ) { err=0; memcpy( &session->rtcp.rem_addr, res->ai_addr, res->ai_addrlen); session->rtcp.rem_addrlen=res->ai_addrlen; break; } } freeaddrinfo(res0); if (err) { ortp_warning("Could not set destination for RCTP socket to %s:%i.",addr,rtcp_port); return -1; }#else session->rtp.rem_addrlen=sizeof(session->rtp.rem_addr); session->rtp.rem_addr.sin_family = AF_INET; err = inet_aton (addr, &session->rtp.rem_addr.sin_addr); if (err < 0) { ortp_warning ("Error in socket address:%s.", getSocketError()); return err; } session->rtp.rem_addr.sin_port = htons (rtp_port); memcpy (&session->rtcp.rem_addr, &session->rtp.rem_addr, sizeof (struct sockaddr_in)); session->rtcp.rem_addr.sin_port = htons (rtcp_port); session->rtcp.rem_addrlen=sizeof(session->rtcp.rem_addr);#endif if (can_connect(session)){ if (try_connect(session->rtp.socket,(struct sockaddr*)&session->rtp.rem_addr,session->rtp.rem_addrlen)) session->flags|=RTP_SOCKET_CONNECTED; if (session->rtcp.socket>=0){ if (try_connect(session->rtcp.socket,(struct sockaddr*)&session->rtcp.rem_addr,session->rtcp.rem_addrlen)) session->flags|=RTCP_SOCKET_CONNECTED; } }else if (session->flags & RTP_SOCKET_CONNECTED){ /*must dissolve association done by connect(). See connect(2) manpage*/ struct sockaddr sa; sa.sa_family=AF_UNSPEC; if (connect(session->rtp.socket,&sa,sizeof(sa))<0){ ortp_error("Cannot dissolve connect() association for rtp socket: %s", getSocketError()); } if (connect(session->rtcp.socket,&sa,sizeof(sa))<0){ ortp_error("Cannot dissolve connect() association for rtcp socket: %s", getSocketError()); } session->flags&=~RTP_SOCKET_CONNECTED; session->flags&=~RTCP_SOCKET_CONNECTED; } return 0;}intrtp_session_set_remote_addr_and_port(RtpSession * session, const char * addr, int rtp_port, int rtcp_port){ return rtp_session_set_remote_addr_full(session,addr,rtp_port,rtcp_port);}void rtp_session_set_sockets(RtpSession *session, int rtpfd, int rtcpfd){ if (rtpfd>=0) set_non_blocking_socket(rtpfd); if (rtcpfd>=0) set_non_blocking_socket(rtcpfd); session->rtp.socket=rtpfd; session->rtcp.socket=rtcpfd; if (rtpfd>=0 || rtcpfd>=0 ) session->flags|=(RTP_SESSION_USING_EXT_SOCKETS|RTP_SOCKET_CONNECTED|RTCP_SOCKET_CONNECTED); else session->flags&=~(RTP_SESSION_USING_EXT_SOCKETS|RTP_SOCKET_CONNECTED|RTCP_SOCKET_CONNECTED);}void rtp_session_set_transports(RtpSession *session, struct _RtpTransport *rtptr, struct _RtpTransport *rtcptr){ session->rtp.tr = rtptr; session->rtcp.tr = rtcptr; if (rtptr) rtptr->session=session; if (rtcptr) rtcptr->session=session; if (rtptr || rtcptr ) session->flags|=(RTP_SESSION_USING_TRANSPORT); else session->flags&=~(RTP_SESSION_USING_TRANSPORT);}/** *rtp_session_flush_sockets: *@session: a rtp session * * Flushes the sockets for all pending incoming packets. * This can be usefull if you did not listen to the stream for a while * and wishes to start to receive again. During the time no receive is made * packets get bufferised into the internal kernel socket structure. ***/void rtp_session_flush_sockets(RtpSession *session){ unsigned char trash[4096];#ifdef ORTP_INET6 struct sockaddr_storage from;#else struct sockaddr from;#endif socklen_t fromlen=sizeof(from); if (rtp_session_using_transport(session, rtp)) { mblk_t *trashmp=esballoc(trash,sizeof(trash),0,NULL); while (session->rtp.tr->t_recvfrom(session->rtp.tr,trashmp,0,(struct sockaddr *)&from,&fromlen)>0){}; if (session->rtcp.tr) while (session->rtcp.tr->t_recvfrom(session->rtcp.tr,trashmp,0,(struct sockaddr *)&from,&fromlen)>0){}; freemsg(trashmp); return; } if (session->rtp.socket>=0){ while (recvfrom(session->rtp.socket,trash,sizeof(trash),0,(struct sockaddr *)&from,&fromlen)>0){}; } if (session->rtcp.socket>=0){ while (recvfrom(session->rtcp.socket,trash,sizeof(trash),0,(struct sockaddr*)&from,&fromlen)>0){}; }}#ifdef USE_SENDMSG #define MAX_IOV 30static int rtp_sendmsg(int sock,mblk_t *m, struct sockaddr *rem_addr, int addr_len){ int error; struct msghdr msg; struct iovec iov[MAX_IOV]; int iovlen; for(iovlen=0; iovlen<MAX_IOV && m!=NULL; m=m->b_cont,iovlen++){ iov[iovlen].iov_base=m->b_rptr; iov[iovlen].iov_len=m->b_wptr-m->b_rptr; } if (iovlen==MAX_IOV){ ortp_error("Too long msgb, didn't fit into iov, end discarded."); } msg.msg_name=(void*)rem_addr; msg.msg_namelen=addr_len; msg.msg_iov=&iov[0]; msg.msg_iovlen=iovlen; msg.msg_control=NULL; msg.msg_controllen=0; msg.msg_flags=0; error=sendmsg(sock,&msg,0); return error;}#endif #define IP_UDP_OVERHEAD (20+8)static void update_sent_bytes(RtpSession*s, int nbytes){ if (s->rtp.sent_bytes==0){ gettimeofday(&s->rtp.send_bw_start,NULL); } s->rtp.sent_bytes+=nbytes+IP_UDP_OVERHEAD;}static void update_recv_bytes(RtpSession*s, int nbytes){ if (s->rtp.recv_bytes==0){ gettimeofday(&s->rtp.recv_bw_start,NULL); } s->rtp.recv_bytes+=nbytes+IP_UDP_OVERHEAD;}intrtp_session_rtp_send (RtpSession * session, mblk_t * m){ int error; int i; rtp_header_t *hdr; struct sockaddr *destaddr=(struct sockaddr*)&session->rtp.rem_addr; socklen_t destlen=session->rtp.rem_addrlen; ortp_socket_t sockfd=session->rtp.socket; hdr = (rtp_header_t *) m->b_rptr; /* perform host to network conversions */ hdr->ssrc = htonl (hdr->ssrc); hdr->timestamp = htonl (hdr->timestamp); hdr->seq_number = htons (hdr->seq_number); for (i = 0; i < hdr->cc; i++) hdr->csrc[i] = htonl (hdr->csrc[i]); if (session->flags & RTP_SOCKET_CONNECTED) { destaddr=NULL; destlen=0; } if (rtp_session_using_transport(session, rtp)){ error = (session->rtp.tr->t_sendto) (session->rtp.tr,m,0,destaddr,destlen); }else{#ifdef USE_SENDMSG error=rtp_sendmsg(sockfd,m,destaddr,destlen);#else if (m->b_cont!=NULL) msgpullup(m,-1); error = sendto (sockfd, m->b_rptr, (int) (m->b_wptr - m->b_rptr), 0,destaddr,destlen);#endif } if (error < 0){ if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode())); }else ortp_warning ("Error sending rtp packet: %s ; socket=%i", getSocketError(), sockfd); session->rtp.send_errno=getSocketErrorCode(); }else{ update_sent_bytes(session,error); } freemsg (m); return error;}intrtp_session_rtcp_send (RtpSession * session, mblk_t * m){ int error=0; ortp_socket_t sockfd=session->rtcp.socket; struct sockaddr *destaddr=(struct sockaddr*)&session->rtcp.rem_addr; socklen_t destlen=session->rtcp.rem_addrlen; bool_t using_connected_socket=(session->flags & RTCP_SOCKET_CONNECTED)!=0; if (using_connected_socket) { destaddr=NULL; destlen=0; } if (session->rtcp.enabled && ( (sockfd>=0 && (session->rtcp.rem_addrlen>0 ||using_connected_socket)) || rtp_session_using_transport(session, rtcp) ) ){ if (rtp_session_using_transport(session, rtcp)){ error = (session->rtcp.tr->t_sendto) (session->rtcp.tr, m, 0, destaddr, destlen); } else{#ifdef USE_SENDMSG error=rtp_sendmsg(sockfd,m,destaddr, destlen);#else if (m->b_cont!=NULL){ msgpullup(m,-1); } error = sendto (sockfd, m->b_rptr, (int) (m->b_wptr - m->b_rptr), 0, destaddr, destlen);#endif } if (error < 0){ char host[65]; if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTCP packet",INT_TO_POINTER(getSocketErrorCode())); }else ortp_warning ("Error sending rtcp packet: %s ; socket=%i; addr=%s", getSocketError(), session->rtcp.socket, ortp_inet_ntoa((struct sockaddr*)&session->rtcp.rem_addr,session->rtcp.rem_addrlen,host,sizeof(host)) ); } }else ortp_message("Not sending rtcp report: sockfd=%i, rem_addrlen=%i, connected=%i",sockfd,session->rtcp.rem_addrlen,using_connected_socket); freemsg (m); return error;}intrtp_session_rtp_recv (RtpSession * session, uint32_t user_ts){ int error; ortp_socket_t sockfd=session->rtp.socket;#ifdef ORTP_INET6 struct sockaddr_storage remaddr;#else struct sockaddr remaddr;#endif socklen_t addrlen = sizeof (remaddr); mblk_t *mp; if ((sockfd<0) && !rtp_session_using_transport(session, rtp)) return -1; /*session has no sockets for the moment*/ while (1) { int bufsz; bool_t sock_connected=!!(session->flags & RTP_SOCKET_CONNECTED); if (session->rtp.cached_mp==NULL) session->rtp.cached_mp = msgb_allocator_alloc(&session->allocator,session->recv_buf_size); mp=session->rtp.cached_mp; bufsz=(int) (mp->b_datap->db_lim - mp->b_datap->db_base); if (sock_connected){ error=recv(sockfd,mp->b_wptr,bufsz,0); }else if (rtp_session_using_transport(session, rtp)) error = (session->rtp.tr->t_recvfrom)(session->rtp.tr, mp, 0, (struct sockaddr *) &remaddr, &addrlen); else error = recvfrom(sockfd, mp->b_wptr, bufsz, 0, (struct sockaddr *) &remaddr, &addrlen); if (error > 0){ if (session->symmetric_rtp && !sock_connected){ if (session->use_connect){ /* store the sender rtp address to do symmetric RTP */ memcpy(&session->rtp.rem_addr,&remaddr,addrlen); session->rtp.rem_addrlen=addrlen; if (try_connect(sockfd,(struct sockaddr*)&remaddr,addrlen)) session->flags|=RTP_SOCKET_CONNECTED; } } /* then parse the message and put on queue */ mp->b_wptr+=error; rtp_session_rtp_parse (session, mp, user_ts, (struct sockaddr*)&remaddr,addrlen); session->rtp.cached_mp=NULL; /*for bandwidth measurements:*/ update_recv_bytes(session,error); } else { int errnum=getSocketErrorCode(); if (error == 0) { ortp_warning ("rtp_recv: strange... recv() returned zero."); } else if (!is_would_block_error(errnum)) { if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTP packet",INT_TO_POINTER(getSocketErrorCode())); }else ortp_warning("Error receiving RTP packet: %s.",getSocketError()); } /* don't free the cached_mp, it will be reused next time */ return -1; /* avoids an infinite loop ! */ } } return error;}void rtp_session_notify_inc_rtcp(RtpSession *session, mblk_t *m){ if (session->eventqs!=NULL){ OrtpEvent *ev=ortp_event_new(ORTP_EVENT_RTCP_PACKET_RECEIVED); OrtpEventData *d=ortp_event_get_data(ev); d->packet=m; rtp_session_dispatch_event(session,ev); } else freemsg(m); /* avoid memory leak */}intrtp_session_rtcp_recv (RtpSession * session){ int error;#ifdef ORTP_INET6 struct sockaddr_storage remaddr;#else struct sockaddr remaddr;#endif socklen_t addrlen=0; mblk_t *mp; if (session->rtcp.socket<0 && !rtp_session_using_transport(session, rtcp)) return -1; /*session has no rtcp sockets for the moment*/ while (1) { bool_t sock_connected=!!(session->flags & RTCP_SOCKET_CONNECTED); if (session->rtcp.cached_mp==NULL) session->rtcp.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0); mp=session->rtcp.cached_mp; if (sock_connected){ error=recv(session->rtcp.socket,mp->b_wptr,RTCP_MAX_RECV_BUFSIZE,0); }else { addrlen=sizeof (remaddr); if (rtp_session_using_transport(session, rtcp)) error=(session->rtcp.tr->t_recvfrom)(session->rtcp.tr, mp, 0, (struct sockaddr *) &remaddr, &addrlen); else error=recvfrom (session->rtcp.socket, mp->b_wptr, RTCP_MAX_RECV_BUFSIZE, 0, (struct sockaddr *) &remaddr, &addrlen); } if (error > 0) { mp->b_wptr += error; /* post an event to notify the application*/ { rtp_session_notify_inc_rtcp(session,mp); } session->rtcp.cached_mp=NULL; if (session->symmetric_rtp && !sock_connected){ /* store the sender rtp address to do symmetric RTP */ memcpy(&session->rtcp.rem_addr,&remaddr,addrlen); session->rtcp.rem_addrlen=addrlen; if (session->use_connect){ if (try_connect(session->rtcp.socket,(struct sockaddr*)&remaddr,addrlen)) session->flags|=RTCP_SOCKET_CONNECTED; } } } else { int errnum=getSocketErrorCode(); if (error == 0) { ortp_warning ("rtcp_recv: strange... recv() returned zero."); } else if (!is_would_block_error(errnum)) { if (session->on_network_error.count>0){ rtp_signal_table_emit3(&session->on_network_error,(long)"Error receiving RTCP packet",INT_TO_POINTER(errnum)); }else ortp_warning("Error receiving RTCP packet: %s.",getSocketError()); session->rtp.recv_errno=errnum; } /* don't free the cached_mp, it will be reused next time */ return -1; /* avoids an infinite loop ! */ } } return error;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -