📄 parallel.c
字号:
bool * const doneP, unsigned char ** const bigBufferP, unsigned int * const bigBufferSizeP) { int otherSock; int command; const char * error; AcceptConnection(serverSocket, &otherSock, &error); if (error) errorExit("I/O SERVER: Failed to accept next connection. %s", error); ReadInt(otherSock, &command); switch (command) { case -1: *doneP = TRUE; break; case -2: decodedFrameToDisk(otherSock); break; case -3: decodedFrameFromDisk(otherSock); break; case -4: routeFromSocketToDisk(otherSock, bigBufferP, bigBufferSizeP); break; default: readFrameWriteToSocket(inputSourceP, otherSock, command); } close(otherSock);} voidIoServer(struct inputSource * const inputSourceP, const char * const parallelHostName, int const portNum) {/*---------------------------------------------------------------------------- Execute an I/O server. An I/O server is the partner on the master machine of a child process on a "remote" system. "Remote" here doesn't just mean on another system. It means on a system that isn't even in the same cluster -- specifically, a system that doesn't have access to the same filesystem as the master. The child process passes frame contents between it and the master via the I/O server.-----------------------------------------------------------------------------*/ int ioPortNum; int serverSocket; boolean done; unsigned char *bigBuffer; /* A work buffer that we keep around permanently. We increase its size as needed, but never shrink it. */ unsigned int bigBufferSize; /* The current allocated size of bigBuffer[] */ const char * error; bigBufferSize = 0; /* Start with no buffer */ bigBuffer = NULL; /* once we get IO port num, should transmit it to parallel server */ CreateListeningSocket(&serverSocket, &ioPortNum, &error); if (error) errorExit("Unable to create socket on which to listen for " "reports from children. %s", error); if (debugSockets) fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum); TransmitPortNum(parallelHostName, portNum, ioPortNum); if (separateConversion) SetFileType(ioConversion); /* for reading */ else SetFileType(inputConversion); done = FALSE; /* initial value */ while (!done) processNextConnection(serverSocket, inputSourceP, &done, &bigBuffer, &bigBufferSize); close(serverSocket); if ( debugSockets ) { fprintf(stdout, "====I/O SERVER: Shutting Down\n"); }}/*===========================================================================* * * SendRemoteFrame * * called by a slave to the I/O server; sends an encoded frame * to the server to be sent to disk * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/voidSendRemoteFrame(int const frameNumber, BitBucket * const bb) { int const negativeFour = -4; int clientSocket; time_t tempTimeStart, tempTimeEnd; const char * error; time(&tempTimeStart); ConnectToSocket(IOhostName, ioPortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to I/O server to deliver results. %s", error); WriteInt(clientSocket, negativeFour); WriteInt(clientSocket, frameNumber); if (frameNumber != -1) { /* send number of bytes */ WriteInt(clientSocket, (bb->totalbits+7)>>3); /* now send the bytes themselves */ Bitio_WriteToSocket(bb, clientSocket); } close(clientSocket); time(&tempTimeEnd); IOtime += (tempTimeEnd-tempTimeStart);}voidGetRemoteFrame(MpegFrame * const frameP, int const frameNumber) {/*---------------------------------------------------------------------------- Get a frame from the I/O server. This is intended for use by a child.-----------------------------------------------------------------------------*/ int clientSocket; const char * error; Fsize_Note(frameNumber, yuvWidth, yuvHeight); if (debugSockets) { fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n", getenv("HOST"), frameNumber); fflush(stdout); } ConnectToSocket(IOhostName, ioPortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to I/O server to get a frame. %s", error); WriteInt(clientSocket, frameNumber); if (frameNumber != -1) { if (separateConversion) { unsigned char buffer[1024]; /* This is by design the exact size of the data per message (except the last message for a frame) the I/O server sends. */ int numBytes; /* Number of data bytes in message */ FILE * filePtr = pm_tmpfile(); /* read in stuff, write to file, perform local conversion */ do { ReadInt(clientSocket, &numBytes); if (numBytes > sizeof(buffer)) errorExit("Invalid message received: numBytes = %d, " "which is greater than %d\n", numBytes, sizeof(numBytes)); ReadBytes(clientSocket, buffer, numBytes); fwrite(buffer, 1, numBytes, filePtr); } while ( numBytes == sizeof(buffer) ); fflush(filePtr); rewind(filePtr); ReadFrameFile(frameP, filePtr, slaveConversion); fclose(filePtr); } else { Frame_AllocYCC(frameP); if (debugSockets) { fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n", getenv("HOST"), frameNumber); fflush(stdout); } readYUVOrig(clientSocket, yuvWidth, yuvHeight, frameP); } } WriteInt(clientSocket, 0); close(clientSocket); if ( debugSockets ) { fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n", getenv("HOST"), frameNumber); fflush(stdout); }}struct combineControl { unsigned int numFrames;};static voidgetAndProcessACombineConnection(int const outputServerSocket) { int otherSock; int command; const char * error; AcceptConnection(outputServerSocket, &otherSock, &error); if (error) errorExit("COMBINE SERVER: " "Failed to accept next connection. %s", error); ReadInt(otherSock, &command); if (command == -2) { /* this is notification from non-remote process that a frame is done. */ int frameStart, frameEnd; ReadInt(otherSock, &frameStart); ReadInt(otherSock, &frameEnd); machineDebug("COMBINE_SERVER: Frames %d - %d done", frameStart, frameEnd); { unsigned int i; for (i = frameStart; i <= frameEnd; ++i) frameDone[i] = TRUE; } } else errorExit("COMBINE SERVER: Unrecognized command %d received.", command); close(otherSock);}#define READ_ATTEMPTS 5 /* number of times (seconds) to retry an input file */static voidopenInputFile(const char * const fileName, FILE ** const inputFilePP) { FILE * inputFileP; unsigned int attempts; inputFileP = NULL; attempts = 0; while (!inputFileP && attempts < READ_ATTEMPTS) { inputFileP = fopen(fileName, "rb"); if (inputFileP == NULL) { pm_message("ERROR Couldn't read frame file '%s' errno = %d (%s)" "attempt %d", fileName, errno, strerror(errno), attempts); sleep(1); } ++attempts; } if (inputFileP == NULL) pm_error("Unable to open file '%s' after %d attempts.", fileName, attempts); *inputFilePP = inputFileP;}static voidwaitForOutputFile(void * const inputHandle, unsigned int const frameNumber, FILE ** const ifPP) {/*---------------------------------------------------------------------------- Keep handling output events until we get the specified frame number. Open the file it's in and return the stream handle.-----------------------------------------------------------------------------*/ struct combineControl * const combineControlP = (struct combineControl *) inputHandle; if (frameNumber >= combineControlP->numFrames) *ifPP = NULL; else { const char * fileName; while (!frameDone[frameNumber]) { machineDebug("COMBINE_SERVER: Waiting for frame %u done", frameNumber); getAndProcessACombineConnection(outputServerSocket); } machineDebug("COMBINE SERVER: Wait for frame %u over", frameNumber); asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber); openInputFile(fileName, ifPP); strfree(fileName); }}static voidunlinkFile(void * const inputHandle, unsigned int const frameNumber) { if (!keepTempFiles) { const char * fileName; asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber); unlink(fileName); strfree(fileName); }}voidCombineServer(int const numFrames, const char * const masterHostName, int const masterPortNum) {/*---------------------------------------------------------------------------- Execute a combine server. This handles combination of frames.-----------------------------------------------------------------------------*/ int combinePortNum; FILE * ofp; const char * error; struct combineControl combineControl; /* once we get Combine port num, should transmit it to parallel server */ CreateListeningSocket(&outputServerSocket, &combinePortNum, &error); if (error) errorExit("Unable to create socket on which to listen. %s", error); machineDebug("COMBINE SERVER: LISTENING ON PORT %d", combinePortNum); TransmitPortNum(masterHostName, masterPortNum, combinePortNum); MALLOCARRAY_NOFAIL(frameDone, numFrames); { unsigned int i; for (i = 0; i < numFrames; ++i) frameDone[i] = FALSE; } ofp = pm_openw(outputFileName); combineControl.numFrames = numFrames; FramesToMPEG(ofp, &combineControl, &waitForOutputFile, &unlinkFile); machineDebug("COMBINE SERVER: Shutting down"); /* tell Master server we are done */ TransmitPortNum(masterHostName, masterPortNum, combinePortNum); close(outputServerSocket); fclose(ofp);}/*=====================* * MASTER SERVER STUFF * *=====================*/static voidstartCombineServer(const char * const encoderName, unsigned int const numMachines, const char * const masterHostName, int const masterPortNum, unsigned int const numInputFiles, const char * const paramFileName, int const masterSocket, int * const combinePortNumP) { char command[1024]; int otherSock; const char * error; snprintf(command, sizeof(command), "%s %s -max_machines %d -output_server %s %d %d %s", encoderName, debugMachines ? "-debug_machines" : "", numMachines, masterHostName, masterPortNum, numInputFiles, paramFileName); machineDebug("MASTER: Starting combine server with shell command '%s'", command); safe_fork(command); machineDebug("MASTER: Listening for connection back from " "new Combine server"); AcceptConnection(masterSocket, &otherSock, &error); if (error) errorExit("MASTER SERVER: "
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -