📄 mpeg2t_thread.cpp
字号:
/*
 * The contents of this file are subject to the Mozilla Public
 * License Version 1.1 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * The Original Code is MPEG4IP.
 *
 * The Initial Developer of the Original Code is Cisco Systems Inc.
 * Portions created by Cisco Systems Inc. are
 * Copyright (C) Cisco Systems Inc. 2000, 2001.  All Rights Reserved.
 *
 * Contributor(s):
 *              Bill May        wmay@cisco.com
 */
/*
 * mpeg2t_thread.c
 */
#include "mpeg2t_private.h"
#include <rtp/rtp.h>
#include <rtp/memory.h>
#include <rtp/net_udp.h>
#include "player_session.h"
#include "player_media.h"
#include "player_util.h"
#include "our_config_file.h"
#include "media_utils.h"
#include "codec_plugin_private.h"
#include "mpeg2t_bytestream.h"
#include "player_sdp.h"
#include "ip_port.h"
#include "player_rtsp.h"

#ifdef _WIN32
DEFINE_MESSAGE_MACRO(mpeg2t_message, "mpeg2t")
#else
#define mpeg2t_message(loglevel, fmt...) message(loglevel, "mpeg2t", fmt)
#endif

/*
 * mpeg2t_h264_buffered_msec - estimate the amount of H.264 video queued
 * on an elementary stream from the timestamps of its frame list.
 *
 * The first frame carrying a DTS or PS timestamp is compared with the
 * last frame in the list (DTS is preferred over the PS timestamp on
 * both ends).  The difference is scaled by 1000/90000, i.e. the
 * timestamps are treated as 90 kHz ticks.
 *
 * Returns the span in milliseconds, or 0 when no usable pair of
 * timestamps exists or the end does not lie after the start.
 */
static uint32_t mpeg2t_h264_buffered_msec (mpeg2t_es_t *es_pid)
{
  mpeg2t_frame_t *first = es_pid->list;
  mpeg2t_frame_t *last;
  uint64_t start, end;

  // skip leading frames that carry no timestamp at all
  while (first != NULL &&
         first->have_ps_ts == 0 && first->have_dts == 0) {
    first = first->next_frame;
  }
  if (first == NULL) {
    return 0;
  }

  last = first;
  while (last->next_frame != NULL) {
    last = last->next_frame;
  }
  if (first == last ||
      (last->have_ps_ts == 0 && last->have_dts == 0)) {
    return 0;
  }

  start = first->have_dts ? first->dts : first->ps_ts;
  end = last->have_dts ? last->dts : last->ps_ts;
  if (end <= start) {
    return 0;
  }
  return (uint32_t)(((end - start) * 1000) / 90000);
}

/*
 * mpeg2t_es_buffered_msec - work out how many milliseconds of media are
 * currently queued on es_pid.
 *
 * Video (non H.264): frames_in_list * 1000 / frame_rate.
 * Video (H.264):     timestamp span, see mpeg2t_h264_buffered_msec().
 * Audio:             frames_in_list * 1000 * sample_per_frame / sample_freq.
 *
 * A zero (or otherwise non-positive) frame_rate or a zero sample_freq
 * yields 0 instead of dividing by zero, so the stream simply keeps
 * buffering until valid stream info is available.
 */
static uint32_t mpeg2t_es_buffered_msec (mpeg2t_stream_t *sptr,
                                         mpeg2t_es_t *es_pid)
{
  if (sptr->m_is_video) {
    double msec_in_list;

    if (es_pid->stream_type == MPEG2T_STREAM_H264) {
      return mpeg2t_h264_buffered_msec(es_pid);
    }
    // use the video values for buffering
    if (!(es_pid->frame_rate > 0)) {
      return 0;
    }
    msec_in_list = es_pid->frames_in_list;
    msec_in_list *= 1000.0;
    msec_in_list /= es_pid->frame_rate;
    return (int)msec_in_list;
  }

  // use the audio values for buffering
  if (es_pid->sample_freq == 0) {
    return 0;
  }
  // 64 bit intermediate - the product can exceed 32 bits on long lists
  uint64_t audio_msec = es_pid->frames_in_list;
  audio_msec *= 1000;
  audio_msec *= es_pid->sample_per_frame;
  audio_msec /= es_pid->sample_freq;
  return (uint32_t)audio_msec;
}

/*
 * mpeg2t_process_es_pak - handle an elementary stream result returned
 * by mpeg2t_process_buffer().
 *
 * Nothing is done when the pid has no stream attached as user data.
 *
 * Not buffering: if the pid is saving frames, tell the media's
 *   bytestream that it is primed.
 * Buffering, stream info not yet seen: frames are discarded until the
 *   pid reports info_loaded; at that point m_have_info is set.
 * Buffering, info seen: once the queued amount reaches
 *   CONFIG_RTP_BUFFER_TIME_MSEC, buffering ends and decoding starts.
 */
static void mpeg2t_process_es_pak (mpeg2t_es_t *es_pid)
{
  mpeg2t_stream_t *sptr;
  uint32_t msec;

  sptr = (mpeg2t_stream_t *)mpeg2t_get_userdata(&es_pid->pid);
  if (sptr == NULL) {
    return;
  }

  if (sptr->m_buffering == 0) {
    // not buffering - if we're saving frames, indicate that to
    // the bytestream.
    if (es_pid->save_frames != 0) {
      CPlayerMedia *mptr = sptr->m_mptr;
      mptr->bytestream_primed();
    }
    return;
  }

  // we're buffering.
  if (sptr->m_have_info == 0) {
    if (es_pid->info_loaded == 0) {
      // no stream info yet - just dump the frames
      mpeg2t_frame_t *fptr;
      while ((fptr = mpeg2t_get_es_list_head(es_pid)) != NULL) {
        mpeg2t_free_frame(fptr);
      }
    } else {
      // just got the info in pid
      sptr->m_have_info = 1;
      mpeg2t_message(LOG_DEBUG, "%s Info is loading - starting buffering",
                     sptr->m_is_video ? "video" : "audio");
      // the frame list may be empty here - do not dereference it blindly
      if (es_pid->list != NULL && es_pid->list->have_ps_ts == 0) {
        mpeg2t_message(LOG_ERR, "have psts is 0 when info loaded");
      }
    }
    return;
  }

  msec = mpeg2t_es_buffered_msec(sptr, es_pid);
  if (msec >= config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC)) {
    // yipee - done buffering..
    sptr->m_buffering = 0;
    sptr->m_mptr->start_decoding();
    mpeg2t_message(LOG_DEBUG, "%s done buffering %d",
                   sptr->m_is_video ? "video" : "audio", msec);
  }
}

/*
 * mpeg2t_decode_buffer - this is called from the mpeg2t thread.
 *
 * Feeds buffer to mpeg2t_process_buffer() repeatedly, acting on each
 * returned pid, until fewer than 188 bytes (one transport stream
 * packet) remain.
 *
 *   info   - client state; info->decoder is the transport stream parser
 *   buffer - received data
 *   buflen - number of valid bytes in buffer
 *
 * Returns the number of unconsumed bytes left at the end of buffer.
 * The value is always less than 188, so a caller may carry the tail
 * over in front of the next datagram.
 */
static uint32_t mpeg2t_decode_buffer (mpeg2t_client_t *info,
                                      uint8_t *buffer,
                                      uint32_t buflen)
{
  mpeg2t_pid_t *pidptr;
  uint32_t buflen_used;

  do {
    // NOTE(review): the parser is expected to always set this; start
    // from a known value in case it does not.
    buflen_used = 0;
    pidptr = mpeg2t_process_buffer(info->decoder,
                                   buffer,
                                   buflen,
                                   &buflen_used);
    if (pidptr != NULL) {
      // have a return - process it
      switch (pidptr->pak_type) {
      case MPEG2T_PAS_PAK:
        break;
      case MPEG2T_PROG_MAP_PAK:
        if (info->pam_recvd_sem != NULL) {
          SDL_SemPost(info->pam_recvd_sem);
        }
        break;
      case MPEG2T_ES_PAK:
        mpeg2t_process_es_pak((mpeg2t_es_t *)pidptr);
        break;
      default:
        break;
      }
    }

    if (buflen_used > buflen) {
      // never let the unsigned subtraction below wrap around
      buflen_used = buflen;
    }
    if (buflen_used == 0 && buflen >= 188) {
      // defensive: no progress on a full packet's worth of data would
      // make this loop spin forever - drop the data instead.
      return 0;
    }
    buffer += buflen_used;
    buflen -= buflen_used;
  } while (buflen >= 188);

  return (buflen);
}

/*
 * mpeg2t_rtp_callback - callback installed on the RTP session by
 * mpeg2t_thread_start_cmd().
 *
 * For RX_RTP events the packet payload is passed to the transport
 * stream parser and the packet is then released with xfree().  All
 * other event types are ignored.
 */
static void mpeg2t_rtp_callback (struct rtp *session, rtp_event *e)
{
  mpeg2t_client_t *info;
  rtp_packet *rpak;

  if (e->type != RX_RTP) {
    return;
  }

  rpak = (rtp_packet *)e->data;
  info = (mpeg2t_client_t *)rtp_get_recv_userdata(session);
  if (info != NULL) {
    info->rtp_seq = rpak->rtp_pak_seq + 1;
    // need to handle "left over" here...
    mpeg2t_decode_buffer(info, rpak->rtp_data, rpak->rtp_data_len);
  }
  xfree(rpak);
}

/*
 * mpeg2t_thread_start_cmd()
 * Handle the start command - create the socket
 *
 * Without RTP a UDP socket is opened, bound to the interface named by
 * CONFIG_MULTICAST_RX_IF when that is configured and can be resolved,
 * otherwise with udp_init().  With RTP an RTP session is created whose
 * callback is mpeg2t_rtp_callback.
 *
 * Returns 0 on success, -1 if the socket or session could not be
 * created, or (Windows only) the non-zero WSAStartup() error.
 */
static int mpeg2t_thread_start_cmd (mpeg2t_client_t *info)
{
  int ret;

  mpeg2t_message(LOG_DEBUG, "Processing start command");
#ifdef _WIN32
  WORD wVersionRequested;
  WSADATA wsaData;

  wVersionRequested = MAKEWORD( 2, 0 );

  ret = WSAStartup( wVersionRequested, &wsaData );
  if ( ret != 0 ) {
    /* Tell the user that we couldn't find a usable */
    /* WinSock DLL.*/
    return (ret);
  }
#else
  ret = 0;
#endif

  if (info->useRTP == FALSE) {
    if (config.get_config_string(CONFIG_MULTICAST_RX_IF) != NULL) {
      struct in_addr if_addr;
      // configure out the specified interface
      if (getIpAddressFromInterface(config.get_config_string(CONFIG_MULTICAST_RX_IF),
                                    &if_addr) >= 0) {
        player_debug_message("if address is %s %s",
                             inet_ntoa(if_addr), info->address);
        info->udp = udp_init_if(info->address,
                                inet_ntoa(if_addr),
                                info->rx_port,
                                info->tx_port,
                                info->ttl);
      }
    }
    if (info->udp == NULL) {
      info->udp = udp_init(info->address,
                           info->rx_port,
                           info->tx_port,
                           info->ttl);
    }
    if (info->udp == NULL) {
      return -1;
    }
    info->data_socket = udp_fd(info->udp);
  } else {
    rtp_stream_params_t rsp;

    info->rtp_session = NULL;
    rtp_default_params(&rsp);
    rsp.rtp_addr = info->address;
    rsp.rtp_rx_port = info->rx_port;
    rsp.rtp_tx_port = info->tx_port;
    rsp.rtp_ttl = info->ttl;
    rsp.rtcp_bandwidth = info->rtcp_bw;
    rsp.rtp_callback = mpeg2t_rtp_callback;
    rsp.recv_userdata = info;
    if (config.get_config_string(CONFIG_MULTICAST_RX_IF) != NULL) {
      struct in_addr if_addr;
      if (getIpAddressFromInterface(config.get_config_string(CONFIG_MULTICAST_RX_IF),
                                    &if_addr) >= 0) {
        rsp.physical_interface_addr = inet_ntoa(if_addr);
      }
    }
    info->rtp_session = rtp_init_stream(&rsp);
    if (info->rtp_session == NULL) {
      return -1;
    }
    rtp_set_option(info->rtp_session, RTP_OPT_WEAK_VALIDATION, FALSE);
    rtp_set_option(info->rtp_session, RTP_OPT_PROMISC, TRUE);
    info->data_socket = udp_fd(get_rtp_data_socket(info->rtp_session));
    info->rtcp_socket = udp_fd(get_rtp_rtcp_socket(info->rtp_session));
  }
  return (ret);
}

/*
 * mpeg2t_thread() - mpeg2t thread handler - receives and
 * processes all data
 *
 * data is the mpeg2t_client_t for this session.  The loop waits for an
 * event, handles control messages (MPEG2T_MSG_START / MPEG2T_MSG_QUIT)
 * and then services the RTP session or the raw UDP socket.  After more
 * than 10 consecutive timeouts on an RTSP controlled session every
 * stream is flagged with m_have_eof.
 *
 * Always returns 0.
 */
int mpeg2t_thread (void *data)
{
  mpeg2t_client_t *info = (mpeg2t_client_t *)data;
  int ret;
  int continue_thread;
  uint8_t buffer[17000];
  uint32_t buflen_left = 0;
  uint32_t buflen;
  int consec_timeout = 0;

  continue_thread = 0;
  mpeg2t_thread_init_thread_info(info);
  while (continue_thread == 0) {
    ret = mpeg2t_thread_wait_for_event(info);
    if (ret <= 0) {
      if (ret == 0) {
        // timeout
        consec_timeout++;
        if (info->m_have_rtsp && consec_timeout > 10) {
          // this could be better...  Maybe detect the final time vs the
          // start time.
          mpeg2t_stream_t *sptr;
          for (sptr = info->stream; sptr != NULL; sptr = sptr->next_stream) {
            sptr->m_have_eof = 1;
          }
        }
      }
      // ret < 0 is a wait error; it is ignored and the wait is retried
      continue;
    }
    consec_timeout = 0;

    /*
     * See if the communications socket for IPC has any data
     */
    ret = mpeg2t_thread_has_control_message(info);
    if (ret) {
      mpeg2t_msg_type_t msg_type;
      int bytes_read;
      /*
       * Yes - read the message type.
       */
      bytes_read = mpeg2t_thread_get_control_message(info, &msg_type);
      if (bytes_read == sizeof(msg_type)) {
        // received message
        switch (msg_type) {
        case MPEG2T_MSG_QUIT:
          continue_thread = 1;
          break;
        case MPEG2T_MSG_START:
          ret = mpeg2t_thread_start_cmd(info);
          mpeg2t_thread_ipc_respond(info, ret);
          break;
        default:
          mpeg2t_message(LOG_ERR, "Unknown message %d received", msg_type);
          break;
        }
      }
    }

    if (info->useRTP) {
      // The session does not exist if the start command failed; never
      // hand a NULL session to the RTP library.
      if (info->rtp_session != NULL) {
        if (mpeg2t_thread_has_receive_data(info)) {
          rtp_recv_data(info->rtp_session, 0);
        }
        if (mpeg2t_thread_has_rtcp_data(info)) {
          // NOTE(review): assumes udp_recv reports "nothing read" or an
          // error with a value <= 0 - confirm against net_udp.
          int rtcp_len = udp_recv(get_rtp_rtcp_socket(info->rtp_session),
                                  buffer, sizeof(buffer));
          if (rtcp_len > 0) {
            rtp_process_ctrl(info->rtp_session, buffer, rtcp_len);
          }
        }
        rtp_send_ctrl(info->rtp_session, 0, NULL);
        rtp_update(info->rtp_session);
      }
    } else {
      if (mpeg2t_thread_has_receive_data(info)) {
        if (info->udp != NULL) {
          // we may have leftover data in the buffer if some people send
          // non-transport stream packet aligned udp packets.
          int received = udp_recv(info->udp,
                                  buffer + buflen_left,
                                  sizeof(buffer) - buflen_left);
          if (received > 0) {
            buflen = buflen_left + (uint32_t)received;
            buflen_left = mpeg2t_decode_buffer(info, buffer, buflen);
            if (buflen_left > 0) {
              // keep the partial packet at the front for the next read
              memmove(buffer,
                      buffer + buflen - buflen_left,
                      buflen_left);
            }
          }
        }
      }
    }
  } // end while continue_thread

  SDL_Delay(10);
  /*
   * Okay - we've gotten a quit - we're done
   */
  if (info->useRTP) {
    // a quit can arrive before a successful start - no session then
    if (info->rtp_session != NULL) {
      rtp_send_bye(info->rtp_session);
      rtp_done(info->rtp_session);
      info->rtp_session = NULL;
    }
  } else {
    if (info->udp != NULL) {
      udp_exit(info->udp);
      info->udp = NULL;
    }
  }

  mpeg2t_close_thread(info);
#ifdef _WIN32
  WSACleanup();
#endif
  return 0;
}

/*
 * mpeg2t_create_client
 * create threaded mpeg2t session
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -