⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallel.c

📁 linux下将各类格式图片转换工具
💻 C
📖 第 1 页 / 共 5 页
字号:
        unsigned int nFrames;        machineDebug("MASTER: Listening for a connection...");        AcceptConnection(masterSocket, &otherSock, &error);        if (error)            errorExit("MASTER SERVER: "                      "Failed to accept next connection.  %s", error);        ReadInt(otherSock, &childNum);        ReadInt(otherSock, &seconds);        csP = &childState[childNum];                csP->numSeconds += seconds;        csP->fps = (float)csP->numFrames / (float)csP->numSeconds;        if (seconds != 0)            framesPerSecond = (float)csP->lastNumFrames / (float)seconds;        else            framesPerSecond = (float)csP->lastNumFrames * 2.0;        machineDebug("MASTER: Child %s FINISHED ASSIGNMENT.  "                     "%f frames per second",                      machineName[childNum], framesPerSecond);        noteFrameDone(combineHostName, combinePortNum, csP->startFrame,                      csP->startFrame + csP->lastNumFrames - 1);        framesDone += csP->lastNumFrames;        allocateMoreFrames(schedulerP, childNum, childState,                           forceIalign, framePatternLen,                           goalTimeSpecified, goalTime,                           &nextFrame, &nFrames);        if (nFrames == 0) {            WriteInt(otherSock, -1);            WriteInt(otherSock, 0);            ++numFinished;            machineDebug("MASTER: NO MORE WORK FOR CHILD %s.  "                         "(%d of %d children now done)",                         machineName[childNum], numFinished, numMachines);        } else {            WriteInt(otherSock, nextFrame);            WriteInt(otherSock, nextFrame + nFrames - 1);            machineDebug("MASTER: Frames %d-%d assigned to child %s",                         nextFrame, nextFrame + nFrames - 1,                         machineName[childNum]);            csP->startFrame    = nextFrame;            csP->lastNumFrames = nFrames;            csP->numFrames    += csP->lastNumFrames;        }        close(otherSock);        machineDebug("MASTER: %d/%d DONE; %d ARE ASSIGNED",                     framesDone, schedulerP->numFramesInJob,                      schedulerP->nextFrame - framesDone);    }}static voidstopIoServers(const char * const hostName,              int          const ioPortNum[],              unsigned int const numIoServers) {    unsigned int childNum;    IOhostName = hostName;    for (childNum = 0; childNum < numIoServers; ++childNum) {        ioPortNumber = ioPortNum[childNum];        EndIOServer();    }}static voidwaitForCombineServerToTerminate(int const masterSocket) {    int otherSock;    const char * error;    machineDebug("MASTER SERVER: Waiting for combine server to terminate");    AcceptConnection(masterSocket, &otherSock, &error);    if (error)        errorExit("MASTER SERVER: "                  "Failed to accept connection expected from a "                  "terminating combine server.  %s", error);    {        int dummy;        ReadInt(otherSock, &dummy);    }    close(otherSock);}    static voidprintFinalStats(FILE *            const statfileP,                time_t            const startUpBegin,                time_t            const startUpEnd,                time_t            const shutDownBegin,                time_t            const shutDownEnd,                unsigned int      const numChildren,                struct childState const childState[],                unsigned int      const numFrames) {    unsigned int pass;    FILE * fileP;    for (pass = 0; pass < 2; ++pass) {        if (pass == 0)            fileP = stdout;        else            fileP = statfileP;        if (fileP) {            unsigned int childNum;            float totalFPS;            fprintf(fileP, "\n\n");            fprintf(fileP, "PARALLEL SUMMARY\n");            fprintf(fileP, "----------------\n");            fprintf(fileP, "\n");            fprintf(fileP, "START UP TIME:  %u seconds\n",                    (unsigned int)(startUpEnd - startUpBegin));            fprintf(fileP, "SHUT DOWN TIME:  %u seconds\n",                    (unsigned int)(shutDownEnd - shutDownBegin));                        fprintf(fileP,                     "%14.14s %8.8s %8.8s %12.12s %9.9s\n",                    "MACHINE", "Frames", "Seconds", "Frames/Sec",                    "Self Time");            fprintf(fileP,                     "%14.14s %8.8s %8.8s %12.12s %9.9s\n",                    "--------------", "--------", "--------", "------------",                    "---------");            totalFPS = 0.0;            for (childNum = 0; childNum < numChildren; ++childNum) {                float const localFPS =                     (float)childState[childNum].numFrames /                    childState[childNum].numSeconds;                fprintf(fileP, "%14.14s %8u %8u %12.4f %8u\n",                        machineName[childNum],                         childState[childNum].numFrames,                         childState[childNum].numSeconds,                        localFPS,                         (unsigned int)((float)numFrames/localFPS));                totalFPS += localFPS;            }            fprintf(fileP,                     "%14.14s %8.8s %8.8s %12.12s %9.9s\n",                    "--------------", "--------", "--------", "------------",                    "---------");            fprintf(fileP, "%14s %8.8s %8u %12.4f\n",                     "OPTIMAL", "",                     (unsigned int)((float)numFrames/totalFPS),                    totalFPS);                        {                unsigned int const diffTime = shutDownEnd - startUpBegin;                                fprintf(fileP, "%14s %8.8s %8u %12.4f\n",                         "ACTUAL", "", diffTime,                         (float)numFrames / diffTime);            }            fprintf(fileP, "\n\n");        }    }}voidMasterServer(struct inputSource * const inputSourceP,             const char *         const paramFileName,              const char *         const outputFileName) {/*----------------------------------------------------------------------------   Execute the master server function.   Start all the other servers.-----------------------------------------------------------------------------*/    const char *hostName;    int       portNum;    int       masterSocket;        /* The file descriptor for the socket on which the master listens */    int ioPortNum[MAX_IO_SERVERS];    int       combinePortNum, decodePortNum;    struct childState * childState;  /* malloc'ed */    unsigned int numIoServers;    time_t  startUpBegin, startUpEnd;    time_t  shutDownBegin, shutDownEnd;    const char * error;    struct scheduler scheduler;    time(&startUpBegin);    scheduler.nextFrame = 0;    scheduler.numFramesInJob = inputSourceP->numInputFiles;    scheduler.numMachines = numMachines;    PrintStartStats(startUpBegin, FALSE, 0, 0, inputSourceP);    hostName = getHostName();    hostEntry = gethostbyname(hostName);    if (hostEntry == NULL)        errorExit("Could not find host name '%s' in database", hostName);    CreateListeningSocket(&masterSocket, &portNum, &error);    if (error)        errorExit("Unable to create socket on which to listen.  %s", error);    if (debugSockets)        fprintf(stdout, "---MASTER USING PORT %d\n", portNum);    startCombineServer(encoder_name, numMachines, hostName, portNum,                       inputSourceP->numInputFiles,                        paramFileName, masterSocket,                        &combinePortNum);    if (referenceFrame == DECODED_FRAME)        startDecodeServer(encoder_name, numMachines, hostName, portNum,                          inputSourceP->numInputFiles,                           paramFileName, masterSocket,                          &decodePortNum);    startChildren(&scheduler, encoder_name, hostName, portNum,                  paramFileName, parallelPerfect, forceIalign,                  framePatternLen, parallelTestFrames,                   niceProcesses,                  masterSocket, combinePortNum, decodePortNum,                   ioPortNum, &numIoServers,                  &childState);    time(&startUpEnd);    feedTheChildren(&scheduler, childState,                    masterSocket, hostName, combinePortNum,                    forceIalign, framePatternLen,                    parallelTimeChunks != -1, parallelTimeChunks);    assert(scheduler.nextFrame == scheduler.numFramesInJob);    time(&shutDownBegin);    stopIoServers(hostName, ioPortNum, numIoServers);    waitForCombineServerToTerminate(masterSocket);    close(masterSocket);    time(&shutDownEnd);    printFinalStats(statFile, startUpBegin, startUpEnd,                    shutDownBegin, shutDownEnd, numMachines,                    childState, inputSourceP->numInputFiles);    if (statFile)        fclose(statFile);    free(childState);    strfree(hostName);}voidNotifyMasterDone(const char * const masterHostName,                  int          const masterPortNum,                  int          const childNum,                 unsigned int const seconds,                  boolean *    const moreWorkToDoP,                 int *        const nextFrameStartP,                 int *        const nextFrameEndP) {/*----------------------------------------------------------------------------   Tell the master, at 'masterHostName':'masterPortNum' that child   number 'childNum' has finished its assignment, and the decoding   took 'seconds' wall clock seconds.  Get the next assignment, if   any, from the master.   If the master gives us a new assignment, return *moreWorkToDoP ==   TRUE and the frames the master wants us to do as *nextFrameStartP   and nextFrameEndP.  Otherwise (there is no more work for machine   'childNum' to do), return *moreWorkToDoP == FALSE.-----------------------------------------------------------------------------*/    int    clientSocket;    time_t tempTimeStart, tempTimeEnd;    const char * error;    machineDebug("CHILD: NOTIFYING MASTER Machine %d assignment complete",                  childNum);    time(&tempTimeStart);        ConnectToSocket(masterHostName, masterPortNum, &hostEntry,                    &clientSocket, &error);    if (error)        errorExit("CHILD: Can't connect to master to tell him we've finished "                  "our assignment.  %s", error);    WriteInt(clientSocket, childNum);    WriteInt(clientSocket, seconds);    ReadInt(clientSocket, nextFrameStartP);    ReadInt(clientSocket, nextFrameEndP);    *moreWorkToDoP = (*nextFrameStartP >= 0);    if (*moreWorkToDoP)        machineDebug("CHILD: Master says next assignment: start %d end %d",                     *nextFrameStartP, *nextFrameEndP);    else        machineDebug("CHILD: Master says no more work for us.");    close(clientSocket);    time(&tempTimeEnd);    IOtime += (tempTimeEnd-tempTimeStart);}voidDecodeServer(int          const numInputFiles,              const char * const decodeFileName,              const char * const masterHostName,              int          const masterPortNum) {/*----------------------------------------------------------------------------   Execute the decode server.   The decode server handles transfer of decoded frames to/from processes.   It is necessary only if referenceFrame == DECODED_FRAME.   Communicate to the master at hostname 'masterHostName':'masterPortNum'.-----------------------------------------------------------------------------*/    int     otherSock;    int     decodePortNum;    int     frameReady;    boolean *ready;    int     *waitMachine;    int     *waitPort;    int     *waitList;    int     slaveNumber;    int     slavePort;    int     waitPtr;    struct hostent *nullHost = NULL;    int     clientSocket;    const char * error;    /* should keep list of port numbers to notify when frames become ready */    ready = (boolean *) calloc(numInputFiles, sizeof(boolean));    waitMachine = (int *) calloc(numInputFiles, sizeof(int));    waitPort = (int *) malloc(numMachines*sizeof(int));    waitList = (int *) calloc(numMachines, sizeof(int));    CreateListeningSocket(&decodeServerSocket, &decodePortNum, &error);    if (error)        errorExit("Unable to create socket on which to listen.  %s", error);    machineDebug("DECODE SERVER LISTENING ON PORT %d", decodePortNum);    TransmitPortNum(masterHostName, masterPortNum, decodePortNum);    frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));    memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));    /* wait for ready signals and requests */    while ( TRUE ) {        const char * error;        AcceptConnection(decodeServerSocket, &otherSock, &error);        if (error)            errorExit("DECODE SERVER: "                      "Failed to accept next connection.  %s", error);        ReadInt(otherSock, &frameReady);        if ( frameReady == -2 ) {            ReadInt(otherSock, &frameReady);            machineDebug("DECODE SERVER:  REQUEST FOR FRAME %d", frameReady);            /* now respond if it's ready yet */            WriteInt(otherSock, frameDone[frameReady]);            if ( ! frameDone[frameReady] ) {                /* read machine number, port number */                ReadInt(otherSock, &slaveNumber);                ReadInt(otherSock, &slavePort);                machineDebug("DECODE SERVER: WAITING:  SLAVE %d, PORT %d",                             slaveNumber, slavePort);                waitPort[slaveNumber] = slavePort;                if ( waitMachine[frameReady] == 0 ) {                    waitMachine[frameReady] = slaveNumber+1;                } else {                    /* someone already waiting for this frame */                    /* follow list of waiters to the end */                    waitPtr = waitMachine[frameReady]-1;                    while ( waitList[waitPtr] != 0 ) {                        waitPtr = waitList[waitPtr]-1;                    }                    waitList[waitPtr] = slaveNumber+1;                    waitList[slaveNumber] = 0;                }            }        } else {            frameDone[frameReady] = TRUE;                        machineDebug("DECODE SERVER:  FRAME %d READY", frameReady);            if ( waitMachine[frameReady] ) {                /* need to notify one or more machines it's ready */                waitPtr = waitMachine[frameReady]-1;                while ( waitPtr >= 0 ) {                    con

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -