📄 vodfile.cpp
字号:
u8 packet[TS_PACKET_LEN]; BOOL m_bFirstPCR; u64 m_uiByteRead; s64 m_iCurrentPCRTime; s64 m_iLastPCRTime; s64 m_iDeltaClock; double m_dSlope; unsigned int m_iHowMany; unsigned int numread; LIVE_PACKET *pLivePacket; dbg_print0(COMMON_LOG ,"Enter NATIVE_MPEG2_TS\n"); m_uiByteRead = 0; m_bFirstPCR = 1; // Set those values to 0 in order to send the packets that come before // the first one with a PCR as fast as possible (best behaviour) m_iLastPCRTime = 0; m_iDeltaClock = 0; m_dSlope = 0; m_iHowMany = 0; pLivePacket = (LIVE_PACKET*) buffer; pLivePacket->num = -1; while(TRUE) { numread = ssread(packet, TS_PACKET_LEN, connfd); if ( numread < TS_PACKET_LEN) { dbg_print0(COMMON_LOG ,"read source file error!maybe reach EOF\n"); break; } m_uiByteRead += TS_PACKET_LEN; m_iHowMany ++; if (packet[0] != 0x47) { dbg_print0(COMMON_LOG ,"bad sync byte!\n"); continue; } if(HasPCR(packet)) { if(m_bFirstPCR) { m_iCurrentPCRTime = GetPCRTime(packet); // Compute the delta between the clock of the PC and the one of the encoder m_iDeltaClock = GetDate() - m_iCurrentPCRTime; // Update the data for the next PCR m_uiByteRead = 0; m_iLastPCRTime = m_iCurrentPCRTime; m_bFirstPCR = 0; } else { m_iCurrentPCRTime = GetPCRTime(packet); if(IsDiscontinuity(packet)) { // Reajust the delta between the clock of the PC and the one of the encoder dbg_print0(COMMON_LOG ,"Adjusting timer discontinuity!\n"); m_iDeltaClock = GetDate() - m_iCurrentPCRTime; } else { // (Re)evaluate the slope if(m_uiByteRead > 0) { m_dSlope = ((double)m_iCurrentPCRTime - m_iLastPCRTime) / m_uiByteRead; } else dbg_print0(COMMON_LOG ,"m_uiByteRead must be greater than 0!\n"); } // Update the data for the next PCR m_uiByteRead = 0; m_iLastPCRTime = m_iCurrentPCRTime; } } if(m_iHowMany == MAX_TS_PACKET) { s64 iSendDate = m_iLastPCRTime + m_iDeltaClock + (s64)m_dSlope*m_uiByteRead; s64 iWait = iSendDate - GetDate(); // Wait only if the delay becomes > 0.5 ms, for the select will make we late // (it usually takes more than 1 ms) if(iWait > 500) { struct timespec tsDelay; tsDelay.tv_sec = iWait / 1000000; tsDelay.tv_nsec = 1000 * (iWait % 1000000); // Wait for the given delay nanosleep(&tsDelay, NULL); } //send out the TS Packets if ( !t_sock.SendMsg((char*)pLivePacket, MAX_TS_PACKET*TS_PACKET_LEN+sizeof(int)) ) { dbg_print0(COMMON_LOG, "send error\n"); goto broadcast_quit; } pLivePacket->num++; m_iHowMany = 0; } else { memcpy(pLivePacket->str + m_iHowMany*TS_PACKET_LEN, packet, TS_PACKET_LEN); m_iHowMany++; } } //send the remainder TS packets if ( !t_sock.SendMsg((char*)pLivePacket, m_iHowMany*TS_PACKET_LEN+sizeof(int)) ) { dbg_print0(COMMON_LOG, "send error\n"); goto broadcast_quit; } dbg_print0(COMMON_LOG ,"Leave NATIVE_MPEG2_TS\n"); } break; default: dbg_print1(COMMON_LOG, "unknown file type 0x%x\n", pFileHeader->type); main_broadcast_err = ERR_FILE; goto broadcast_quit; }broadcast_quit: timer_value.it_interval.tv_sec = timer_value.it_value.tv_sec =0; timer_value.it_interval.tv_usec= timer_value.it_value.tv_usec=0; setitimer(ITIMER_REAL, &timer_value, NULL); signal(SIGALRM, SIG_DFL); if ( connfd!=0 ) { close(connfd); connfd = 0; } t_sock.Close(); main_broadcastpid=0; dbg_print0(COMMON_LOG, "Leave main_broadcast\n"); pthread_kill(mainpid, MAIN_CONTINUE_SIGNAL); pthread_exit(NULL); return NULL;}/************************************ * buffer pool proc ************************************/static inline int getEmptySpace(){ sem_wait(&(pshared->nstored)); return bufTail;}//return a empty block static inline int getDataSpace(){ sem_wait(&(pshared->nempty)); return bufHead;}//return a available data block to send static inline int putDataSpace(){ bufTail = (bufTail+1)%MAXBUFFLEN; if ( sem_post(&(pshared->nempty))!=0 ) { dbg_print1(FATAL_ERROR, "putDataSpace sem_wait error %d\n", errno); } return 1; } static inline int putEmptySpace(){ bufHead = (bufHead+1)%MAXBUFFLEN; if ( sem_post(&(pshared->nstored))!=0 ) { dbg_print1(FATAL_ERROR, "putEmptySpace sem_wait error %d\n", errno); } return 1;}/************************************ * timer proc ************************************/static void token_proc(int signo){ if ( bChannelStart==FALSE ) return; token+=one_token; if ( token>=nextPTS && main_broadcastpid!=0 ) pthread_kill(main_broadcastpid, SIGCONT);}/************************************ * file list proc ************************************/void getProgSheet(char * filename){ FILE * fp; char alloc_string[128]; char *tempstr=alloc_string; int i,j=0; fp=fopen(fname_config,"r"); if(fp==NULL){ dbg_print1(FATAL_ERROR, "error: open progsheet file %s\n", fname_config); exit(1); } while(!feof(fp)){ fscanf(fp,"%s",tempstr); i=isanotherday(tempstr); if(i>0) break; } if(feof(fp)){ fclose(fp); dbg_print1(FATAL_ERROR, "error: no day in file %s.\n", fname_config); exit(1); } while(!feof(fp)){ tempstr = alloc_string; if ( fscanf(fp,"%s",tempstr)<=0 ) break; if(isanotherday(tempstr)<0){ if ( j>=MAX_ITEM_ONE_DAY ) continue; progSheet[i][j].isvalid=1; progSheet[i][j].start_hour=atoi(tempstr); tempstr=strstr(tempstr,":")+1; progSheet[i][j].start_minute=atoi(tempstr); fscanf(fp,"%s",tempstr); progSheet[i][j].stop_hour=atoi(tempstr); tempstr=strstr(tempstr,":")+1; progSheet[i][j].stop_minute=atoi(tempstr); fscanf(fp,"%s",progSheet[i][j++].filename); } else{ i=isanotherday(tempstr); j=0; }// end if()..else... }// end while fclose(fp);/* for ( int i=0; i<7; i++ ) { for ( int j=0; j<MAX_ITEM_ONE_DAY; j++ ) { if ( progSheet[i][j].isvalid!=1 ) continue; printf("%d,%d : from %d::%d to %d::%d, %s\n", i,j,progSheet[i][j].start_hour, progSheet[i][j].start_minute, progSheet[i][j].stop_hour, progSheet[i][j].stop_minute, progSheet[i][j].filename); } }*/ return; } int isanotherday(char * str){ if(!strcmp(str,"Sunday")) return 0; else if(!strcmp(str,"Monday")) return 1; else if(!strcmp(str,"Tuesday")) return 2; else if(!strcmp(str,"Wednesday")) return 3; else if(!strcmp(str,"Thursday")) return 4; else if(!strcmp(str,"Friday")) return 5; else if(!strcmp(str,"Saturday")) return 6; else return -1; }int seekprogtobroad(struct tm * time, int &i){ int current_pos = i; while(progSheet[time->tm_wday][i].isvalid){ if( (time->tm_hour*60+time->tm_min)<(progSheet[time->tm_wday][i].stop_hour*60+progSheet[time->tm_wday][i].stop_minute) ){ currentprog=progSheet[time->tm_wday][i]; return 1; } i++; } // find from current_pos to last nonValid i=0; while(i<current_pos && progSheet[time->tm_wday][i].isvalid){ if( (time->tm_hour*60+time->tm_min)<(progSheet[time->tm_wday][i].stop_hour*60+progSheet[time->tm_wday][i].stop_minute) ){ currentprog=progSheet[time->tm_wday][i]; return 1; } i++; } // find from start to current_pos return -1;} // get file sizeu_int32_t getTotalSize(FILE *mpeg_fd){ u_int32_t size; u_int32_t pos; if((pos = fseek(mpeg_fd, 0, SEEK_CUR)) == -1L) return 0; if((size = fseek(mpeg_fd, 0, SEEK_END)) == -1L) return(0); size = ftell(mpeg_fd); if((pos = fseek(mpeg_fd, pos, SEEK_SET)) == -1L) return(0); return(size);}// get file sizeu_int32_t getTotalSize(const char *filename){ struct stat filestat; if ( stat(filename, &filestat)!=0 ) return 0; return filestat.st_size;}u_int64_t GetFilesize(int mpeg_fd){ u_int64_t size,pos; size = lseek(mpeg_fd,0,SEEK_END); if(size==-1L) { dbg_print0(COMMON_LOG,"seek to file end failed!\n "); return 0; } dbg_print1(COMMON_LOG,"The file size is %lld!\n ", size); pos = lseek(mpeg_fd,0,SEEK_SET); if(pos==-1L) { dbg_print0(COMMON_LOG,"seek to file beginning failed!\n "); return 0; } return(size);}//size_t ssread(void *ptr, size_t size, size_t length, int fd)size_t ssread(void *ptr, size_t length, int fd){ if ( length > TWO_MAXPACKETLENGTH ) return 0; if ( length > pref_length ) { // need to read if ( pref_header!=pref_buffer ) { memmove(pref_buffer, pref_header, pref_length); pref_header = pref_buffer; } pref_length+=read(fd,pref_header+pref_length, MAX_PREF_LENGTH); } length = (length>pref_length) ? pref_length : length; memcpy(ptr, pref_header, length); pref_length -= length; pref_header += length; return length;}//wyj: if occur error, report error to daemon//wyj: the info of starting and stoping channel must be written to syslog static int createTimer(){ struct itimerval timer_value; timer_value.it_interval.tv_sec = timer_value.it_value.tv_sec =0; timer_value.it_interval.tv_usec= timer_value.it_value.tv_usec=ONE_SHOT_TIME; if ( setitimer(ITIMER_REAL, &timer_value, NULL)!=0 ) { dbg_print1(FATAL_ERROR, "setitimer return error %d\n", errno); return FALSE; } signal(SIGALRM, mpeg1_token_proc); return TRUE;}static int destroyTimer(){ struct itimerval timer_value; timer_value.it_interval.tv_sec = timer_value.it_value.tv_sec =0; timer_value.it_interval.tv_usec= timer_value.it_value.tv_usec=0; if ( setitimer(ITIMER_REAL, &timer_value, NULL)!=0 ) { dbg_print1(FATAL_ERROR, "setitimer return error %d\n", errno); return FALSE; } signal(SIGALRM, SIG_IGN); return TRUE;}static void mpeg1_token_proc(int signo){ if ( bChannelStart==FALSE ) return; peak_token += one_peak_token; if ( peak_token>=MAXPACKETLEN ) { pthread_kill(sendpid, SIGCONT); }}BOOL HasPCR(u8 *bData){ // Return true only if adaptation_field_control is set, // adaptation_field_length is not 0 and PCR_flag is set return ((bData[3] & 0x20) && bData[4] && (bData[5] & 0x10));}BOOL IsDiscontinuity(u8 *bData) { // Return true only if adaptation_field_control is set, // adaptation_field_length is not 0 and discontinuity_flag is set return ((bData[3] & 0x20) && bData[4] && (bData[5] & 0x80));}s64 GetPCRTime(u8 *bData){ s64 iPCRTime; iPCRTime = ( (s64)bData[6] << 25 ) | ( (s64)bData[7] << 17 ) | ( (s64)bData[8] << 9 ) | ( (s64)bData[9] << 1 ) | ( (s64)bData[10] >> 7 ); iPCRTime = (iPCRTime * 300) / 27; //in microseconds return iPCRTime;}inline s64 GetDate(){ struct timeval tv; gettimeofday(&tv, NULL); return( (s64)tv.tv_sec * 1000000 + (s64)tv.tv_usec );}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -