📄 rtpsession.c
字号:
}\ }while (0); #endif #define rtp_send(_session,_m) \ do{\ mblk_t *_destmp;\ if ((_session)->dest_mproto!=NULL){\ _destmp=dupb((_session)->dest_mproto);\ _destmp->b_cont=(_m);\ streams_put(putnext,(_session)->rtp.wq,(_destmp),(void*)(_session)->rtp.wq);\ } else {\ streams_put(putnext,(_session)->rtp.wq,(_m),(void*)(_session)->rtp.wq);\ }\ }while (0); #endif#elsestatic gintrtp_send (RtpSession * session, mblk_t * m){ gint error; int i; rtp_header_t *hdr; if (m->b_cont!=NULL){ mblk_t *newm; newm=msgpullup(m,-1); freemsg(m); m=newm; } hdr = (rtp_header_t *) m->b_rptr; hdr->ssrc = htonl (hdr->ssrc); hdr->timestamp = htonl (hdr->timestamp); hdr->seq_number = htons (hdr->seq_number); for (i = 0; i < hdr->cc; i++) hdr->csrc[i] = htonl (hdr->csrc[i]); error = sendto (session->rtp.socket, m->b_rptr, (m->b_wptr - m->b_rptr), 0, (struct sockaddr *) &session->rtp.rem_addr, sizeof (struct sockaddr_in)); if (error < 0) g_warning ("Error sending rtp packet: %s.", strerror (errno)); freemsg (m); return error;}#endif/** *rtp_session_set_jitter_compensation: *@session: a RtpSession *@milisec: the time interval in milisec to be jitter compensed. * * Sets the time interval for which packet are buffered instead of being delivered to the * application. **/voidrtp_session_set_jitter_compensation (RtpSession * session, gint milisec){ PayloadType *payload = rtp_profile_get_payload (session->profile, session-> payload_type); /* REVISIT: need to take in account the payload description */ session->rtp.jitt_comp = milisec; /* convert in timestamp unit: */ session->rtp.jitt_comp_time = (gint) (((double) milisec / 1000.0) * (payload->clock_rate));}/** *rtp_session_set_ssrc: *@session: a rtp session. *@ssrc: an unsigned 32bit integer representing the synchronisation source identifier (SSRC). * * Sets the SSRC of the session. 
*
**/
void
rtp_session_set_ssrc (RtpSession * session, guint32 ssrc)
{
	/* plain store: no lock is taken and no validation is done here */
	session->ssrc = ssrc;
}

/**
 *rtp_session_set_payload_type:
 *@session: a rtp session
 *@paytype: the payload type
 *
 * Sets the payload type of the rtp session. This is the payload type written in the
 * rtp header of the outgoing stream, if the session is SENDRECV or SENDONLY.
 * For the incoming stream, it is the expected payload type. If an incoming packet does
 * not match this expected value, the application can be informed by registering
 * for the "payload_type_changed" signal, so that it can make the necessary changes
 * on the downstream decoder that deals with the payload of the packets.
 *
 * The value is stored as given: it is not checked against the session's profile.
 *
 *Returns: 0 in all cases.
**/
int
rtp_session_set_payload_type (RtpSession * session, int paytype)
{
	session->payload_type = paytype;
	return 0;
}

/**
 *rtp_session_create_packet:
 *@session: a rtp session.
 *@header_size: the rtp header size. For standard size (without extensions), it is #RTP_FIXED_HEADER_SIZE
 *@payload: data to be copied into the rtp packet.
 *@payload_size: size of data carried by the rtp packet.
 *
 * Allocates a new rtp packet. In the header, ssrc and payload_type are set according to the
 * session's context. Timestamp and seq number are not set, they will be set when the packet
 * is going to be sent with rtp_session_sendm_with_ts().
* *Returns: a rtp packet in a mblk_t (message block) structure.**/mblk_t * rtp_session_create_packet(RtpSession *session,gint header_size, char *payload, gint payload_size){ mblk_t *mp; gint msglen=header_size+payload_size; rtp_header_t *rtp; mp=allocb(msglen,BPRI_MED);#ifdef _KERNEL if (mp==NULL) return NULL;#endif rtp=(rtp_header_t*)mp->b_rptr; rtp->version = 2; rtp->padbit = 0; rtp->extbit = 0; rtp->markbit= 0; rtp->cc = 0; //rtp_session_lock(session); rtp->paytype = session->payload_type; rtp->ssrc = session->ssrc; //rtp_session_unlock(session); rtp->timestamp = 0; /* set later, when packet is sended */ rtp->seq_number = 0; /*set later, when packet is sended */ /*copy the payload */ mp->b_wptr+=header_size; memcpy(mp->b_wptr,payload,payload_size); mp->b_wptr+=payload_size; return mp;}/** *rtp_session_sendm_with_ts: *@session : a rtp session. *@mp : a rtp packet presented as a mblk_t. *@timestamp: the timestamp of the data to be sent. Refer to the rfc to know what it is. * * Send the rtp datagram @mp to the destination set by rtp_session_set_remote_addr() * with timestamp @timestamp. For audio data, the timestamp is the number * of the first sample resulting of the data transmitted. See rfc1889 for details. 
* *Returns: the number of bytes sent over the network.**/gintrtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, guint32 timestamp){ rtp_header_t *rtp; guint32 packet_time; gint error = 0; gint msgsize;#ifdef BUILD_SCHEDULER RtpScheduler *sched;#endif if (session->flags & RTP_SESSION_SEND_NOT_STARTED) { session->rtp.snd_ts_offset = timestamp;#ifdef BUILD_SCHEDULER if (session->flags & RTP_SESSION_SCHEDULED) { sched = ortp_get_scheduler (); session->rtp.snd_time_offset = sched->time; //g_message("setting snd_time_offset=%i",session->rtp.snd_time_offset); }#endif rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED); } rtp=(rtp_header_t*)mp->b_rptr; msgsize = msgdsize(mp); rtp_session_lock (session); /* set a seq number */ rtp->seq_number=session->rtp.snd_seq; rtp->timestamp=timestamp; session->rtp.snd_seq++; session->rtp.snd_last_ts = timestamp; ortp_global_stats.sent += msgsize; session->stats.sent += msgsize; ortp_global_stats.packet_sent++; session->stats.packet_sent++;#ifdef TARGET_IS_HPUXKERNEL /* send directly the message through the stream */ rtp_send (session, mp);#else if (!(session->flags & RTP_SESSION_SCHEDULED)) { error = rtp_send (session, mp); } else { putq (session->rtp.wq, mp); }#endif rtp_session_unlock (session); /* if we are in blocking mode, then suspend the process until the scheduler sends the * packet */ /* if the timestamp of the packet queued is older than current time, then you we must * not block */#ifdef BUILD_SCHEDULER if (session->flags & RTP_SESSION_SCHEDULED) { sched = ortp_get_scheduler (); packet_time = rtp_session_ts_to_t (session, timestamp - session->rtp.snd_ts_offset) + session->rtp.snd_time_offset; //g_message("rtp_session_send_with_ts: packet_time=%i time=%i",packet_time,sched->time); if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time)) { if (session->flags & RTP_SESSION_BLOCKING_MODE) { //g_message("waiting packet to be sent"); g_mutex_lock (session->rtp. 
wait_for_packet_to_be_sent_mutex); g_cond_wait (session->rtp. wait_for_packet_to_be_sent_cond, session->rtp. wait_for_packet_to_be_sent_mutex); g_mutex_unlock (session->rtp. wait_for_packet_to_be_sent_mutex); } session_set_clr(&sched->w_sessions,session); /* the session has written */ } else session_set_set(&sched->w_sessions,session); /*to indicate select to return immediately */ }#endif return error;}/** *rtp_session_send_with_ts: *@session: a rtp session. *@buffer: a buffer containing the data to be sent in a rtp packet. *@len: the length of the data buffer, in bytes. *@userts: the timestamp of the data to be sent. Refer to the rfc to know what it is. * * Send a rtp datagram to the destination set by rtp_session_set_remote_addr() containing * the data from @buffer with timestamp @userts. This is a high level function that uses * rtp_session_create_packet() and rtp_session_sendm_with_ts() to send the data. * * *Returns: the number of bytes sent over the network.**/gintrtp_session_send_with_ts (RtpSession * session, gchar * buffer, gint len, guint32 userts){ mblk_t *m; rtp_header_t *rtp; guint32 timestamp; guint32 packet_time; gint error = 0; gint msgsize; /* allocate a mblk_t, set the haeder. 
Perhaps if len>MTU, we should allocate a new * mlkt_t to split the packet FIXME */ msgsize = len + RTP_FIXED_HEADER_SIZE; m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,buffer,len); if (m == NULL) { g_warning ("Could not allocate message block for sending packet."); return -1; } return rtp_session_sendm_with_ts(session,m,userts);}#ifdef TARGET_IS_HPUXKERNELstatic gintrtp_stack_recv (RtpSession * session){ return EOPNOTSUPP;}#elsestatic gintrtp_stack_recv (RtpSession * session){ gint error; struct sockaddr_in remaddr; int addrlen = sizeof (struct sockaddr_in); char *p; mblk_t *mp; fd_set fdset; struct timeval timeout = { 0, 0 }; FD_ZERO (&fdset); FD_SET (session->rtp.socket, &fdset); while (1) { error = select (session->rtp.socket + 1, &fdset, NULL, NULL, &timeout); if ((error == 1) && (FD_ISSET (session->rtp.socket, &fdset))) /* something to read */ { mp = allocb (session->max_buf_size, 0); error = recvfrom (session->rtp.socket, mp->b_wptr, session->max_buf_size, 0, (struct sockaddr *) &remaddr, &addrlen); if (error > 0) { /* resize the memory allocated to fit the udp message */ p = g_realloc (mp->b_wptr, error); if (p != mp->b_wptr) g_message ("The recv area has moved during reallocation."); mp->b_datap->db_base = mp->b_rptr = mp->b_wptr = p; mp->b_wptr += error; mp->b_datap->db_lim = mp->b_wptr; /* then put the new message on queue */ rtp_parse (session, mp); } else { if (error == 0) { g_warning ("rtp_stack_recv: strange... recv() returned zero."); } else if (errno != EWOULDBLOCK) { g_warning ("Error receiving udp packet: %s.", strerror (errno)); } freemsg (mp); } } else return 0; } return error;}#endif/** *rtp_session_recvm_with_ts: *@session: a rtp session. *@user_ts: a timestamp. * * Try to get a rtp packet presented as a mblk_t structure from the rtp session. * The @user_ts parameter is relative to the first timestamp of the incoming stream. 
In other * words, the application does not have to know the first timestamp of the stream, it can * simply call for the first time this function with @user_ts=0, and then incrementing it * as it want. The RtpSession takes care of synchronisation between the stream timestamp * and the user timestamp given here. * *Returns: a rtp packet presented as a mblk_t.**/mblk_t *rtp_session_recvm_with_ts (RtpSession * session, guint32 user_ts){ mblk_t *mp = NULL; rtp_header_t *rtp; guint32 ts; guint32 packet_time;#ifdef BUILD_SCHEDULER RtpScheduler *sched;#endif /* if we are scheduled, remember the scheduler time at which the application has * asked for its first timestamp */ if (session->flags & RTP_SESSION_RECV_NOT_STARTED) { session->rtp.rcv_query_ts_offset = user_ts;#ifdef BUILD_SCHEDULER if (session->flags & RTP_SESSION_SCHEDULED) { sched = ortp_get_scheduler (); session->rtp.rcv_time_offset = sched->time; //g_message("setting snd_time_offset=%i",session->rtp.snd_time_offset); }#endif rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED); } session->rtp.rcv_last_app_ts = user_ts;#ifdef TARGET_IS_HPUXKERNEL /* nothing to do: packets are enqueued on interrupt ! 
*/#else if (!(session->flags & RTP_SESSION_SCHEDULED)) /* if not scheduled */ { rtp_stack_recv (session); }#endif /* then now try to return a packet, if possible */ /* first condition: if the session is starting, don't return anything * until the queue size reaches jitt_comp */ /* first lock the session */ rtp_session_lock (session); if (session->flags & RTP_SESSION_RECV_SYNC) { rtp_header_t *oldest, *newest; queue_t *q = session->rtp.rq; if (q->q_last == NULL) { g_message ("Queue is empty."); goto end; } oldest = (rtp_header_t *) q->q_first->b_rptr; newest = (rtp_header_t *) q->q_last->b_rptr; if ((guint32) (newest->timestamp - oldest->timestamp) < session->rtp.jitt_comp_time) { g_message ("Not enough packet bufferised."); goto end; } /* if enough packet queued continue but delete the starting flag */ rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC); mp = getq (session->rtp.rq); rtp = (rtp_header_t *) mp->b_rptr; session->rtp.rcv_ts_offset = rtp->timestamp; session->rtp.rcv_app_ts_offset = user_ts; /* and also remember the timestamp offset between the stream timestamp (random) * and the user timestamp, that very often starts at zero */ session->rtp.rcv_diff_ts = rtp->timestamp - user_ts; session->rtp.rcv_last_ret_ts = user_ts; /* just to have an init value */ session->ssrc = rtp->ssrc; g_message ("Returning FIRST mp with ts=%i", rtp->timestamp); goto end; } /* else this the normal case */ /*calculate the stream timestamp from the user timestamp */ ts = user_ts + session->rtp.rcv_diff_ts; session->rtp.rcv_last_ts = ts; mp = rtp_getq (session->rtp.rq, ts); /* perhaps we can now make some checks to see if a resynchronization is needed */ /* TODO */ goto end; end: if (mp != NULL) { int msgsize = msgdsize (mp); /* evaluate how much bytes (including header) is received by app */ ortp_global_stats.recv += msgsize; session->stats.recv += msgsize; rtp = (rtp_header_t *) mp->b_rptr; g_message ("Returning mp with ts=%i", rtp->timestamp); /* check for payload type 
changes */ if (session->payload_type != rtp->paytype) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -