⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpeg2t_thread.cpp

📁 网络MPEG4IP流媒体开发源代码
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ *  * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. *  * The Original Code is MPEG4IP. *  * The Initial Developer of the Original Code is Cisco Systems Inc. * Portions created by Cisco Systems Inc. are * Copyright (C) Cisco Systems Inc. 2000, 2001.  All Rights Reserved. *  * Contributor(s):  *              Bill May        wmay@cisco.com *//* * mpeg2t_thread.c */#include "mpeg2t_private.h"#include <rtp/rtp.h>#include <rtp/memory.h>#include <rtp/net_udp.h>#include "player_session.h"#include "player_media.h"#include "player_util.h"#include "our_config_file.h"#include "media_utils.h"#include "codec_plugin_private.h"#include "mpeg2t_bytestream.h"#include "player_sdp.h"#ifdef _WIN32DEFINE_MESSAGE_MACRO(mpeg2t_message, "mpeg2t")#else#define mpeg2t_message(loglevel, fmt...) message(loglevel, "mpeg2t", fmt)#endif/* * mpeg2t_decode_buffer - this is called from the mpeg2t thread. */static void mpeg2t_decode_buffer (mpeg2t_client_t *info, 				  uint8_t *buffer, 				  int blen){  mpeg2t_pid_t *pidptr;  uint32_t buflen;  uint32_t buflen_used;  buflen = blen;  do {    pidptr = mpeg2t_process_buffer(info->decoder,				   buffer, 				   buflen,				   &buflen_used);    if (pidptr != NULL) {      // have a return - process it      switch (pidptr->pak_type) {      case MPEG2T_PAS_PAK:	break;      case MPEG2T_PROG_MAP_PAK:	if (info->pam_recvd_sem != NULL) {	  SDL_SemPost(info->pam_recvd_sem);	}	break;      case MPEG2T_ES_PAK:	mpeg2t_stream_t *sptr;	CPlayerMedia *mptr;	mpeg2t_es_t *es_pid;	es_pid = (mpeg2t_es_t *)pidptr;	sptr = (mpeg2t_stream_t *)mpeg2t_es_get_userdata(es_pid);	if (sptr != NULL) {	  if (sptr->m_buffering == 0) {	    if (es_pid->save_frames != 0) {	      mptr = sptr->m_mptr;	      mptr->bytestream_primed();	    }	  } else {	    // we're buffering.	    if (sptr->m_have_info == 0) {	      if (es_pid->info_loaded == 0) {		// just dump the frame - record the psts		mpeg2t_frame_t *fptr;		do {		  fptr = mpeg2t_get_es_list_head(es_pid);		  if (fptr != NULL) {		    if (fptr->have_ps_ts) {		      sptr->m_last_psts = fptr->ps_ts;		      sptr->m_frames_since_last_psts = 0;		    } else 		      sptr->m_frames_since_last_psts++;		    free(fptr);		  }		} while (fptr != NULL);	      } else {		// just got the info in pid		sptr->m_have_info = 1;		mpeg2t_message(LOG_DEBUG, "%s Info is loading - starting buffering", 			       sptr->m_is_video ? "video" : "audio");		if (es_pid->list->have_ps_ts == 0) {		  mpeg2t_message(LOG_ERR, "have psts is 0 when info loaded");		}	      }	    } else {	      int msec;	      if (sptr->m_is_video) {		// use the video values for buffering		double msec_in_list;		msec_in_list = es_pid->frames_in_list;		msec_in_list *= 1000.0;		msec_in_list /= es_pid->frame_rate;		msec = (int)msec_in_list;	      } else {		// use the audio values for buffering		msec = es_pid->frames_in_list;		msec *= 1000;		msec *= es_pid->sample_per_frame;		msec /= es_pid->sample_freq;	      }#if 0	      mpeg2t_message(LOG_DEBUG, "%s buffer %d", 			     sptr->m_is_video ? "video" : "audio", msec);#endif	      if (msec >= config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC)) {		// yipee - done buffering..		sptr->m_buffering = 0;		sptr->m_mptr->start_decoding();		mpeg2t_message(LOG_DEBUG, "%s done buffering %d",			       sptr->m_is_video ? "video" : "audio", msec);	      }	    }	  }	}	break;      }    }    buffer += buflen_used;    buflen -= buflen_used;  } while (buflen >= 188);  if (buflen > 0) {    mpeg2t_message(LOG_ERR, 		   "decode buffer size is not multiple of 188 - %d %d",		   blen, buflen);  }}static void mpeg2t_rtp_callback (struct rtp *session, rtp_event *e){  mpeg2t_client_t *info;  if (e->type == RX_RTP) {    rtp_packet *rpak;    rpak = (rtp_packet *)e->data;    info = (mpeg2t_client_t *)rtp_get_userdata(session);    info->rtp_seq = rpak->rtp_pak_seq + 1;    mpeg2t_decode_buffer((mpeg2t_client_t *)rtp_get_userdata(session), 			 rpak->rtp_data, 			 rpak->rtp_data_len);    xfree(rpak);  }}/* * mpeg2t_thread_start_cmd() * Handle the start command - create the socket */static int mpeg2t_thread_start_cmd (mpeg2t_client_t *info){  int ret;  mpeg2t_message(LOG_DEBUG, "Processing start command");#ifdef _WIN32  WORD wVersionRequested;  WSADATA wsaData;   wVersionRequested = MAKEWORD( 2, 0 );   ret = WSAStartup( wVersionRequested, &wsaData );  if ( ret != 0 ) {    /* Tell the user that we couldn't find a usable */    /* WinSock DLL.*/    return (ret);  }#else  ret = 0;#endif  if (info->useRTP == FALSE) {    info->udp = udp_init(info->address, info->rx_port,			 info->tx_port, info->ttl);    if (info->udp == NULL)       return -1;    info->data_socket = udp_fd(info->udp);  } else {    info->rtp_session = rtp_init(info->address,				 info->rx_port,				 info->tx_port,				 info->ttl,				 info->rtcp_bw,				 mpeg2t_rtp_callback,				 (uint8_t *)info);    if (info->rtp_session == NULL) {      return -1;    }    rtp_set_option(info->rtp_session, RTP_OPT_WEAK_VALIDATION, FALSE);    rtp_set_option(info->rtp_session, RTP_OPT_PROMISC, TRUE);    info->data_socket = udp_fd(get_rtp_data_socket(info->rtp_session));    info->rtcp_socket = udp_fd(get_rtp_rtcp_socket(info->rtp_session));  }  return (ret);}/* * mpeg2t_thread() - mpeg2t thread handler - receives and * processes all data */int mpeg2t_thread (void *data){  mpeg2t_client_t *info = (mpeg2t_client_t *)data;  int ret;  int continue_thread;  uint8_t buffer[RTP_MAX_PACKET_LEN];  int buflen;    continue_thread = 0;  //mpeg2t_message(LOG_DEBUG, "thread started");  mpeg2t_thread_init_thread_info(info);  while (continue_thread == 0) {    //mpeg2t_message(LOG_DEBUG, "thread waiting");    ret = mpeg2t_thread_wait_for_event(info);    if (ret <= 0) {      if (ret < 0) {	//mpeg2t_message(LOG_ERR, "MPEG2T loop error %d errno %d", ret, errno);      } else {      }      continue;    }    /*     * See if the communications socket for IPC has any data     */    //mpeg2t_message(LOG_DEBUG, "Thread checking control");    ret = mpeg2t_thread_has_control_message(info);        if (ret) {      mpeg2t_msg_type_t msg_type;      int read;      /*       * Yes - read the message type.       */      read = mpeg2t_thread_get_control_message(info, &msg_type);      if (read == sizeof(msg_type)) {	// received message	//mpeg2t_message(LOG_DEBUG, "Comm socket msg %d", msg_type);	switch (msg_type) {	case MPEG2T_MSG_QUIT:	  continue_thread = 1;	  break;	case MPEG2T_MSG_START:	  ret = mpeg2t_thread_start_cmd(info);	  mpeg2t_thread_ipc_respond(info, ret);	  break;	default:	  mpeg2t_message(LOG_ERR, "Unknown message %d received", msg_type);	  break;	}      }    }    if (info->useRTP) {      if (mpeg2t_thread_has_receive_data(info)) {	rtp_recv_data(info->rtp_session, 0);      }      if (mpeg2t_thread_has_rtcp_data(info)) {	buflen = udp_recv(get_rtp_rtcp_socket(info->rtp_session), 			  buffer, RTP_MAX_PACKET_LEN);	rtp_process_ctrl(info->rtp_session, buffer, buflen);      }      rtp_send_ctrl(info->rtp_session, 0, NULL);      rtp_update(info->rtp_session);    } else {      if (mpeg2t_thread_has_receive_data(info)) {	if (info->udp != NULL) {	  //mpeg2t_message(LOG_DEBUG, "receiving udp data");	  buflen = udp_recv(info->udp, buffer, RTP_MAX_PACKET_LEN);	  mpeg2t_decode_buffer(info, buffer, buflen);	}      }    }  } // end while continue_thread  SDL_Delay(10);  /*   * Okay - we've gotten a quit - we're done   */  if (info->useRTP) {    rtp_send_bye(info->rtp_session);    rtp_done(info->rtp_session);    info->rtp_session = NULL;  } else {    if (info->udp != NULL) {      udp_exit(info->udp);      info->udp = NULL;    }  }  mpeg2t_close_thread(info);#ifdef _WIN32  WSACleanup();#endif  return 0;}/* * mpeg2t_create_client_for_rtp_tcp * create threaded mpeg2t session */mpeg2t_client_t *mpeg2t_create_client (const char *address,				       in_port_t rx_port,				       in_port_t tx_port,				       int use_rtp,				       double rtcp_bw, 				       int ttl,				       char *errmsg,				       uint32_t errmsg_len){  mpeg2t_client_t *info;  int ret;  mpeg2t_msg_type_t msg;  mpeg2t_msg_resp_t resp;#if 0  if (func == NULL) {    mpeg2t_message(LOG_CRIT, "Callback is NULL");    *err = EINVAL;    return NULL;  }#endif  info = MALLOC_STRUCTURE(mpeg2t_client_t);  if (info == NULL) return (NULL);  memset(info, 0, sizeof(mpeg2t_client_t));  info->address = strdup(address);  info->rx_port = rx_port;  info->tx_port = tx_port;  info->useRTP = use_rtp;  info->rtcp_bw = rtcp_bw;  info->ttl = ttl;  info->recv_timeout = 100;  info->decoder = create_mpeg2_transport();  if (info->decoder == NULL) {    mpeg2t_delete_client(info);    return (NULL);  }  info->pam_recvd_sem = SDL_CreateSemaphore(0);  info->msg_mutex = SDL_CreateMutex();  if (mpeg2t_create_thread(info) != 0) {    mpeg2t_delete_client(info);    return (NULL);  }  SDL_Delay(100);  msg = MPEG2T_MSG_START;  ret = mpeg2t_thread_ipc_send_wait(info,				  (unsigned char *)&msg,				  sizeof(msg),				  &resp);  if (ret < 0 || resp < 0) {    mpeg2t_delete_client(info);    snprintf(errmsg, errmsg_len, "Couldn't create client - error %d", resp);    return NULL;  }    int max = config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS);  do {    ret = SDL_SemWaitTimeout(info->pam_recvd_sem, 1000);    if (ret == SDL_MUTEX_TIMEDOUT) {      max--;      mpeg2t_message(LOG_DEBUG, "timeout - still left %d", max);    }  } while (ret == SDL_MUTEX_TIMEDOUT && max >= 0);  if (ret == SDL_MUTEX_TIMEDOUT) {    if (info->decoder->program_maps_recvd != 0) {      mpeg2t_message(LOG_INFO, "Program count received %d doesn't match program count %d", 		     info->decoder->program_maps_recvd, 		     info->decoder->program_count);    } else {      snprintf(errmsg, errmsg_len, "Did not receive Transport Stream Program Map in %d seconds", config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS));      mpeg2t_delete_client(info);      return NULL;    }  }  return (info);}void mpeg2t_delete_client (mpeg2t_client_t *info){  mpeg2t_stream_t *p;  mpeg2t_thread_close(info);  CHECK_AND_FREE(info->address);  delete_mpeg2t_transport(info->decoder);  info->decoder = NULL;  SDL_DestroyMutex(info->msg_mutex);  SDL_DestroySemaphore(info->pam_recvd_sem);  p = info->stream;  while (p != NULL) {    info->stream = p->next_stream;    free(p);    p = info->stream;  }  free(info);}static void close_mpeg2t_client (void *data){  mpeg2t_delete_client((mpeg2t_client_t *)data);}static int mpeg2t_create_video(mpeg2t_client_t *info,			       CPlayerSession *psptr, 			       video_query_t *vq, 			       int video_offset,			       char *errmsg, 			       uint32_t errlen,			       int &sdesc){  int ix;  CPlayerMedia *mptr;  codec_plugin_t *plugin;  for (ix = 0; ix < video_offset; ix++) {    if (vq[ix].enabled != 0) {      mptr = new CPlayerMedia(psptr);      if (mptr == NULL) {	return (-1);      }      video_info_t *vinfo;      vinfo = MALLOC_STRUCTURE(video_info_t);      vinfo->height = vq[ix].h;      vinfo->width = vq[ix].w;      if (vinfo->height == 480 && vinfo->width == 352) {	psptr->double_screen_width();      }      plugin = check_for_video_codec("MPEG2 TRANSPORT",				     NULL,				     vq[ix].type,				     vq[ix].profile,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -