📄 parallel.c
字号:
errorExit("I/O SERVER: Failed to accept next connection. %s", error); /* should we verify this is decode server? */ /* for now, we won't */ close(otherSock); close(waitSocket); } else { close(clientSocket); } if ( debugSockets ) { fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id); }}/*===========================================================================* * * SendDecodedFrame * * Send the frame to the decode server. * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/voidSendDecodedFrame(MpegFrame * const frameP) {/*---------------------------------------------------------------------------- Send frame *frameP to the decode server.-----------------------------------------------------------------------------*/ int const negativeTwo = -2; int clientSocket; const char * error; /* send to IOServer */ ConnectToSocket(IOhostName, ioPortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to decode server to " "give it a decoded frame. %s", error); WriteInt(clientSocket, negativeTwo); WriteInt(clientSocket, frameP->id); writeYUVDecoded(clientSocket, Fsize_x, Fsize_y, frameP); close(clientSocket);}/*===========================================================================* * * GetRemoteDecodedFrame * * get the decoded frame from the decode server. * * RETURNS: nothing * * SIDE EFFECTS: * *===========================================================================*/voidGetRemoteDecodedRefFrame(MpegFrame * const frameP, int const frameNumber) {/*---------------------------------------------------------------------------- Get decoded frame number 'frameNumber' *frameP from the decode server.-----------------------------------------------------------------------------*/ int const negativeThree = -3; int clientSocket; const char * error; /* send to IOServer */ ConnectToSocket(IOhostName, ioPortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to decode server " "to get a decoded frame. %s", error); /* ask IOServer for decoded frame */ WriteInt(clientSocket, negativeThree); WriteInt(clientSocket, frameP->id); readYUVDecoded(clientSocket, Fsize_x, Fsize_y, frameP); close(clientSocket);}/********* routines handling forks, execs, PIDs and signals save, system-style forks apian@ise.fhg.de *******//*===========================================================================* * * cleanup_fork * * Kill all the children, to be used when we get killed * * RETURNS: nothing * * SIDE EFFECTS: kills other processes * *===========================================================================*/static void cleanup_fork( dummy ) /* try to kill all child processes */ int dummy;{ register int i; for (i = 0; i < current_max_forked_pid; ++i ) {#ifdef DEBUG_FORK fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);#endif if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) { fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", ClientPid[i], errno); } }}/*===========================================================================* * * safe_fork * * fork a command * * RETURNS: success/failure * * SIDE EFFECTS: Fork the command, and save to PID so you can kil it later! * *===========================================================================*/static int safe_fork(command) /* fork child process and remember its PID */ char *command;{ static int init=0; char *argis[MAXARGS]; register int i=1; if (!(argis[0] = strtok(command, " \t"))) return(0); /* tokenize */ while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i; argis[i] = NULL; #ifdef DEBUG_FORK {register int i=0; fprintf(stderr, "Command %s becomes:\n", command); while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }#endif if (!init) { /* register clean-up routine */ signal (SIGQUIT, cleanup_fork); signal (SIGTERM, cleanup_fork); signal (SIGINT , cleanup_fork); init=1; } if (-1 == (ClientPid[current_max_forked_pid] = fork()) ) { perror("safe_fork: fork failed "); return(-1); } if( !ClientPid[current_max_forked_pid]) { /* we are in child process */ execvp(argis[0], argis ); perror("safe_fork child: exec failed "); exit(1); }#ifdef DEBUG_FORK fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);#endif current_max_forked_pid++; return(0);}/*=====================* * EXPORTED PROCEDURES * *=====================*/ /*=================* * IO SERVER STUFF * *=================*//*===========================================================================* * * SetIOConvert * * sets the IO conversion to be separate or not. If separate, then * some post-processing is done at slave end * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/voidSetIOConvert(separate) boolean separate;{ separateConversion = separate;}/*===========================================================================* * * SetParallelPerfect * * If this is called, then frames will be divided up completely, and * evenly (modulo rounding) between all the processors * * RETURNS: nothing * * SIDE EFFECTS: Sets parallelPerfect .... * *===========================================================================*/voidSetParallelPerfect(val)boolean val;{ parallelPerfect = val;}/*===========================================================================* * * SetRemoteShell * * sets the remote shell program (usually rsh, but different on some * machines) * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/voidSetRemoteShell(const char * const shell) { strcpy(rsh, shell);}static voiddecodedFrameToDisk(int const otherSock) {/*---------------------------------------------------------------------------- Get a decoded from from socket 'otherSock' and write it to disk.-----------------------------------------------------------------------------*/ int frameNumber; MpegFrame * frameP; ReadInt(otherSock, &frameNumber); if (debugSockets) { fprintf(stdout, "INPUT SERVER: GETTING DECODED FRAME %d\n", frameNumber); fflush(stdout); } /* should read frame from socket, then write to disk */ frameP = Frame_New(frameNumber, 'i'); Frame_AllocDecoded(frameP, TRUE); readYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP); /* now output to disk */ WriteDecodedFrame(frameP); Frame_Free(frameP);} static voiddecodedFrameFromDisk(int const otherSock) { /* request for decoded frame from disk */ int frameNumber; MpegFrame * frameP; ReadInt(otherSock, &frameNumber); if (debugSockets) { fprintf(stdout, "INPUT SERVER: READING DECODED FRAME %d " "from DISK\n", frameNumber); fflush(stdout); } /* should read frame from disk, then write to socket */ frameP = Frame_New(frameNumber, 'i'); Frame_AllocDecoded(frameP, TRUE); ReadDecodedRefFrame(frameP, frameNumber); writeYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP); Frame_Free(frameP);} static voidrouteFromSocketToDisk(int const otherSock, unsigned char ** const bigBufferP, unsigned int * const bigBufferSizeP) { /* routing output frame from socket to disk */ int frameNumber; int numBytes; unsigned char * bigBuffer; unsigned int bigBufferSize; const char * fileName; FILE * filePtr; bigBuffer = *bigBufferP; bigBufferSize = *bigBufferSizeP; ReadInt(otherSock, &frameNumber); ReadInt(otherSock, &numBytes); /* Expand bigBuffer if necessary to fit this frame */ if (numBytes > bigBufferSize) { bigBufferSize = numBytes; if (bigBuffer != NULL) free(bigBuffer); MALLOCARRAY_NOFAIL(bigBuffer, bigBufferSize); } /* now read in the bytes */ ReadBytes(otherSock, bigBuffer, numBytes); /* open file to output this stuff to */ asprintfN(&fileName, "%s.frame.%d", outputFileName, frameNumber); filePtr = fopen(fileName, "wb"); if (filePtr == NULL) errorExit("I/O SERVER: Could not open output file(3): %s", fileName); strfree(fileName); /* now write the bytes here */ fwrite(bigBuffer, sizeof(char), numBytes, filePtr); fclose(filePtr); if (debugSockets) { fprintf(stdout, "====I/O SERVER: WROTE FRAME %d to disk\n", frameNumber); fflush(stdout); } *bigBufferP = bigBuffer; *bigBufferSizeP = bigBufferSize;} static voidreadFrameWriteToSocket(struct inputSource * const inputSourceP, int const otherSock, int const frameNumber) { /* should read in frame, then write to socket */ if (debugSockets) { fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber); fflush(stdout); } if (separateConversion) { FILE * filePtr; bool eof; /* do conversion and send right to the socket */ filePtr = ReadIOConvert(inputSourceP, frameNumber); eof = FALSE; /* initial value */ while (!eof) { unsigned char buffer[1024]; unsigned int numBytes; numBytes = fread(buffer, 1, sizeof(buffer), filePtr); if (numBytes > 0) { WriteInt(otherSock, numBytes); WriteBytes(otherSock, buffer, numBytes); } else eof = TRUE; } if (strcmp(ioConversion, "*") == 0 ) fclose(filePtr); else pclose(filePtr); } else { MpegFrame * frameP; frameP = Frame_New(frameNumber, 'i'); ReadFrame(frameP, inputSourceP, frameNumber, inputConversion); writeYUVOrig(otherSock, Fsize_x, Fsize_y, frameP); Frame_Free(frameP); /* now, make sure we don't leave until other processor read everything */ { int dummy; ReadInt(otherSock, &dummy); assert(dummy == 0); } } if (debugSockets) { fprintf(stdout, "====I/O SERVER: READ FRAME %d\n", frameNumber); }}static voidprocessNextConnection(int const serverSocket, struct inputSource * const inputSourceP,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -