📄 rtpsession.c
字号:
/* this is perhaps a telephony event */ if (rtp->paytype==session->telephone_events_pt){ rtp_signal_table_emit2(&session->on_telephone_event_packet,(gpointer)mp); if (session->on_telephone_event.count>0){ if (mp==NULL) { g_warning("mp is null!"); }else rtp_session_check_telephone_events(session,mp); } /************ warning**********/ /* we free the telephony event packet and the function will return NULL */ /* is this good ? */ freemsg(mp); mp=NULL; }else{ g_message ("rtp_parse: payload type changed to %i !", rtp->paytype); session->payload_type = rtp->paytype; rtp_signal_table_emit (&session->on_payload_type_changed); } } } else { g_message ("No mp for timestamp queried"); session->stats.unavaillable++; ortp_global_stats.unavaillable++; } rtp_session_unlock (session);#ifdef BUILD_SCHEDULER if (session->flags & RTP_SESSION_SCHEDULED) { /* if we are in blocking mode, then suspend the calling process until timestamp * wanted expires */ /* but we must not block the process if the timestamp wanted by the application is older * than current time */ sched = ortp_get_scheduler (); packet_time = rtp_session_ts_to_t (session, user_ts - session->rtp.rcv_query_ts_offset) + session->rtp.rcv_time_offset; //g_message ("rtp_session_recvm_with_ts: packet_time=%i, time=%i",packet_time, sched->time); if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time)) { if (session->flags & RTP_SESSION_BLOCKING_MODE) { g_mutex_lock (session->rtp. wait_for_packet_to_be_recv_mutex); g_cond_wait (session->rtp. wait_for_packet_to_be_recv_cond, session->rtp. wait_for_packet_to_be_recv_mutex); g_mutex_unlock (session->rtp. wait_for_packet_to_be_recv_mutex); } session_set_clr(&sched->r_sessions,session); } else session_set_set(&sched->r_sessions,session); /*to unblock _select() immediately */ }#endif return mp;}gint msg_to_buf (mblk_t * mp, char *buffer, gint len){ gint rlen = len; mblk_t *m, *mprev; gint mlen; m = mp->b_cont; mprev = mp; while (m != NULL) { mlen = m->b_wptr - m->b_rptr; if (mlen <= rlen) { mblk_t *consumed = m; memcpy (buffer, m->b_rptr, mlen); /* go to next mblk_t */ mprev->b_cont = m->b_cont; m = m->b_cont; consumed->b_cont = NULL; freeb (consumed); buffer += mlen; rlen -= mlen; } else { /*if mlen>rlen */ memcpy (buffer, m->b_rptr, rlen); m->b_rptr += rlen; return len; } } return len - rlen;}/** *rtp_session_recv_with_ts: *@session: a rtp session. *@buffer: a user supplied buffer to write the data. *@len: the length in bytes of the user supplied buffer. *@time: the timestamp wanted. *@have_more: the address of an integer to indicate if more data is availlable for the given timestamp. * * Tries to read the bytes of the incoming rtp stream related to timestamp @time. In case * where the user supplied buffer @buffer is not large enough to get all the data * related to timestamp @time, then *( @have_more) is set to 1 to indicate that the application * should recall the function with the same timestamp to get more data. * * When the rtp session is scheduled (see rtp_session_set_scheduling_mode() ), and the * blocking mode is on (see rtp_session_set_blocking_mode() ), then the calling thread * is suspended until the timestamp given as argument expires, whatever a received packet * fits the query or not. * * Important note: it is clear that the application cannot know the timestamp of the first * packet of the incoming stream, because it can be random. The @time timestamp given to the * function is used relatively to first timestamp of the stream. In simple words, 0 is a good * value to start calling this function. * * This function internally calls rtp_session_recvm_with_ts() to get a rtp packet. The content * of this packet is then copied into the user supplied buffer in an intelligent manner: * the function takes care of the size of the supplied buffer and the timestamp given in * argument. Using this function it is possible to read continous audio data (e.g. pcma,pcmu...) * with for example a standart buffer of size of 160 with timestamp incrementing by 160 while the incoming * stream has a different packet size. * *Returns: if a packet was availlable with the corresponding timestamp supplied in argument * then the number of bytes written in the user supplied buffer is returned. If no packets * are availlable, either because the sender has not started to send the stream, or either * because silence packet are not transmitted, or either because the packet was lost during * network transport, then the function returns zero.**/gint rtp_session_recv_with_ts (RtpSession * session, gchar * buffer, gint len, guint32 time, gint * have_more){ mblk_t *mp, *mprev, *mnext; gint rlen = len; gint wlen, mlen; guint32 ts_int = 0; /*the length of the data returned in the user supplied buffer, in TIMESTAMP UNIT */ PayloadType *payload; *have_more = 0; mp = rtp_session_recvm_with_ts (session, time); payload =rtp_profile_get_payload (session->profile, session->payload_type); if (payload==NULL){ g_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload."); } if (!(session->flags & RTP_SESSION_RECV_SYNC)) { //g_message("time=%i rcv_last_ret_ts=%i",time,session->rtp.rcv_last_ret_ts); if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN (time, session->rtp.rcv_last_ret_ts)) { /* the user has missed some data previously, so we are going to give him now. */ /* we must tell him to call the function once again with the same timestamp * by setting *have_more=1 */ *have_more = 1; } if (payload->type == PAYLOAD_AUDIO_CONTINUOUS) { ts_int = (guint32) (((double) len) / payload->bytes_per_sample); session->rtp.rcv_last_ret_ts += ts_int; //g_message("ts_int=%i",ts_int); } else ts_int = 0; } else return 0; /* try to fill the user buffer */ while (1) { if (mp != NULL) { mlen = msgdsize (mp->b_cont); wlen = msg_to_buf (mp, buffer, rlen); buffer += wlen; rlen -= wlen; g_message ("mlen=%i wlen=%i rlen=%i", mlen, wlen, rlen); /* do we fill all the buffer ? */ if (rlen > 0) { /* we did not fill all the buffer */ freemsg (mp); /* if we have continuous audio, try to get other packets to fill the buffer, * ie continue the loop */ //g_message("User buffer not filled entirely"); if (ts_int > 0) { time = session->rtp.rcv_last_ret_ts; g_message ("Need more: will ask for %i.", time); } else return len - rlen; } else if (mlen > wlen) { int unread = mlen - wlen + (mp->b_wptr - mp->b_rptr); /* not enough space in the user supplied buffer */ /* we re-enqueue the msg with its updated read pointers for next time */ g_message ("Re-enqueuing packet."); rtp_session_lock (session); rtp_putq (session->rtp.rq, mp); rtp_session_unlock (session); /* quite ugly: I change the stats ... */ ortp_global_stats.recv -= unread; session->stats.recv -= unread; return len; } else { /* the entire packet was written to the user buffer */ freemsg (mp); return len; } } else { /* fill with a zero pattern (silence) */ int i = 0, j = 0; if (payload->pattern_length != 0) { while (i < rlen) { buffer[i] = payload->zero_pattern[j]; i++; j++; if (j <= payload->pattern_length) j = 0; } } return len; } mp = rtp_session_recvm_with_ts (session, time); payload = rtp_profile_get_payload (session->profile, session->payload_type); } return -1;}#ifdef TARGET_IS_HPUXKERNELvoid rtp_session_set_timeout (RtpSession * session, guint milisec){ return;}#elsevoid rtp_session_set_timeout (RtpSession * session, guint milisec){ if (milisec == 0) { session->rtp.timeout = NULL; return; } session->rtp._timeout.tv_sec = milisec / 1000; session->rtp._timeout.tv_usec = (milisec % 1000) * 1000000; session->rtp.timeout = &session->rtp._timeout;}#endifvoid rtp_session_uninit (RtpSession * session){ /* first of all remove the session from the scheduler */#ifdef BUILD_SCHEDULER if (session->flags & RTP_SESSION_SCHEDULED) { if ((!(session->flags & RTP_SESSION_RECV_NOT_STARTED)) || (!(session->flags & RTP_SESSION_SEND_NOT_STARTED))) rtp_scheduler_remove_session (session->sched, session); }#endif /*flush all queues */ flushq (session->rtp.rq, FLUSHALL); flushq (session->rtp.wq, FLUSHALL);#ifndef TARGET_IS_HPUXKERNEL /* close sockets */ close (session->rtp.socket); close (session->rtcp.socket);#else if (session->dest_mproto!=NULL) freeb(session->dest_mproto);#endif#ifdef BUILD_SCHEDULER g_cond_free (session->rtp.wait_for_packet_to_be_sent_cond); g_mutex_free (session->rtp.wait_for_packet_to_be_sent_mutex); g_cond_free (session->rtp.wait_for_packet_to_be_recv_cond); g_mutex_free (session->rtp.wait_for_packet_to_be_recv_mutex);#endif g_mutex_free (session->lock); session->lock=NULL; if (session->current_tev!=NULL) freemsg(session->current_tev);}/** *rtp_session_reset: *@session: a rtp session. * * Reset the session: local and remote addresses are kept unchanged but the internal * queue for ordering and buffering packets is flushed, the session is ready to be * re-synchronised to another incoming stream. ***/void rtp_session_reset (RtpSession * session){ flushq (session->rtp.rq, FLUSHALL); flushq (session->rtp.wq, FLUSHALL); rtp_session_set_flag (session, RTP_SESSION_RECV_SYNC); rtp_session_set_flag (session, RTP_SESSION_SEND_SYNC); //session->ssrc=0; session->rtp.snd_time_offset = 0; session->rtp.snd_ts_offset = 0; session->rtp.snd_rand_offset = 0; session->rtp.snd_last_ts = 0; session->rtp.rcv_time_offset = 0; session->rtp.rcv_ts_offset = 0; session->rtp.rcv_query_ts_offset = 0; session->rtp.rcv_app_ts_offset = 0; session->rtp.rcv_diff_ts = 0; session->rtp.rcv_ts = 0; session->rtp.rcv_last_ts = 0; session->rtp.rcv_last_app_ts = 0; session->rtp.rcv_seq = 0; session->rtp.snd_seq = 0;}/** *rtp_session_destroy: *@session: a rtp session. * * Destroys a rtp session. ***/void rtp_session_destroy (RtpSession * session){ rtp_session_uninit (session); g_free (session);}/* function used by the scheduler only:*/guint32 rtp_session_ts_to_t (RtpSession * session, guint32 timestamp){ PayloadType *payload; g_return_val_if_fail (session->payload_type < 127, 0); payload = rtp_profile_get_payload (session->profile, session->payload_type); if (payload == NULL) { g_warning ("rtp_session_ts_to_t: use of unsupported payload type."); return 0; } /* the return value is in milisecond */ return (guint32) (1000.0 * ((double) timestamp / (double) payload->clock_rate));}#ifdef BUILD_SCHEDULER/* time is the number of miliseconds elapsed since the start of the scheduler */void rtp_session_process (RtpSession * session, guint32 time, RtpScheduler *sched){ queue_t *wq = session->rtp.wq; rtp_header_t *hdr; gint cond = 1; guint32 packet_time; gint packet_sent = 0; guint32 last_recv_time; rtp_session_lock (session); if (wq->q_first == NULL) cond = 0; /* send all packets that need to be sent */ while (cond) { //g_message("GRMGIMIM"); if (wq->q_first != NULL) { hdr = (rtp_header_t *) wq->q_first->b_rptr; packet_time = rtp_session_ts_to_t (session, hdr->timestamp - session->rtp. snd_ts_offset) + session->rtp.snd_time_offset; /*g_message("session->rtp.snd_time_offset= %i, time= %i, packet_time= %i", session->rtp.snd_time_offset, time, packet_time); g_message("seeing packet seq=%i ts=%i",hdr->seq_number,hdr->timestamp);*/ if (TIME_IS_NEWER_THAN (time, packet_time)) { mblk_t *mp; mp = getq (wq); rtp_send (session, mp); packet_sent++; } else cond = 0; } else { cond = 0; } } /* and then try to recv packets */ rtp_stack_recv (session); //g_message("after recv"); /*if we are in blocking mode or in _select(), we must wake up (or at least notify) * the application process, if its last * packet has been sent, if it can recv a new packet */ if (packet_sent > 0) { /* the session has finished to send: notify it for _select() */ session_set_set(&sched->w_sessions,session); if (session->flags & RTP_SESSION_BLOCKING_MODE) { g_mutex_lock (session->rtp. wait_for_packet_to_be_sent_mutex); g_cond_signal (session->rtp. wait_for_packet_to_be_sent_cond); g_mutex_unlock (session->rtp. wait_for_packet_to_be_sent_mutex); } } if (!(session->flags & RTP_SESSION_RECV_NOT_STARTED)) { //g_message("unblocking.."); /* and also wake up the application if the timestamp it asked has expired */ last_recv_time = rtp_session_ts_to_t (session, session->rtp.rcv_last_app_ts - session->rtp. rcv_query_ts_offset) + session->rtp.rcv_time_offset; //g_message("time=%i, last_recv_time=%i",time,last_recv_time); if TIME_IS_NEWER_THAN (time, last_recv_time) { /* notify it in the w_sessions mask */ session_set_set(&sched->r_sessions,session); if (session->flags & RTP_SESSION_BLOCKING_MODE) { //g_message("rtp_session_process: Unlocking."); g_mutex_lock (session->rtp. wait_for_packet_to_be_recv_mutex); g_cond_signal (session->rtp. wait_for_packet_to_be_recv_cond); g_mutex_unlock (session->rtp. wait_for_packet_to_be_recv_mutex); } } } rtp_session_unlock (session);}#endif/* packet api */void rtp_add_csrc(mblk_t *mp, guint32 csrc){ rtp_header_t *hdr=(rtp_header_t*)mp->b_rptr; hdr->csrc[hdr->cc]=csrc; hdr->cc++;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -