📄 mpeg2t_thread.cpp
字号:
/* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code is MPEG4IP. * * The Initial Developer of the Original Code is Cisco Systems Inc. * Portions created by Cisco Systems Inc. are * Copyright (C) Cisco Systems Inc. 2000, 2001. All Rights Reserved. * * Contributor(s): * Bill May wmay@cisco.com *//* * mpeg2t_thread.c */#include "mpeg2t_private.h"#include <rtp/rtp.h>#include <rtp/memory.h>#include <rtp/net_udp.h>#include "player_session.h"#include "player_media.h"#include "player_util.h"#include "our_config_file.h"#include "media_utils.h"#include "codec_plugin_private.h"#include "mpeg2t_bytestream.h"#include "player_sdp.h"#ifdef _WIN32DEFINE_MESSAGE_MACRO(mpeg2t_message, "mpeg2t")#else#define mpeg2t_message(loglevel, fmt...) message(loglevel, "mpeg2t", fmt)#endif/* * mpeg2t_decode_buffer - this is called from the mpeg2t thread. */static void mpeg2t_decode_buffer (mpeg2t_client_t *info, uint8_t *buffer, int blen){ mpeg2t_pid_t *pidptr; uint32_t buflen; uint32_t buflen_used; buflen = blen; do { pidptr = mpeg2t_process_buffer(info->decoder, buffer, buflen, &buflen_used); if (pidptr != NULL) { // have a return - process it switch (pidptr->pak_type) { case MPEG2T_PAS_PAK: break; case MPEG2T_PROG_MAP_PAK: if (info->pam_recvd_sem != NULL) { SDL_SemPost(info->pam_recvd_sem); } break; case MPEG2T_ES_PAK: mpeg2t_stream_t *sptr; CPlayerMedia *mptr; mpeg2t_es_t *es_pid; es_pid = (mpeg2t_es_t *)pidptr; sptr = (mpeg2t_stream_t *)mpeg2t_es_get_userdata(es_pid); if (sptr != NULL) { if (sptr->m_buffering == 0) { if (es_pid->save_frames != 0) { mptr = sptr->m_mptr; mptr->bytestream_primed(); } } else { // we're buffering. if (sptr->m_have_info == 0) { if (es_pid->info_loaded == 0) { // just dump the frame - record the psts mpeg2t_frame_t *fptr; do { fptr = mpeg2t_get_es_list_head(es_pid); if (fptr != NULL) { if (fptr->have_ps_ts) { sptr->m_last_psts = fptr->ps_ts; sptr->m_frames_since_last_psts = 0; } else sptr->m_frames_since_last_psts++; free(fptr); } } while (fptr != NULL); } else { // just got the info in pid sptr->m_have_info = 1; mpeg2t_message(LOG_DEBUG, "%s Info is loading - starting buffering", sptr->m_is_video ? "video" : "audio"); if (es_pid->list->have_ps_ts == 0) { mpeg2t_message(LOG_ERR, "have psts is 0 when info loaded"); } } } else { int msec; if (sptr->m_is_video) { // use the video values for buffering double msec_in_list; msec_in_list = es_pid->frames_in_list; msec_in_list *= 1000.0; msec_in_list /= es_pid->frame_rate; msec = (int)msec_in_list; } else { // use the audio values for buffering msec = es_pid->frames_in_list; msec *= 1000; msec *= es_pid->sample_per_frame; msec /= es_pid->sample_freq; }#if 0 mpeg2t_message(LOG_DEBUG, "%s buffer %d", sptr->m_is_video ? "video" : "audio", msec);#endif if (msec >= config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC)) { // yipee - done buffering.. sptr->m_buffering = 0; sptr->m_mptr->start_decoding(); mpeg2t_message(LOG_DEBUG, "%s done buffering %d", sptr->m_is_video ? "video" : "audio", msec); } } } } break; } } buffer += buflen_used; buflen -= buflen_used; } while (buflen >= 188); if (buflen > 0) { mpeg2t_message(LOG_ERR, "decode buffer size is not multiple of 188 - %d %d", blen, buflen); }}static void mpeg2t_rtp_callback (struct rtp *session, rtp_event *e){ mpeg2t_client_t *info; if (e->type == RX_RTP) { rtp_packet *rpak; rpak = (rtp_packet *)e->data; info = (mpeg2t_client_t *)rtp_get_userdata(session); info->rtp_seq = rpak->rtp_pak_seq + 1; mpeg2t_decode_buffer((mpeg2t_client_t *)rtp_get_userdata(session), rpak->rtp_data, rpak->rtp_data_len); xfree(rpak); }}/* * mpeg2t_thread_start_cmd() * Handle the start command - create the socket */static int mpeg2t_thread_start_cmd (mpeg2t_client_t *info){ int ret; mpeg2t_message(LOG_DEBUG, "Processing start command");#ifdef _WIN32 WORD wVersionRequested; WSADATA wsaData; wVersionRequested = MAKEWORD( 2, 0 ); ret = WSAStartup( wVersionRequested, &wsaData ); if ( ret != 0 ) { /* Tell the user that we couldn't find a usable */ /* WinSock DLL.*/ return (ret); }#else ret = 0;#endif if (info->useRTP == FALSE) { info->udp = udp_init(info->address, info->rx_port, info->tx_port, info->ttl); if (info->udp == NULL) return -1; info->data_socket = udp_fd(info->udp); } else { info->rtp_session = rtp_init(info->address, info->rx_port, info->tx_port, info->ttl, info->rtcp_bw, mpeg2t_rtp_callback, (uint8_t *)info); if (info->rtp_session == NULL) { return -1; } rtp_set_option(info->rtp_session, RTP_OPT_WEAK_VALIDATION, FALSE); rtp_set_option(info->rtp_session, RTP_OPT_PROMISC, TRUE); info->data_socket = udp_fd(get_rtp_data_socket(info->rtp_session)); info->rtcp_socket = udp_fd(get_rtp_rtcp_socket(info->rtp_session)); } return (ret);}/* * mpeg2t_thread() - mpeg2t thread handler - receives and * processes all data */int mpeg2t_thread (void *data){ mpeg2t_client_t *info = (mpeg2t_client_t *)data; int ret; int continue_thread; uint8_t buffer[RTP_MAX_PACKET_LEN]; int buflen; continue_thread = 0; //mpeg2t_message(LOG_DEBUG, "thread started"); mpeg2t_thread_init_thread_info(info); while (continue_thread == 0) { //mpeg2t_message(LOG_DEBUG, "thread waiting"); ret = mpeg2t_thread_wait_for_event(info); if (ret <= 0) { if (ret < 0) { //mpeg2t_message(LOG_ERR, "MPEG2T loop error %d errno %d", ret, errno); } else { } continue; } /* * See if the communications socket for IPC has any data */ //mpeg2t_message(LOG_DEBUG, "Thread checking control"); ret = mpeg2t_thread_has_control_message(info); if (ret) { mpeg2t_msg_type_t msg_type; int read; /* * Yes - read the message type. */ read = mpeg2t_thread_get_control_message(info, &msg_type); if (read == sizeof(msg_type)) { // received message //mpeg2t_message(LOG_DEBUG, "Comm socket msg %d", msg_type); switch (msg_type) { case MPEG2T_MSG_QUIT: continue_thread = 1; break; case MPEG2T_MSG_START: ret = mpeg2t_thread_start_cmd(info); mpeg2t_thread_ipc_respond(info, ret); break; default: mpeg2t_message(LOG_ERR, "Unknown message %d received", msg_type); break; } } } if (info->useRTP) { if (mpeg2t_thread_has_receive_data(info)) { rtp_recv_data(info->rtp_session, 0); } if (mpeg2t_thread_has_rtcp_data(info)) { buflen = udp_recv(get_rtp_rtcp_socket(info->rtp_session), buffer, RTP_MAX_PACKET_LEN); rtp_process_ctrl(info->rtp_session, buffer, buflen); } rtp_send_ctrl(info->rtp_session, 0, NULL); rtp_update(info->rtp_session); } else { if (mpeg2t_thread_has_receive_data(info)) { if (info->udp != NULL) { //mpeg2t_message(LOG_DEBUG, "receiving udp data"); buflen = udp_recv(info->udp, buffer, RTP_MAX_PACKET_LEN); mpeg2t_decode_buffer(info, buffer, buflen); } } } } // end while continue_thread SDL_Delay(10); /* * Okay - we've gotten a quit - we're done */ if (info->useRTP) { rtp_send_bye(info->rtp_session); rtp_done(info->rtp_session); info->rtp_session = NULL; } else { if (info->udp != NULL) { udp_exit(info->udp); info->udp = NULL; } } mpeg2t_close_thread(info);#ifdef _WIN32 WSACleanup();#endif return 0;}/* * mpeg2t_create_client_for_rtp_tcp * create threaded mpeg2t session */mpeg2t_client_t *mpeg2t_create_client (const char *address, in_port_t rx_port, in_port_t tx_port, int use_rtp, double rtcp_bw, int ttl, char *errmsg, uint32_t errmsg_len){ mpeg2t_client_t *info; int ret; mpeg2t_msg_type_t msg; mpeg2t_msg_resp_t resp;#if 0 if (func == NULL) { mpeg2t_message(LOG_CRIT, "Callback is NULL"); *err = EINVAL; return NULL; }#endif info = MALLOC_STRUCTURE(mpeg2t_client_t); if (info == NULL) return (NULL); memset(info, 0, sizeof(mpeg2t_client_t)); info->address = strdup(address); info->rx_port = rx_port; info->tx_port = tx_port; info->useRTP = use_rtp; info->rtcp_bw = rtcp_bw; info->ttl = ttl; info->recv_timeout = 100; info->decoder = create_mpeg2_transport(); if (info->decoder == NULL) { mpeg2t_delete_client(info); return (NULL); } info->pam_recvd_sem = SDL_CreateSemaphore(0); info->msg_mutex = SDL_CreateMutex(); if (mpeg2t_create_thread(info) != 0) { mpeg2t_delete_client(info); return (NULL); } SDL_Delay(100); msg = MPEG2T_MSG_START; ret = mpeg2t_thread_ipc_send_wait(info, (unsigned char *)&msg, sizeof(msg), &resp); if (ret < 0 || resp < 0) { mpeg2t_delete_client(info); snprintf(errmsg, errmsg_len, "Couldn't create client - error %d", resp); return NULL; } int max = config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS); do { ret = SDL_SemWaitTimeout(info->pam_recvd_sem, 1000); if (ret == SDL_MUTEX_TIMEDOUT) { max--; mpeg2t_message(LOG_DEBUG, "timeout - still left %d", max); } } while (ret == SDL_MUTEX_TIMEDOUT && max >= 0); if (ret == SDL_MUTEX_TIMEDOUT) { if (info->decoder->program_maps_recvd != 0) { mpeg2t_message(LOG_INFO, "Program count received %d doesn't match program count %d", info->decoder->program_maps_recvd, info->decoder->program_count); } else { snprintf(errmsg, errmsg_len, "Did not receive Transport Stream Program Map in %d seconds", config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS)); mpeg2t_delete_client(info); return NULL; } } return (info);}void mpeg2t_delete_client (mpeg2t_client_t *info){ mpeg2t_stream_t *p; mpeg2t_thread_close(info); CHECK_AND_FREE(info->address); delete_mpeg2t_transport(info->decoder); info->decoder = NULL; SDL_DestroyMutex(info->msg_mutex); SDL_DestroySemaphore(info->pam_recvd_sem); p = info->stream; while (p != NULL) { info->stream = p->next_stream; free(p); p = info->stream; } free(info);}static void close_mpeg2t_client (void *data){ mpeg2t_delete_client((mpeg2t_client_t *)data);}static int mpeg2t_create_video(mpeg2t_client_t *info, CPlayerSession *psptr, video_query_t *vq, int video_offset, char *errmsg, uint32_t errlen, int &sdesc){ int ix; CPlayerMedia *mptr; codec_plugin_t *plugin; for (ix = 0; ix < video_offset; ix++) { if (vq[ix].enabled != 0) { mptr = new CPlayerMedia(psptr); if (mptr == NULL) { return (-1); } video_info_t *vinfo; vinfo = MALLOC_STRUCTURE(video_info_t); vinfo->height = vq[ix].h; vinfo->width = vq[ix].w; if (vinfo->height == 480 && vinfo->width == 352) { psptr->double_screen_width(); } plugin = check_for_video_codec("MPEG2 TRANSPORT", NULL, vq[ix].type, vq[ix].profile,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -