📄 save_daemon.cpp
字号:
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <signal.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <sys/time.h>//#include <sched.h>#include <semaphore.h>#include <signal.h>#include <pthread.h>#include "mysock.h"#include "vodtypes.h"// Revised by wyj in 2002/11/19// task_head is sorted by channel, starttime, so if get a earlist task to // run, must see the starttime of every task item. So we add patch to the // decision, add findtask_with_minstarttime().// local system's timezonelong int global_tm_gmtoff=0; /* Seconds east of UTC. */const char *global_tm_zone=NULL; /* Timezone abbreviation. */TASK_ITEM *task_head=NULL;PTHREAD_ITEM *pthread_header = NULL;int debug_level=FATAL_ERROR|COMMON_LOG;char fname_log[MAX_FILE_NAME_LEN]="save_daemon.log";int packetNum = 0;char lip[MAX_IP_NAME_LEN]="0.0.0.0";char fname_record[MAX_FILE_NAME_LEN]="record.txt";char fname_vodsheet[MAX_FILE_NAME_LEN]="vodsheet.txt";char fname_task[MAX_FILE_NAME_LEN]="task.list";char fname_thread[MAX_FILE_NAME_LEN]="thread.list";int wm_close_flag = 0;char filepath[MAX_FILE_NAME_LEN]="./";int change_task_list=FALSE, change_pthread_list=FALSE;int getEmptySpace(BUFFER_POOL *pShared);int getDataSpace(BUFFER_POOL *pShared);int putDataSpace(BUFFER_POOL *pShared);int putEmptySpace(BUFFER_POOL *pShared);void releaseEmptySpace(BUFFER_POOL *pShared);void releaseDataSpace(BUFFER_POOL *pShared);static int main_saveThread_Create(PTHREAD_ITEM *p);static void main_saveThread_Quit(PTHREAD_ITEM *p);static void *main_saveThread(void *p);static void *receiveThread(void *);static void *sendThread(void *);int createtasklist();void sorttasklistbystarttime();void savetasklist();TASK_ITEM* findtask_with_minstarttime(TASK_ITEM * task_head);void removetask_from_tasklist(TASK_ITEM **ptask_head, TASK_ITEM *mintask);void inserttotasklist(TASK_ITEM * item);int getataskfromfile(FILE *fp,struct task * item);PTHREAD_ITEM *addto_pthreadlist(TASK_ITEM *task_ptr);void fillfilename(PTHREAD_ITEM* ptr);time_t local_mktime(struct tm *timeptr);static int compare_tm(struct tm *p1, struct tm *p2 );static void DoCommand( int fd );static void record_to_file( PTHREAD_ITEM *p);static void output_task_list(TASK_ITEM *pheader);static void output_pthread_list(PTHREAD_ITEM *pheader);static void dumpcommand(SAVE_CMD *cmd);static void dumptaskitem(TASK_ITEM *task_ptr);void wm_Update(int signo){ // update task list from sheet_file}void wm_Quit(int signo){ wm_close_flag = 1; printf("wm_close_flag, receive %d\n", signo); return;}void receiveQuit(int signo){ printf("receive Quit now, receive %d\n", signo); return;}int main(int argv, char **argc){ int index=0; int sockfd=0; int ret=0; char currentDir[MAX_FILE_NAME_LEN]="./"; // get program's directory char * tempstr=NULL; strcpy(currentDir,argc[0]); tempstr = rindex(currentDir,'/'); if ( tempstr==NULL ) strcpy(currentDir, "./"); else *(tempstr+1)='\0'; strcpy(fname_record, currentDir); strcat(fname_record,"record.txt"); strcpy(fname_vodsheet, currentDir); strcat(fname_vodsheet,"vodsheet.txt"); strcpy(fname_task, currentDir); strcat(fname_task,"task.list"); strcpy(fname_thread, currentDir); strcat(fname_thread,"thread.list"); strcpy(fname_log, currentDir); strcat(fname_log,"save_daemon.log"); strcpy(filepath, currentDir); /* get remote and local ip+port, mpeg filename from argument */ for ( index=1; index<argv; index++) { if ( (tempstr=strstr(argc[index], "sheet_file="))!=NULL ) { tempstr+=11; strcpy(fname_vodsheet, tempstr); } else if ( (tempstr=strstr(argc[index], "path="))!=NULL ) { tempstr+=5; strcpy(filepath, tempstr); } else if ( (tempstr=strstr(argc[index], "lip="))!=NULL ) { tempstr+=4; strcpy(lip, tempstr); } else if ( (tempstr=strstr(argc[index], "--help"))!=NULL ) { printf("Usage: save_daemon [sheet_file=XXX] [path=] [lip=]\n"); printf(" sheet_file the file of program sheet\n"); printf(" path record file's default directory\n"); printf(" lip local interface address\n"); return 0; } } sigset_t sigset; signal(SIGUSR1, wm_Update); signal(SIGQUIT, wm_Quit); signal(SIGTERM, wm_Quit); signal(SIGINT, wm_Quit); signal(SIGSTOP, wm_Quit); signal(SIGALRM, SIG_DFL); signal(SIGCHLD,SIG_IGN); // avoid child to be dead process sigemptyset(&sigset); sigprocmask(SIG_BLOCK, &sigset, NULL); // get current system's timezone and tm_gmtoff { time_t tempTime; struct tm *ptm; time(&tempTime); ptm = localtime(&tempTime); global_tm_zone= ptm->tm_zone; global_tm_gmtoff = ptm->tm_gmtoff; } // create command socket sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) { dbg_print1(COMMON_LOG, "socket() failed, Err: %d\n", errno); return 0; } struct sockaddr_in st_addr; char on=1; setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); st_addr.sin_family = AF_INET; st_addr.sin_addr.s_addr = inet_addr(lip); st_addr.sin_port = htons(SAVE_DAEMON_CMD_PORT); ret = bind(sockfd, (struct sockaddr*)&st_addr, sizeof(st_addr)); if ( ret<0 ) { dbg_print1(COMMON_LOG, "bind() failed, Err: %d\n", errno); close(sockfd); return 0; } createtasklist(); output_task_list(task_head); output_pthread_list(pthread_header); time_t tm; TASK_ITEM *task_ptr; PTHREAD_ITEM *pthread_ptr; double elapse; struct timeval tv; fd_set rset; int maxfdp; int m_bSocketReady; struct task *mintask; FD_ZERO(&rset); maxfdp = sockfd+1; m_bSocketReady = FALSE; while (wm_close_flag!=1) { while ( m_bSocketReady ) { // deal with remote command DoCommand(sockfd); tv.tv_sec = 0l; tv.tv_usec = 0l; FD_SET(sockfd, &rset); maxfdp = sockfd+1; ret = select(maxfdp, &rset, NULL, NULL, &tv); if ( FD_ISSET(sockfd, &rset) ) { m_bSocketReady = TRUE; continue; } m_bSocketReady = FALSE; if ( ret==0 ) {// timeout // Revised 2002/11/19 continue; } else { // if ( ret<0 ) wm_close_flag=1; } } if ( change_task_list==TRUE ) { output_task_list(task_head); change_task_list=FALSE; } if ( change_pthread_list==TRUE ) { output_pthread_list(pthread_header); change_pthread_list=FALSE; } if ( wm_close_flag==1 ) continue; time(&tm); if ( task_head ) { // (1) get the earlist starttime of task_head, 2002/11/19 mintask = findtask_with_minstarttime(task_head); dbg_print0(COMMON_LOG, "dumping mintask if (task_head)\n"); dumptaskitem(mintask); if ( difftime(tm, local_mktime(&(mintask->starttime)))>=0 ){//get an task and startup // (2) delete this task and add a new pthread, 2002/11/19 { TASK_ITEM *task_ptr=task_head; dbg_print0(COMMON_LOG, "before remove, dumping task_head\n"); while (task_ptr) { dumptaskitem(task_ptr); task_ptr = task_ptr->next; } } removetask_from_tasklist(&task_head, mintask); { TASK_ITEM *task_ptr=task_head; dbg_print0(COMMON_LOG, "after remove, dumping task_head\n"); while (task_ptr) { dumptaskitem(task_ptr); task_ptr = task_ptr->next; } } task_ptr = mintask; pthread_ptr = addto_pthreadlist(task_ptr); delete task_ptr; dbg_print0(COMMON_LOG, "del a task item\n"); if ( pthread_ptr ) { main_saveThread_Create(pthread_ptr); dbg_print0(COMMON_LOG, "add a pthread item\n"); } output_task_list(task_head); output_pthread_list(pthread_header); continue; } } if ( pthread_header ) { if ( pthread_header->died==TRUE ) { //remove a finished pthread pthread_ptr = pthread_header; pthread_header = pthread_header->next; delete pthread_ptr; dbg_print0(COMMON_LOG, "del a died pthread item\n"); output_pthread_list(pthread_header); if ( pthread_header==NULL ) continue; } if ( difftime(tm, local_mktime(&(pthread_header->stoptime)))>=0 ) { //stop and remove a timeout pthread main_saveThread_Quit(pthread_header); pthread_ptr = pthread_header; pthread_header = pthread_header->next; delete pthread_ptr; dbg_print0(COMMON_LOG, "stop and del a timeout pthread item\n"); output_pthread_list(pthread_header); continue; } } if ( !pthread_header && !task_head ) { // no task and thread, wait command and signal dbg_print0(COMMON_LOG, "no pthread and task item, wait\n");// sigsuspend(&sigset); // wait on socket and sigset FD_SET(sockfd, &rset); maxfdp = sockfd+1; ret = select(maxfdp, &rset, NULL, NULL, NULL); if ( FD_ISSET(sockfd, &rset) ) {// else breaked by signal m_bSocketReady = TRUE; } continue; } else if ( !pthread_header && task_head ) { // wait until next earlist task's starttime // (2) wait until this task's starttime, 2002/11/19 elapse = difftime( local_mktime(&(mintask->starttime)),tm );// sleep((int)elapse); // wait on socket and timeout tv.tv_sec = (int)elapse; tv.tv_usec = 0l; FD_SET(sockfd, &rset); maxfdp = sockfd+1; ret = select(maxfdp, &rset, NULL, NULL, &tv); if ( FD_ISSET(sockfd, &rset) ) { m_bSocketReady = TRUE; } continue; } else if ( pthread_header && !task_head ) { // wait until next pthread's stoptime // Revised 2002/11/19 elapse = difftime( local_mktime(&(pthread_header->stoptime)),tm ); dbg_print1(COMMON_LOG,"wait next pthread stoptime expired %f\n",elapse);// sleep((int)elapse); // wait on socket and timeout tv.tv_sec = (int)elapse; tv.tv_usec = 0l; FD_SET(sockfd, &rset); maxfdp = sockfd+1; ret = select(maxfdp, &rset, NULL, NULL, &tv); if ( FD_ISSET(sockfd, &rset) ) { m_bSocketReady = TRUE; } continue; } else { // wait until next earlist task's starttime or pthread's stoptime // Revised 2002/11/19 if ( difftime(local_mktime(&(pthread_header->stoptime)), local_mktime(&(mintask->starttime)))<=0 ) { elapse = difftime( local_mktime(&(pthread_header->stoptime)),tm ); dbg_print1(COMMON_LOG,"wait next pthread stoptime expired %f\n",elapse); } else { elapse = difftime( local_mktime(&(mintask->starttime)),tm ); dbg_print1(COMMON_LOG, "wait next task starttime expired %f\n", elapse); }// sleep((int)elapse); // wait on socket and timeout tv.tv_sec = (int)elapse; tv.tv_usec = 0l; FD_SET(sockfd, &rset); maxfdp = sockfd+1; ret = select(maxfdp, &rset, NULL, NULL, &tv); if ( FD_ISSET(sockfd, &rset) ) { m_bSocketReady = TRUE; } continue; }// end sleep and suspend }// end while dbg_print0(COMMON_LOG, "main process end\n"); while ( pthread_header ) { pthread_ptr = pthread_header; pthread_header = pthread_header->next; main_saveThread_Quit(pthread_ptr); delete pthread_ptr; } while ( task_head ) { task_ptr = task_head; task_head = task_head->next; delete task_ptr; } output_task_list(task_head); output_pthread_list(pthread_header); close(sockfd); // close socket return 0;}static int main_saveThread_Create(PTHREAD_ITEM *p){ time_t tempTime; struct tm *ptm; time(&tempTime); ptm = localtime(&tempTime); p->starttime = *ptm; // now ptm have local system time_zone p->close_receive = p->close_send = p->died = FALSE; if(pthread_create(&(p->pid),NULL,main_saveThread,p)){ dbg_print0(FATAL_ERROR, "main_saveThread create error\n"); p->died = TRUE; return FALSE; } dbg_print2(COMMON_LOG, "main_saveThread create one pthread, pid %d, filename %s\n", p->pid, p->filename); return TRUE;}static void main_saveThread_Quit(PTHREAD_ITEM *p){ printf("Enter main_saveThread\n"); if ( p->died==TRUE ) return; if ( p->pid<=0 ) return; p->close_receive = TRUE; pthread_kill(p->receivepid, SIGALRM); p->t_sock.Close(); dbg_print1(COMMON_LOG, "filename %s, close_recieve=TRUE\n", p->filename); printf("Enter main_saveThread(), wait p->pid quit\n"); pthread_join(p->pid, NULL); printf("Release main_saveThread(), wait p->pid quit\n"); p->died = TRUE; dbg_print1(COMMON_LOG, "main_saveThread quit one pthread, filename %s\n", p->filename); return ;}/************************************************************************ * close main_saveThread: * close t_sock and set close_receive to let the receiveThread quit * signal sendThread to quit when receiveThread quit * sendThread quit to signal main_saveThread quit * main_saveThread quit to signal main process's pthread_join ************************************************************************/static void *main_saveThread(void *p){ PTHREAD_ITEM *pCurrItem; pCurrItem = (PTHREAD_ITEM*)p; if ( !(pCurrItem->t_sock.Create(pCurrItem->channel_ip, pCurrItem->channel_port, lip, 0, S_WYJ_RECV)) ){ pCurrItem->died = TRUE; dbg_print0(FATAL_ERROR, "create socket failure\n"); return NULL; }// closed by main() to let receiveThread quit pCurrItem->fp = fopen(pCurrItem->filename, "wb"); if ( pCurrItem->fp==NULL ) { pCurrItem->t_sock.Close();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -