📄 vodfile.cpp
字号:
/* Version1.2 */#include <sys/stat.h>#include <sys/socket.h>#include <sys/time.h>#include <netinet/in.h>#include <sys/types.h>#include <arpa/inet.h>#include <unistd.h>#include <stdio.h>#include <string.h>#include <stdlib.h>#include <fcntl.h>#include <semaphore.h>#include <signal.h>#include <pthread.h>#include <error.h>#include <time.h>#include <unistd.h>#include <sys/mman.h>#include "sched.h"#include "my_time.h"#include "mysock.h"#include "vodtypes.h"#define TWO_MAXPACKETLENGTH 2*MAXPACKETLEN#define MAX_PREF_LENGTH 1024*1024*1//#define MAX_PREF_LENGTH 1024*1024*4//#define MAX_PREF_LENGTH 150*1024#define MAX_PEAK_RATE 5242880.0 //5*1024*1024#define ONE_SHOT_TIME 10000 //10ms#define ERR_READ_ZERO_DATA 0#define ERR_READ_LESS_DATA -1#define ERR_READ_MORE_DATA -2int errno=0;// int debug_level=COMMON_LOG | FATAL_ERROR | COMMON_ERROR;int debug_level=0;char fname_log[MAX_FILE_NAME_LEN]="vodfile.log";int nooutput=0;// add by hongfei for testing the error of processdyingint notimeofday=0;int main_broadcast_err=ERR_NONE;int pref_length=0;char *pref_header=NULL;char *pref_buffer=NULL;struct timeval firstTime, secondTime;double outputPTS=0.0;pthread_t mainpid=0, main_broadcastpid=0, sendpid=0;BUFFER_POOL *pshared=NULL; char channel_ip[MAX_IP_NAME_LEN]="225.1.2.18", lip[MAX_IP_NAME_LEN]="159.226.5.118";int channel_port=8018, lport=2901;int bufTail=0 , bufHead=0;double nextPTS=0.0;double one_token = 0.0;double token=0.0;double one_peak_token=0.0;double peak_token=0.0;int global_type=-1; BYTE4 StreamFileType;int bChannelStart=FALSE;mySock t_sock;//FILE *connfd=NULL;//u_int32_t totalSize;int connfd =0;u_int64_t totalSize;// file listprogItem progSheet[7][MAX_ITEM_ONE_DAY+1];progItem currentprog;char fname_config[MAX_FILE_NAME_LEN]="progSheet.txt";static inline int MatchHeader(unsigned char const code[4]);static void *sendThread(void *);static void *main_broadcast(void *);u_int64_t GetFilesize(int mpeg_fd);u_int32_t getTotalSize(const char *filename);u_int32_t getTotalSize(FILE *mpeg_fd);static void token_proc(int signo);static void main_Continue(int signo);static void main_Quit(int signo);static void main_broadcast_Quit(int signo);static void child_Quit(int signo);static void mpeg1_token_proc(int signo);double getMpeg1Band(FILE *fpRead);double getTsBand(FILE *fp);void getProgSheet(char * filename);int isanotherday(char * str);int seekprogtobroad(struct tm * time, int &i);static int getEmptySpace();static int getDataSpace();static int putDataSpace();static int putEmptySpace();//size_t ssread(void *ptr, size_t size, size_t length, FILE *stream);//size_t ssread(void *ptr, size_t size, size_t length, int fd);size_t ssread(void *ptr, size_t length, int fd);static int createTimer();static int destroyTimer();//begin added by xjBOOL HasPCR(u8 *bData);BOOL IsDiscontinuity(u8 *bData);s64 GetPCRTime(u8 *bData);s64 GetDate();//end added by xj static void main_Continue(int signo){ dbg_print1(COMMON_LOG, "main Continue signal=%d\n", signo); return;}static void main_Quit(int signo){ // printf("main Quit\n"); dbg_print1(COMMON_LOG,"Enter main_Quit, %d\n", signo); if ( main_broadcastpid ) { pthread_kill(main_broadcastpid, MAIN_BROADCAST_SIGNAL); dbg_print0(COMMON_LOG,"kill main_broadcast thread\n"); } dbg_print0(COMMON_LOG,"Leave main_Quit"); if ( pshared ) { delete pshared; pshared=NULL; } if ( pref_buffer ) { delete pref_buffer; pref_buffer=NULL; } exit(0);}static void main_broadcast_Quit(int signo){ struct itimerval timer_value; pthread_t child_pid; dbg_print2(COMMON_LOG,"Enter main_broadcast_Quit %d, %d\n", signo, main_broadcastpid); main_broadcastpid=0; // close timer event timer_value.it_interval.tv_sec = timer_value.it_value.tv_sec =0; timer_value.it_interval.tv_usec= timer_value.it_value.tv_usec=0; setitimer(ITIMER_REAL, &timer_value, NULL); signal(SIGALRM, SIG_DFL); // close sendThread t_sock.Close(); child_pid = sendpid; if ( child_pid ) { pthread_kill(child_pid, CHILD_QUIT_SIGNAL); pthread_join(child_pid,NULL); child_pid=0; } // close mainThread sem_destroy(&(pshared->nempty)); sem_destroy(&(pshared->nstored));// if ( connfd ) { fclose(connfd); connfd=0; } if ( connfd!=0 ) { close(connfd); connfd=0; } // pthread_kill(mainpid, MAIN_CONTINUE_SIGNAL); dbg_print0(COMMON_LOG,"Leave main_broadcast_Quit\n"); pthread_exit(NULL); return;}static void child_Quit(int signo){ dbg_print2(COMMON_LOG, "Enter Child_Quit %d, %d\n", signo, sendpid); if (global_type==MPEG1) destroyTimer(); sendpid = 0; dbg_print0(COMMON_LOG, "Leave Child_Quit\n"); pthread_exit(NULL);}static void debug_output(int signo){ int empty, full, ret; ret = sem_getvalue(&(pshared->nempty), &full); if ( ret!=0 ) { dbg_print1(FATAL_ERROR, "sem_getvalue(nempty) error %d\n", errno); } ret = sem_getvalue(&(pshared->nstored), &empty); if ( ret!=0 ) { dbg_print1(FATAL_ERROR, "sem_getvalue(nstored) error %d\n", errno); } dbg_print2(FATAL_ERROR, "nempty %d,nstored %d\n", empty, full); dbg_print2(FATAL_ERROR, "bufhead %d, buftail %d\n", empty, full); return;}int main(int argv , char **argc){ int index=0; char currentDir[MAX_FILE_NAME_LEN]="./"; char * tempstr=NULL; // get program's directory strcpy(currentDir,argc[0]); tempstr = rindex(currentDir,'/'); if ( tempstr==NULL ) strcpy(currentDir, "./"); else *(tempstr+1)='\0'; strcpy(fname_config, currentDir); strcat(fname_config,"progSheet.txt"); strcpy(fname_log, currentDir); strcat(fname_log,"vodfile.log"); /* get remote and local ip+port, mpeg filename from argument */ for ( index=1; index<argv; index++) { if ( (tempstr=strstr(argc[index], "channel_ip="))!=NULL ) { tempstr+=11; strcpy(channel_ip, tempstr); } else if ( (tempstr=strstr(argc[index], "lip="))!=NULL ) { tempstr+=4; strcpy(lip, tempstr); } else if ( (tempstr=strstr(argc[index], "channel_port="))!=NULL ) { tempstr+=13; channel_port=atoi(tempstr); } else if ( (tempstr=strstr(argc[index], "lport="))!=NULL ) { tempstr+=6; lport = atoi(tempstr); } else if ( (tempstr=strstr(argc[index], "configfile="))!=NULL ) { tempstr+=11; strcpy(fname_config,tempstr); } else if ( (tempstr=strstr(argc[index], "notimeofday"))!=NULL ) { notimeofday=1; dbg_print0(COMMON_LOG, "no timeofday\n"); } else if ( (tempstr=strstr(argc[index], "nooutput"))!=NULL ) { nooutput=1; dbg_print0(COMMON_LOG, "no output \n"); } else if ( (tempstr=strstr(argc[index], "log="))!=NULL ) { tempstr+=4; strcpy(fname_log, tempstr); } else if ( (tempstr=strstr(argc[index], "debug="))!=NULL ) { tempstr+=6; if ( strcmp(tempstr, "fatal")==0 ) debug_level=FATAL_ERROR; else if (strcmp(tempstr, "common")==0 ) debug_level=FATAL_ERROR|COMMON_ERROR; else if (strcmp(tempstr, "log")==0 ) debug_level=FATAL_ERROR|COMMON_ERROR|COMMON_LOG; else if (strcmp(tempstr, "debug")==0 ) debug_level=FATAL_ERROR|COMMON_ERROR|COMMON_LOG|DEBUG_LOG; } } { char str[256]; sprintf(str, "%s channel_ip=%s,channel_port=%d,lip=%s,lport=%d,configfile=%s,log=%s\n", argc[0], channel_ip, channel_port,lip, lport, fname_config,fname_log); dbg_print1(COMMON_LOG, "%s", str); } signal(SIGPIPE, debug_output); signal(SIGKILL, main_Quit); signal(SIGQUIT, main_Quit); signal(SIGTERM, main_Quit); signal(SIGINT , main_Quit); signal(SIGSTOP, main_Quit); signal(MAIN_BROADCAST_SIGNAL, SIG_IGN); signal(MAIN_CONTINUE_SIGNAL, main_Continue); mainpid = pthread_self(); dbg_print1(COMMON_LOG, "mainpid=%d\n", mainpid); // get progSheet { int i,j; for(i=0;i<7;i++) for(j=0;j<MAX_ITEM_ONE_DAY+1;j++) progSheet[i][j].isvalid=0; getProgSheet(fname_config); } struct tm localtm, *ptm=&localtm, *tmpptm; time_t t; struct timeval tv; int ret; int current=-1; pshared = new BUFFER_POOL; pref_buffer = new char [MAX_PREF_LENGTH+TWO_MAXPACKETLENGTH]; if ( pshared==NULL ) return 0; while(1) { time(&t); tmpptm = localtime(&t); *ptm = *tmpptm; current += 1; dbg_print1(COMMON_LOG, "begine seekprogtobroad %d\n", current); if(seekprogtobroad(ptm, current)<0) { current = -1; // sleep until next day tv.tv_sec = ((24-ptm->tm_hour)*60-ptm->tm_min)*60; tv.tv_usec=0; dbg_print1(COMMON_LOG, "Today no program.,sleep to %ld\n", tv.tv_sec); ret = sleep(tv.tv_sec); //ret = select(0, NULL, NULL, NULL, &tv); if ( ret==0 ) { dbg_print0(COMMON_LOG, "wait today's starttime expired\n"); } else { dbg_print1(COMMON_LOG, "wait until next day, receive non-block signal, ret=%d\n", ret); } continue; } dbg_print2(COMMON_LOG, "Item %d, %s\n", current, currentprog.filename); int temp = ((currentprog.start_hour-ptm->tm_hour)*60+currentprog.start_minute-ptm->tm_min)*60; dbg_print2(COMMON_LOG, "find a item %s, wait %d\n", currentprog.filename, temp); if(temp>0) { tv.tv_sec = temp; tv.tv_usec=0; dbg_print1(COMMON_LOG, "wait starttime, sleep to %ld\n", tv.tv_sec); ret = sleep(tv.tv_sec); // ret = select(0, NULL, NULL, NULL, &tv); if ( ret==0 ) { dbg_print0(COMMON_LOG, "wait one item's starttime expired\n"); } else { dbg_print1(COMMON_LOG, "startime, receive non-block signal, ret=%d\n", ret); } } dbg_print1(COMMON_LOG, "sleep to starttime %d\n", temp); dbg_print0(COMMON_LOG, "start broadcast\n"); if( (ret=pthread_create(&main_broadcastpid,NULL,main_broadcast,NULL)) ) { dbg_print1(FATAL_ERROR,"main_broadcast create error, return %d\n",ret); exit(0); } dbg_print1(COMMON_LOG, "main_broadcastpid=%d\n", main_broadcastpid); time(&t); tmpptm = localtime(&t); *ptm = *tmpptm; temp = ((currentprog.stop_hour-ptm->tm_hour)*60+currentprog.stop_minute-ptm->tm_min)*60; if ( temp>0 ) { tv.tv_sec = temp; tv.tv_usec=0; dbg_print1(COMMON_LOG, "wait stoptime, sleep to %ld\n", tv.tv_sec); ret = sleep(tv.tv_sec); if ( ret==0 ) { dbg_print0(COMMON_LOG,"wait ont item's stoptime expired\n"); } else { dbg_print1(COMMON_LOG,"stoptime receive non-block signal, ret=%d\n", ret); } //dbg_print0(COMMON_LOG, "sleep 5 seconds\n"); //sleep(5); } dbg_print1(COMMON_LOG, "sleep to stoptime %d to stop broadcast\n", temp); pthread_kill(main_broadcastpid, MAIN_BROADCAST_SIGNAL); dbg_print0(COMMON_LOG, "pthread_kill main_broadcastpid\n"); pthread_join(main_broadcastpid, NULL); if ( main_broadcast_err==ERR_FILE ) { // delete this item int i=current; dbg_print1(COMMON_LOG, "wait main_broadcastpid, ERR_FILE, delete this file %s from progSheet\n", progSheet[ptm->tm_wday][i].filename); progSheet[ptm->tm_wday][i].isvalid = 0; while ( progSheet[ptm->tm_wday][i].isvalid ) { progSheet[ptm->tm_wday][i] = progSheet[ptm->tm_wday][i+1]; i++; } main_broadcast_err = ERR_NONE; } else if ( main_broadcast_err==ERR_SYSTEM ) { dbg_print0(COMMON_LOG, "wait main_broadcastpid, ERR_SYSTEM, so exit\n"); main_broadcast_err = ERR_NONE; exit(0); } else { dbg_print1(COMMON_LOG, "wait main_broadcastpid, err=%d\n", main_broadcast_err); main_broadcast_err = ERR_NONE; } } if ( pshared ) { delete pshared;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -