📄 save_daemon.cpp
字号:
pCurrItem->died = TRUE; dbg_print1(FATAL_ERROR, "%s can't open for write\n", pCurrItem->filename); return NULL; }// closed by sendThread to record true data sem_init(&(pCurrItem->shared.nempty),0,0); sem_init(&(pCurrItem->shared.nstored),0,MAXBUFFLEN); if(pthread_create(&(pCurrItem->receivepid),NULL,receiveThread,p)){ dbg_print0(FATAL_ERROR, "receiveThread creation was not successful\n"); goto main_saveThread_quit; } dbg_print1(COMMON_LOG, "create receiveThread, pid %d\n", pCurrItem->receivepid); if(pthread_create(&(pCurrItem->sendpid),NULL,sendThread,p)){ dbg_print0(FATAL_ERROR, "sendThread creation was not successful\n"); pCurrItem->close_receive=TRUE; putEmptySpace(&(pCurrItem->shared)); pthread_join(pCurrItem->receivepid,NULL); goto main_saveThread_quit; } dbg_print1(COMMON_LOG, "create sendThread, pid %d\n",pCurrItem->sendpid); printf("main_saveThread wait receiveThread\n"); pthread_join(pCurrItem->receivepid,NULL); printf("main_saveThread wait sendThread\n"); pthread_join(pCurrItem->sendpid,NULL); main_saveThread_quit: printf("main_saveThread destroy nempty and nstored\n"); sem_destroy(&(pCurrItem->shared.nempty)); sem_destroy(&(pCurrItem->shared.nstored)); printf("main_saveThread close socket\n"); pCurrItem->t_sock.Close(); pCurrItem->receivepid=0; pCurrItem->sendpid=0; pCurrItem->died = TRUE; printf("main_saveThread quit\n"); dbg_print0(COMMON_LOG, "main_saveThread quit"); return NULL;}//modified by xj on Nov.6,2003static void *receiveThread(void *p){ PTHREAD_ITEM *pCurrItem; int index, read_size; struct timeval T0, T1; BYTE8 totalRecChar=0l; int packetNum=0; STREAMPACKETHEADER *pStreamPacket=NULL; LIVE_PACKET *packet=NULL; struct timeval machineTime; char str[256]; signal(SIGALRM, receiveQuit); pCurrItem = (PTHREAD_ITEM*)p; printf("Enter receiveThread\n"); index=getEmptySpace(&(pCurrItem->shared)); //pStreamPacket = (STREAMPACKETHEADER *)(pCurrItem->shared.buffer[index]); //packet = &(pStreamPacket->packet); printf("Ready for RecvMsg\n"); gettimeofday(&T0,NULL); //read_size= pCurrItem->t_sock.RecvMsg((char*)packet,MAXPACKETLEN); read_size= pCurrItem->t_sock.RecvMsg(pCurrItem->shared.buffer[index],MAXPACKETLEN); if ( read_size > 0 ) { //gettimeofday(&machineTime,NULL); //T0=machineTime; //pStreamPacket->start_timestamp = machineTime.tv_sec*1000.0 + machineTime.tv_usec/1000; //pStreamPacket->length = read_size; //pCurrItem->shared.bufLength[index] = read_size+STREAMPACKETHEADER_PREFIX; pCurrItem->shared.bufLength[index] = read_size; totalRecChar+= read_size; //packetNum = packet->num; putDataSpace(&(pCurrItem->shared)); } else{ releaseEmptySpace(&(pCurrItem->shared)); printf("receiveThread() : release the last empty space\n"); pCurrItem->close_send = TRUE; // let sendThread to quit putDataSpace(&(pCurrItem->shared)); printf("receiveThread() : put a last data space\n"); return NULL; } printf("Ready for While RecvMsg\n"); while (pCurrItem->close_receive==FALSE) { index=getEmptySpace(&(pCurrItem->shared)); //packetNum++; //pStreamPacket = (STREAMPACKETHEADER *)(pCurrItem->shared.buffer[index]); //packet = &(pStreamPacket->packet); //read_size=pCurrItem->t_sock.RecvMsg((char*)packet, MAXPACKETLEN); read_size=pCurrItem->t_sock.RecvMsg(pCurrItem->shared.buffer[index], MAXPACKETLEN); if ( read_size > 0 ) { // record the timestamp //gettimeofday(&machineTime,NULL); //pStreamPacket->start_timestamp = machineTime.tv_sec*1000.0 + machineTime.tv_usec/1000; // record the length //pStreamPacket->length = read_size; // update packetNum // if(packet->num != packetNum){// dbg_print3(DEBUG_LOG, "expect %d,receive %d,lost %d\n" , packetNum,packet->num,packet->num-packetNum);// packetNum = packet->num;// } // pCurrItem->shared.bufLength[index] = read_size+STREAMPACKETHEADER_PREFIX; pCurrItem->shared.bufLength[index] = read_size; putDataSpace(&(pCurrItem->shared)); totalRecChar+=read_size; } else {//socket error releaseEmptySpace(&(pCurrItem->shared)); printf("receiveThread() : release the last empty space and break from while\n"); break; } } gettimeofday(&T1,NULL); if ( T1.tv_sec==T0.tv_sec && T1.tv_sec==T0.tv_sec ) pCurrItem->bandwidth = 0; else pCurrItem->bandwidth = totalRecChar*8l /((T1.tv_sec-T0.tv_sec)+(T1.tv_usec-T0.tv_usec)/1000000l); sprintf(str,"totalRecord:%ld bytes,startTime:%ld.%ld,stopTime:%d.%d,bandWidth:%ld\n", totalRecChar, T1.tv_sec, T1.tv_usec, T0.tv_sec, T0.tv_usec, pCurrItem->bandwidth); dbg_print1(COMMON_LOG, "%s\n", str); // let sendThread to quit pCurrItem->close_send = TRUE; putDataSpace(&(pCurrItem->shared)); printf("receiveThread() : put a last data space and return\n"); return NULL;}//modified by xj on Nov.7,2003static void *sendThread(void *p){ PTHREAD_ITEM *pCurrItem; TSFILEHEADER fileHdr; BYTE8 totalChars=0l; int ret; STREAMPACKETHEADER *pStreamPacket=NULL; LIVE_PACKET *packet=NULL; char str[256]; pCurrItem = (PTHREAD_ITEM*)p; // fileHdr.type = TYPE_WSFILEHEADER; // fileHdr.reserved = 0;// fileHdr.length = 0l;// fileHdr.band = 0l; // ret = fwrite(&fileHdr, sizeof(char), sizeof(TSFILEHEADER)/sizeof(char), pCurrItem->fp); // if ( ret<=0 ) {// fclose(pCurrItem->fp);// pCurrItem->close_receive=TRUE;// putEmptySpace(&(pCurrItem->shared)); // printf("sendThread() : write fileheader error\n");// return NULL;// } while(pCurrItem->close_send==FALSE){ int index=getDataSpace(&(pCurrItem->shared)); if ( pCurrItem->close_send==TRUE ) { printf("sendThread() : close_send is set, break from while\n"); break; } packet = (LIVE_PACKET *)(pCurrItem->shared.buffer[index]); ret = fwrite(packet->str, sizeof(char), pCurrItem->shared.bufLength[index] - sizeof(int), pCurrItem->fp); if ( ret<=0 ) { fclose(pCurrItem->fp); pCurrItem->close_receive=TRUE; putEmptySpace(&(pCurrItem->shared)); sprintf(str, "rm -rf %s\n", pCurrItem->filename); system(str); printf("sendThread() : write file error, delete file and return\n"); return NULL; } putEmptySpace(&(pCurrItem->shared)); totalChars += ret; } // get bandwidth and length// fileHdr.band = pCurrItem->bandwidth;// fileHdr.length = totalChars;// if ( fileHdr.band<=0l || fileHdr.length<=0l ) {// printf("if ( fileHdr.band<=0l || fileHdr.length<=0l ) {\n");// dbg_print2(FATAL_ERROR, "error, fileHdr.band=%I64d, length=%I64d\n", fileHdr.band, fileHdr.length);// fclose(pCurrItem->fp);// return NULL;// }// fseek(pCurrItem->fp, 0l, SEEK_SET);// ret = fwrite(&fileHdr, sizeof(char), sizeof(TSFILEHEADER)/sizeof(char), pCurrItem->fp);// if ( ret<=0 ) {// printf("if ( ret<=0 ) {\n");// dbg_print0(FATAL_ERROR, "file write error\n");// fclose(pCurrItem->fp);// return NULL;// } fclose(pCurrItem->fp); // dbg_print2(COMMON_LOG, "fileHdr.band=%ld, length=%ld\n", fileHdr.band, fileHdr.length); // write this record to the record file// time_t tempTime;// struct tm *ptm;// time(&tempTime);// ptm = localtime(&tempTime); // pCurrItem->stoptime = *ptm;// record_to_file(pCurrItem); return NULL;}static void record_to_file(PTHREAD_ITEM *p){ FILE* record_fp; record_fp = fopen(fname_record, "a"); if ( record_fp==NULL ) return; fprintf(record_fp, "%d年%d月%d日%d时%d分 %d年%d月%d日%d时%d分 ", p->starttime.tm_year+1900, p->starttime.tm_mon+1, p->starttime.tm_mday, p->starttime.tm_hour, p->starttime.tm_min, p->stoptime.tm_year+1900, p->stoptime.tm_mon+1, p->stoptime.tm_mday, p->stoptime.tm_hour, p->stoptime.tm_min); fprintf(record_fp, "%d ", p->channel); fprintf(record_fp, "%ld ", p->bandwidth); fprintf(record_fp, "%s\n", p->filename); fclose(record_fp); return;} int getEmptySpace(BUFFER_POOL *pShared){ sem_wait(&(pShared->nstored)); return pShared->bufTail;}//return a empty block int getDataSpace(BUFFER_POOL *pShared){ sem_wait(&(pShared->nempty)); return pShared->bufHead;}//return a available data block to send int putDataSpace(BUFFER_POOL *pShared){ pShared->bufTail = (pShared->bufTail+1)%MAXBUFFLEN; sem_post(&(pShared->nempty)); return 1; } int putEmptySpace(BUFFER_POOL *pShared){ pShared->bufHead = (pShared->bufHead+1)%MAXBUFFLEN; sem_post(&(pShared->nstored)); return 1;}void releaseEmptySpace(BUFFER_POOL *pShared){ sem_post(&(pShared->nstored));}void releaseDataSpace(BUFFER_POOL *pShared){ sem_post(&(pShared->nempty));} /********************************************* * task list function *********************************************//* return a task list, header pointed by task_head */int createtasklist(){ time_t tm; FILE *fp_vod; int flag=0; TASK_ITEM *task_ptr = new TASK_ITEM; memset(task_ptr, 0, sizeof(TASK_ITEM)); fp_vod=fopen(fname_vodsheet,"r"); if ( fp_vod==NULL ) return FALSE; while(1){ if(getataskfromfile(fp_vod,task_ptr)==0){ fclose(fp_vod); delete task_ptr; if ( flag==0 ) return FALSE; savetasklist(); return TRUE; } else{ time(&tm); printf("stoptime %ld, currenttime %ld\n", local_mktime(&(task_ptr->stoptime)), tm); if(difftime(local_mktime(&(task_ptr->stoptime)),tm)<0) // outdated item continue; inserttotasklist(task_ptr); task_ptr=(struct task *)new char[sizeof(struct task)]; memset(task_ptr, 0, sizeof(TASK_ITEM)); flag=1; } }// end while fclose(fp_vod); delete(task_ptr); return TRUE;}void sorttasklistbystarttime() { TASK_ITEM * sortedtail_ptr, *currptr, *preptr,*tempptr; sortedtail_ptr=task_head; while(!sortedtail_ptr){ for(currptr=task_head;difftime(local_mktime(&(currptr->stoptime)),local_mktime(&(sortedtail_ptr->next->stoptime)))<0;preptr=currptr,currptr=currptr->next); if(currptr==sortedtail_ptr->next) sortedtail_ptr=sortedtail_ptr->next; else{ tempptr=sortedtail_ptr->next; tempptr->next=currptr; preptr->next=tempptr; sortedtail_ptr->next=sortedtail_ptr->next->next; sortedtail_ptr=sortedtail_ptr->next; } }//end while return;} void savetasklist(){ TASK_ITEM *task_ptr = task_head; FILE *fp_vod; fp_vod=fopen(fname_vodsheet,"w"); if ( fp_vod==NULL ) return; while ( task_ptr!=NULL ) { // write one item to file //年 月 日 时:分 年 月 日 时:分 频道号 频道ip 频道port fprintf(fp_vod, "%d %d %d %d:%d ", task_ptr->starttime.tm_year+1900, task_ptr->starttime.tm_mon+1, task_ptr->starttime.tm_mday, task_ptr->starttime.tm_hour, task_ptr->starttime.tm_min); fprintf(fp_vod, "%d %d %d %d:%d ", task_ptr->stoptime.tm_year+1900, task_ptr->stoptime.tm_mon+1, task_ptr->stoptime.tm_mday, task_ptr->stoptime.tm_hour, task_ptr->stoptime.tm_min); fprintf(fp_vod, "%d %s %d\n", task_ptr->channel, task_ptr->channel_ip, task_ptr->channel_port); task_ptr = task_ptr->next; } fclose(fp_vod); return; }void removetask_from_tasklist(TASK_ITEM **ptask_head, TASK_ITEM *mintask){ struct task *ptr1=*ptask_head, *ptr2=*ptask_head; if ( *ptask_head==NULL || mintask==NULL ) { dbg_print0(COMMON_LOG, "removetask_from_tasklist have error parameter\n"); return; } if ( *ptask_head==mintask ) { *ptask_head = (*ptask_head)->next; return; } ptr1 = *ptask_head; ptr2=(*ptask_head)->next; while (ptr2) { if (ptr2==mintask) { ptr1->next = ptr2->next; return; } ptr1 = ptr1->next; ptr2 = ptr2->next; } return;}TASK_ITEM* findtask_with_minstarttime(TASK_ITEM * task_head){ struct task *ptr1=task_head, *ptr2=task_head;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -