📄 parallel.c
字号:
"Failed to accept next connection. %s", error); ReadInt(otherSock, combinePortNumP); close(otherSock); machineDebug("MASTER: Combine port number = %d", *combinePortNumP);}static voidstartDecodeServer(const char * const encoderName, unsigned int const numMachines, const char * const masterHostName, int const masterPortNum, unsigned int const numInputFiles, const char * const paramFileName, int const masterSocket, int * const decodePortNumP) { char command[1024]; int otherSock; const char * error; snprintf(command, sizeof(command), "%s %s -max_machines %d -decode_server %s %d %d %s", encoder_name, debugMachines ? "-debug_machines" : "", numMachines, masterHostName, masterPortNum, numInputFiles, paramFileName); machineDebug("MASTER: Starting decode server with shell command '%s'", command); safe_fork(command); machineDebug("MASTER: Listening for connection back from " "new Decode server"); AcceptConnection(masterSocket, &otherSock, &error); if (error) errorExit("MASTER SERVER: " "Failed to accept connection back from the new " "decode server. %s", error); ReadInt(otherSock, decodePortNumP); close(otherSock); machineDebug("MASTER: Decode port number = %d", *decodePortNumP);}static voidstartIoServer(const char * const encoderName, unsigned int const numChildren, const char * const masterHostName, int const masterPortNum, int const masterSocket, const char * const paramFileName, int * const ioPortNumP) { char command[1024]; int otherSock; const char * error; sprintf(command, "%s -max_machines %d -io_server %s %d %s", encoderName, numChildren, masterHostName, masterPortNum, paramFileName); machineDebug("MASTER: Starting I/O server with remote shell command '%s'", command); safe_fork(command); machineDebug("MASTER: Listening for connection back from " "new I/O server"); AcceptConnection(masterSocket, &otherSock, &error); if (error) errorExit("MASTER SERVER: " "Failed to accept connection back from the new " "I/O server. %s", error); ReadInt(otherSock, ioPortNumP); close(otherSock); machineDebug("MASTER: I/O port number = %d", *ioPortNumP);} static voidextendToEndOfPattern(unsigned int * const nFramesP, unsigned int const startFrame, unsigned int const framePatternLen, unsigned int const numFramesInStream) { assert(framePatternLen >= 1); while (startFrame + *nFramesP < numFramesInStream && (startFrame + *nFramesP) % framePatternLen != 0) ++(*nFramesP);}static voidallocateInitialFrames(struct scheduler * const schedulerP, boolean const parallelPerfect, boolean const forceIalign, unsigned int const framePatternLen, unsigned int const parallelTestFrames, unsigned int const childNum, unsigned int * const startFrameP, unsigned int * const nFramesP) {/*---------------------------------------------------------------------------- Choose which frames, to hand out to the new child numbered 'childNum'.-----------------------------------------------------------------------------*/ unsigned int const framesPerChild = MAX(1, ((schedulerP->numFramesInJob - schedulerP->nextFrame) / (schedulerP->numMachines - childNum))); unsigned int nFrames; if (parallelPerfect) nFrames = framesPerChild; else { assert(parallelTestFrames >= 1); nFrames = MIN(parallelTestFrames, framesPerChild); } if (forceIalign) extendToEndOfPattern(&nFrames, schedulerP->nextFrame, framePatternLen, schedulerP->numFramesInJob); nFrames = MIN(nFrames, schedulerP->numFramesInJob - schedulerP->nextFrame); *startFrameP = schedulerP->nextFrame; *nFramesP = nFrames; schedulerP->nextFrame += nFrames;}static floattaperedGoalTime(struct childState const childState[], unsigned int const remainingFrameCount) { float goalTime; float allChildrenFPS; float remainingJobTime; /* How long we expect it to be before the whole movie is encoded*/ float sum; int numMachinesToEstimate; unsigned int childNum; /* frames left = lastFrameInStream - startFrame + 1 */ for (childNum = 0, sum = 0.0, numMachinesToEstimate = 0; childNum < numMachines; ++childNum) { if (!childState[childNum].finished) { if (childState[childNum].fps < 0.0 ) ++numMachinesToEstimate; else sum += childState[childNum].fps; } } allChildrenFPS = (float)numMachines * (sum/(float)(numMachines-numMachinesToEstimate)); remainingJobTime = (float)remainingFrameCount/allChildrenFPS; goalTime = MAX(5.0, remainingJobTime/2); return goalTime;}static voidallocateMoreFrames(struct scheduler * const schedulerP, unsigned int const childNum, struct childState const childState[], bool const forceIalign, unsigned int const framePatternLen, bool const goalTimeSpecified, unsigned int const goalTimeArg, unsigned int * const startFrameP, unsigned int * const nFramesP) {/*---------------------------------------------------------------------------- Decide which frames should be child 'childNum''s next assignment, given the state/history of all children is childState[]. The lowest numbered frame which needs yet to be encoded is frame number 'startFrame' and 'lastFrameInStream' is the highest. The allocation always starts at the lowest numbered frame that hasn't yet been allocated and is sequential. We return as *startFrameP the frame number of the first frame in the allocation and as *nFramesP the number of frames. If 'goalTimeSpecified' is true, we try to make the assignment take 'goalTimeArg' seconds. If 'goalTimeSpecified' is not true, we choose a goal time ourselves, which is based on how long we think it will take for all the children to finish all the remaining frames.-----------------------------------------------------------------------------*/ float goalTime; /* Number of seconds we want the assignment to take. We size the assignment to try to meet this goal. */ unsigned int nFrames; float avgFps; if (!goalTimeSpecified) { goalTime = taperedGoalTime(childState, schedulerP->numFramesInJob - schedulerP->nextFrame); pm_message("MASTER: ASSIGNING %s %.2f seconds of work", machineName[childNum], goalTime); } else goalTime = goalTimeArg; if (childState[childNum].numSeconds != 0) avgFps = (float)childState[childNum].numFrames / childState[childNum].numSeconds; else avgFps = 0.1; /* arbitrary small value */ nFrames = MAX(1u, (unsigned int)(goalTime * avgFps + 0.5)); nFrames = MIN(nFrames, schedulerP->numFramesInJob - schedulerP->nextFrame); if (forceIalign) extendToEndOfPattern(&nFrames, schedulerP->nextFrame, framePatternLen, schedulerP->numFramesInJob); *startFrameP = schedulerP->nextFrame; *nFramesP = nFrames; schedulerP->nextFrame += nFrames;}static voidstartChildren(struct scheduler * const schedulerP, const char * const encoderName, const char * const masterHostName, int const masterPortNum, const char * const paramFileName, boolean const parallelPerfect, boolean const forceIalign, unsigned int const framePatternLen, unsigned int const parallelTestFrames, boolean const beNice, int const masterSocket, int const combinePortNum, int const decodePortNum, int * const ioPortNum, unsigned int * const numIoServersP, struct childState ** const childStateP) {/*---------------------------------------------------------------------------- Start up the children. Tell them to work for the master at 'masterHostName':'masterPortNum'. Start I/O servers (as processes on this system) as required and return the port numbers of the TCP ports on which they listen as ioPortNum[] and the number of them as *numIoServersP. Give each of the children some initial work to do. This may be just a small amount for timing purposes. We access and manipulate the various global variables that represent the state of the children, and the scheduler structure.-----------------------------------------------------------------------------*/ struct childState * childState; /* malloc'ed */ unsigned int childNum; unsigned int numIoServers; unsigned int childrenLeftCurrentIoServer; /* The number of additional children we can hook up to the current I/O server before reaching our maximum children per I/O server. 0 if there is no current I/O server. */ MALLOCARRAY_NOFAIL(childState, schedulerP->numMachines); childrenLeftCurrentIoServer = 0; /* No current I/O server yet */ numIoServers = 0; /* None created yet */ for (childNum = 0; childNum < schedulerP->numMachines; ++childNum) { char command[1024]; unsigned int startFrame; unsigned int nFrames; childState[childNum].fps = -1.0; /* illegal value as flag */ childState[childNum].numSeconds = 0; allocateInitialFrames(schedulerP, parallelPerfect, forceIalign, framePatternLen, parallelTestFrames, childNum, &startFrame, &nFrames); if (nFrames == 0) { childState[childNum].finished = TRUE; machineDebug("MASTER: No more frames; not starting child '%s'", machineName[childNum]); } else { childState[childNum].finished = FALSE; if (remote[childNum]) { if (childrenLeftCurrentIoServer == 0) { startIoServer(encoderName, schedulerP->numMachines, masterHostName, masterPortNum, masterSocket, paramFileName, &ioPortNum[numIoServers++]); childrenLeftCurrentIoServer = SOMAXCONN; } --childrenLeftCurrentIoServer; } snprintf(command, sizeof(command), "%s %s -l %s %s " "%s %s -child %s %d %d %d %d %d %d " "-frames %d %d %s", rsh, machineName[childNum], userName[childNum], beNice ? "nice" : "", executable[childNum], debugMachines ? "-debug_machines" : "", masterHostName, masterPortNum, remote[childNum] ? ioPortNum[numIoServers-1] : 0, combinePortNum, decodePortNum, childNum, remote[childNum] ? 1 : 0, startFrame, startFrame + nFrames - 1, remote[childNum] ? remoteParamFile[childNum] : paramFileName ); machineDebug("MASTER: Starting child server " "with shell command '%s'", command); safe_fork(command); machineDebug("MASTER: Frames %d-%d assigned to new child %s", startFrame, startFrame + nFrames - 1, machineName[childNum]); } childState[childNum].startFrame = startFrame; childState[childNum].lastNumFrames = nFrames; childState[childNum].numFrames = childState[childNum].lastNumFrames; } *childStateP = childState; *numIoServersP = numIoServers;}static voidnoteFrameDone(const char * const combineHostName, int const combinePortNum, unsigned int const frameStart, unsigned int const frameEnd) {/*---------------------------------------------------------------------------- Tell the Combine server that frames 'frameStart' through 'frameEnd' are done.-----------------------------------------------------------------------------*/ int const negativeTwo = -2; int clientSocket; time_t tempTimeStart, tempTimeEnd; const char * error; struct hostent * hostEntP; time(&tempTimeStart); hostEntP = NULL; ConnectToSocket(combineHostName, combinePortNum, &hostEntP, &clientSocket, &error); if (error) errorExit("MASTER: Can't connect to Combine server to tell it frames " "are done. %s", error); WriteInt(clientSocket, negativeTwo); WriteInt(clientSocket, frameStart); WriteInt(clientSocket, frameEnd); close(clientSocket); time(&tempTimeEnd); IOtime += (tempTimeEnd-tempTimeStart);}static voidfeedTheChildren(struct scheduler * const schedulerP, struct childState childState[], int const masterSocket, const char * const combineHostName, int const combinePortNum, bool const forceIalign, unsigned int const framePatternLen, bool const goalTimeSpecified, unsigned int const goalTime) {/*---------------------------------------------------------------------------- Listen for children to tell us they have finished their assignments and give them new assignments, until all the frames have been assigned and all the children have finished. As children finish assignments, inform the combine server at 'combineHostName':'combinePortNum' of such. Note that the children got initial assigments when they were created. So the first thing we do is wait for them to finish those.-----------------------------------------------------------------------------*/ unsigned int numFinished; /* Number of child machines that have been excused because there is no more work for them. */ unsigned int framesDone; numFinished = 0; framesDone = 0; while (numFinished != schedulerP->numMachines) { int otherSock; int childNum; int seconds; float framesPerSecond; struct childState * csP; const char * error; unsigned int nextFrame;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -