📄 parallel.c
字号:
unsigned int nFrames; machineDebug("MASTER: Listening for a connection..."); AcceptConnection(masterSocket, &otherSock, &error); if (error) errorExit("MASTER SERVER: " "Failed to accept next connection. %s", error); ReadInt(otherSock, &childNum); ReadInt(otherSock, &seconds); csP = &childState[childNum]; csP->numSeconds += seconds; csP->fps = (float)csP->numFrames / (float)csP->numSeconds; if (seconds != 0) framesPerSecond = (float)csP->lastNumFrames / (float)seconds; else framesPerSecond = (float)csP->lastNumFrames * 2.0; machineDebug("MASTER: Child %s FINISHED ASSIGNMENT. " "%f frames per second", machineName[childNum], framesPerSecond); noteFrameDone(combineHostName, combinePortNum, csP->startFrame, csP->startFrame + csP->lastNumFrames - 1); framesDone += csP->lastNumFrames; allocateMoreFrames(schedulerP, childNum, childState, forceIalign, framePatternLen, goalTimeSpecified, goalTime, &nextFrame, &nFrames); if (nFrames == 0) { WriteInt(otherSock, -1); WriteInt(otherSock, 0); ++numFinished; machineDebug("MASTER: NO MORE WORK FOR CHILD %s. " "(%d of %d children now done)", machineName[childNum], numFinished, numMachines); } else { WriteInt(otherSock, nextFrame); WriteInt(otherSock, nextFrame + nFrames - 1); machineDebug("MASTER: Frames %d-%d assigned to child %s", nextFrame, nextFrame + nFrames - 1, machineName[childNum]); csP->startFrame = nextFrame; csP->lastNumFrames = nFrames; csP->numFrames += csP->lastNumFrames; } close(otherSock); machineDebug("MASTER: %d/%d DONE; %d ARE ASSIGNED", framesDone, schedulerP->numFramesInJob, schedulerP->nextFrame - framesDone); }}static voidstopIoServers(const char * const hostName, int const ioPortNum[], unsigned int const numIoServers) { unsigned int childNum; IOhostName = hostName; for (childNum = 0; childNum < numIoServers; ++childNum) { ioPortNumber = ioPortNum[childNum]; EndIOServer(); }}static voidwaitForCombineServerToTerminate(int const masterSocket) { int otherSock; const char * error; machineDebug("MASTER SERVER: Waiting for combine server to terminate"); AcceptConnection(masterSocket, &otherSock, &error); if (error) errorExit("MASTER SERVER: " "Failed to accept connection expected from a " "terminating combine server. %s", error); { int dummy; ReadInt(otherSock, &dummy); } close(otherSock);} static voidprintFinalStats(FILE * const statfileP, time_t const startUpBegin, time_t const startUpEnd, time_t const shutDownBegin, time_t const shutDownEnd, unsigned int const numChildren, struct childState const childState[], unsigned int const numFrames) { unsigned int pass; FILE * fileP; for (pass = 0; pass < 2; ++pass) { if (pass == 0) fileP = stdout; else fileP = statfileP; if (fileP) { unsigned int childNum; float totalFPS; fprintf(fileP, "\n\n"); fprintf(fileP, "PARALLEL SUMMARY\n"); fprintf(fileP, "----------------\n"); fprintf(fileP, "\n"); fprintf(fileP, "START UP TIME: %u seconds\n", (unsigned int)(startUpEnd - startUpBegin)); fprintf(fileP, "SHUT DOWN TIME: %u seconds\n", (unsigned int)(shutDownEnd - shutDownBegin)); fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "MACHINE", "Frames", "Seconds", "Frames/Sec", "Self Time"); fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "--------------", "--------", "--------", "------------", "---------"); totalFPS = 0.0; for (childNum = 0; childNum < numChildren; ++childNum) { float const localFPS = (float)childState[childNum].numFrames / childState[childNum].numSeconds; fprintf(fileP, "%14.14s %8u %8u %12.4f %8u\n", machineName[childNum], childState[childNum].numFrames, childState[childNum].numSeconds, localFPS, (unsigned int)((float)numFrames/localFPS)); totalFPS += localFPS; } fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "--------------", "--------", "--------", "------------", "---------"); fprintf(fileP, "%14s %8.8s %8u %12.4f\n", "OPTIMAL", "", (unsigned int)((float)numFrames/totalFPS), totalFPS); { unsigned int const diffTime = shutDownEnd - startUpBegin; fprintf(fileP, "%14s %8.8s %8u %12.4f\n", "ACTUAL", "", diffTime, (float)numFrames / diffTime); } fprintf(fileP, "\n\n"); } }}voidMasterServer(struct inputSource * const inputSourceP, const char * const paramFileName, const char * const outputFileName) {/*---------------------------------------------------------------------------- Execute the master server function. Start all the other servers.-----------------------------------------------------------------------------*/ const char *hostName; int portNum; int masterSocket; /* The file descriptor for the socket on which the master listens */ int ioPortNum[MAX_IO_SERVERS]; int combinePortNum, decodePortNum; struct childState * childState; /* malloc'ed */ unsigned int numIoServers; time_t startUpBegin, startUpEnd; time_t shutDownBegin, shutDownEnd; const char * error; struct scheduler scheduler; time(&startUpBegin); scheduler.nextFrame = 0; scheduler.numFramesInJob = inputSourceP->numInputFiles; scheduler.numMachines = numMachines; PrintStartStats(startUpBegin, FALSE, 0, 0, inputSourceP); hostName = getHostName(); hostEntry = gethostbyname(hostName); if (hostEntry == NULL) errorExit("Could not find host name '%s' in database", hostName); CreateListeningSocket(&masterSocket, &portNum, &error); if (error) errorExit("Unable to create socket on which to listen. %s", error); if (debugSockets) fprintf(stdout, "---MASTER USING PORT %d\n", portNum); startCombineServer(encoder_name, numMachines, hostName, portNum, inputSourceP->numInputFiles, paramFileName, masterSocket, &combinePortNum); if (referenceFrame == DECODED_FRAME) startDecodeServer(encoder_name, numMachines, hostName, portNum, inputSourceP->numInputFiles, paramFileName, masterSocket, &decodePortNum); startChildren(&scheduler, encoder_name, hostName, portNum, paramFileName, parallelPerfect, forceIalign, framePatternLen, parallelTestFrames, niceProcesses, masterSocket, combinePortNum, decodePortNum, ioPortNum, &numIoServers, &childState); time(&startUpEnd); feedTheChildren(&scheduler, childState, masterSocket, hostName, combinePortNum, forceIalign, framePatternLen, parallelTimeChunks != -1, parallelTimeChunks); assert(scheduler.nextFrame == scheduler.numFramesInJob); time(&shutDownBegin); stopIoServers(hostName, ioPortNum, numIoServers); waitForCombineServerToTerminate(masterSocket); close(masterSocket); time(&shutDownEnd); printFinalStats(statFile, startUpBegin, startUpEnd, shutDownBegin, shutDownEnd, numMachines, childState, inputSourceP->numInputFiles); if (statFile) fclose(statFile); free(childState); strfree(hostName);}voidNotifyMasterDone(const char * const masterHostName, int const masterPortNum, int const childNum, unsigned int const seconds, boolean * const moreWorkToDoP, int * const nextFrameStartP, int * const nextFrameEndP) {/*---------------------------------------------------------------------------- Tell the master, at 'masterHostName':'masterPortNum' that child number 'childNum' has finished its assignment, and the decoding took 'seconds' wall clock seconds. Get the next assignment, if any, from the master. If the master gives us a new assignment, return *moreWorkToDoP == TRUE and the frames the master wants us to do as *nextFrameStartP and nextFrameEndP. Otherwise (there is no more work for machine 'childNum' to do), return *moreWorkToDoP == FALSE.-----------------------------------------------------------------------------*/ int clientSocket; time_t tempTimeStart, tempTimeEnd; const char * error; machineDebug("CHILD: NOTIFYING MASTER Machine %d assignment complete", childNum); time(&tempTimeStart); ConnectToSocket(masterHostName, masterPortNum, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to master to tell him we've finished " "our assignment. %s", error); WriteInt(clientSocket, childNum); WriteInt(clientSocket, seconds); ReadInt(clientSocket, nextFrameStartP); ReadInt(clientSocket, nextFrameEndP); *moreWorkToDoP = (*nextFrameStartP >= 0); if (*moreWorkToDoP) machineDebug("CHILD: Master says next assignment: start %d end %d", *nextFrameStartP, *nextFrameEndP); else machineDebug("CHILD: Master says no more work for us."); close(clientSocket); time(&tempTimeEnd); IOtime += (tempTimeEnd-tempTimeStart);}voidDecodeServer(int const numInputFiles, const char * const decodeFileName, const char * const masterHostName, int const masterPortNum) {/*---------------------------------------------------------------------------- Execute the decode server. The decode server handles transfer of decoded frames to/from processes. It is necessary only if referenceFrame == DECODED_FRAME. Communicate to the master at hostname 'masterHostName':'masterPortNum'.-----------------------------------------------------------------------------*/ int otherSock; int decodePortNum; int frameReady; boolean *ready; int *waitMachine; int *waitPort; int *waitList; int slaveNumber; int slavePort; int waitPtr; struct hostent *nullHost = NULL; int clientSocket; const char * error; /* should keep list of port numbers to notify when frames become ready */ ready = (boolean *) calloc(numInputFiles, sizeof(boolean)); waitMachine = (int *) calloc(numInputFiles, sizeof(int)); waitPort = (int *) malloc(numMachines*sizeof(int)); waitList = (int *) calloc(numMachines, sizeof(int)); CreateListeningSocket(&decodeServerSocket, &decodePortNum, &error); if (error) errorExit("Unable to create socket on which to listen. %s", error); machineDebug("DECODE SERVER LISTENING ON PORT %d", decodePortNum); TransmitPortNum(masterHostName, masterPortNum, decodePortNum); frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean)); memset((char *)frameDone, 0, numInputFiles*sizeof(boolean)); /* wait for ready signals and requests */ while ( TRUE ) { const char * error; AcceptConnection(decodeServerSocket, &otherSock, &error); if (error) errorExit("DECODE SERVER: " "Failed to accept next connection. %s", error); ReadInt(otherSock, &frameReady); if ( frameReady == -2 ) { ReadInt(otherSock, &frameReady); machineDebug("DECODE SERVER: REQUEST FOR FRAME %d", frameReady); /* now respond if it's ready yet */ WriteInt(otherSock, frameDone[frameReady]); if ( ! frameDone[frameReady] ) { /* read machine number, port number */ ReadInt(otherSock, &slaveNumber); ReadInt(otherSock, &slavePort); machineDebug("DECODE SERVER: WAITING: SLAVE %d, PORT %d", slaveNumber, slavePort); waitPort[slaveNumber] = slavePort; if ( waitMachine[frameReady] == 0 ) { waitMachine[frameReady] = slaveNumber+1; } else { /* someone already waiting for this frame */ /* follow list of waiters to the end */ waitPtr = waitMachine[frameReady]-1; while ( waitList[waitPtr] != 0 ) { waitPtr = waitList[waitPtr]-1; } waitList[waitPtr] = slaveNumber+1; waitList[slaveNumber] = 0; } } } else { frameDone[frameReady] = TRUE; machineDebug("DECODE SERVER: FRAME %d READY", frameReady); if ( waitMachine[frameReady] ) { /* need to notify one or more machines it's ready */ waitPtr = waitMachine[frameReady]-1; while ( waitPtr >= 0 ) { con
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -