⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallel.c

📁 linux下将各类格式图片转换工具
💻 C
📖 第 1 页 / 共 5 页
字号:
                  "Failed to accept next connection.  %s", error);    ReadInt(otherSock, combinePortNumP);    close(otherSock);    machineDebug("MASTER:  Combine port number = %d", *combinePortNumP);}static voidstartDecodeServer(const char * const encoderName,                  unsigned int const numMachines,                  const char * const masterHostName,                  int          const masterPortNum,                  unsigned int const numInputFiles,                  const char * const paramFileName,                  int          const masterSocket,                  int *        const decodePortNumP) {    char         command[1024];    int          otherSock;    const char * error;    snprintf(command, sizeof(command),              "%s %s -max_machines %d -decode_server %s %d %d %s",             encoder_name,              debugMachines ? "-debug_machines" : "",             numMachines, masterHostName, masterPortNum,             numInputFiles, paramFileName);    machineDebug("MASTER: Starting decode server with shell command '%s'",                 command);    safe_fork(command);    machineDebug("MASTER: Listening for connection back from "                 "new Decode server");    AcceptConnection(masterSocket, &otherSock, &error);    if (error)        errorExit("MASTER SERVER: "                  "Failed to accept connection back from the new "                  "decode server.  %s", error);    ReadInt(otherSock, decodePortNumP);        close(otherSock);        machineDebug("MASTER:  Decode port number = %d", *decodePortNumP);}static voidstartIoServer(const char *   const encoderName,              unsigned int   const numChildren,              const char *   const masterHostName,              int            const masterPortNum,              int            const masterSocket,              const char *   const paramFileName,              int *          const ioPortNumP) {                  char         command[1024];    int          otherSock;    const char * error;        sprintf(command, "%s -max_machines %d -io_server %s %d %s",            encoderName, numChildren, masterHostName, masterPortNum,            paramFileName);    machineDebug("MASTER: Starting I/O server with remote shell command '%s'",                 command);    safe_fork(command);        machineDebug("MASTER: Listening for connection back from "                 "new I/O server");    AcceptConnection(masterSocket, &otherSock, &error);    if (error)        errorExit("MASTER SERVER: "                  "Failed to accept connection back from the new "                  "I/O server.  %s", error);        ReadInt(otherSock, ioPortNumP);    close(otherSock);        machineDebug("MASTER:  I/O port number = %d", *ioPortNumP);}            static voidextendToEndOfPattern(unsigned int * const nFramesP,                     unsigned int   const startFrame,                     unsigned int   const framePatternLen,                     unsigned int   const numFramesInStream) {    assert(framePatternLen >= 1);            while (startFrame + *nFramesP < numFramesInStream &&           (startFrame + *nFramesP) % framePatternLen != 0)        ++(*nFramesP);}static voidallocateInitialFrames(struct scheduler * const schedulerP,                      boolean            const parallelPerfect,                      boolean            const forceIalign,                      unsigned int       const framePatternLen,                      unsigned int       const parallelTestFrames,                      unsigned int       const childNum,                      unsigned int *     const startFrameP,                      unsigned int *     const nFramesP) {/*----------------------------------------------------------------------------   Choose which frames, to hand out to the new child numbered 'childNum'.-----------------------------------------------------------------------------*/    unsigned int const framesPerChild =         MAX(1, ((schedulerP->numFramesInJob - schedulerP->nextFrame) /                (schedulerP->numMachines - childNum)));    unsigned int nFrames;    if (parallelPerfect)        nFrames = framesPerChild;    else {        assert(parallelTestFrames >= 1);        nFrames = MIN(parallelTestFrames, framesPerChild);    }    if (forceIalign)        extendToEndOfPattern(&nFrames, schedulerP->nextFrame,                             framePatternLen, schedulerP->numFramesInJob);    nFrames = MIN(nFrames, schedulerP->numFramesInJob - schedulerP->nextFrame);    *startFrameP = schedulerP->nextFrame;    *nFramesP = nFrames;    schedulerP->nextFrame += nFrames;}static floattaperedGoalTime(struct childState const childState[],                unsigned int      const remainingFrameCount) {            float        goalTime;    float        allChildrenFPS;    float        remainingJobTime;        /* How long we expect it to be before the whole movie is encoded*/    float        sum;    int          numMachinesToEstimate;    unsigned int childNum;        /* frames left = lastFrameInStream - startFrame + 1 */    for (childNum = 0, sum = 0.0, numMachinesToEstimate = 0;          childNum < numMachines; ++childNum) {        if (!childState[childNum].finished) {            if (childState[childNum].fps < 0.0 )                ++numMachinesToEstimate;            else                sum += childState[childNum].fps;        }    }        allChildrenFPS = (float)numMachines *        (sum/(float)(numMachines-numMachinesToEstimate));        remainingJobTime = (float)remainingFrameCount/allChildrenFPS;        goalTime = MAX(5.0, remainingJobTime/2);    return goalTime;}static voidallocateMoreFrames(struct scheduler * const schedulerP,                   unsigned int       const childNum,                   struct childState  const childState[],                   bool               const forceIalign,                   unsigned int       const framePatternLen,                   bool               const goalTimeSpecified,                   unsigned int       const goalTimeArg,                   unsigned int *     const startFrameP,                   unsigned int *     const nFramesP) {/*----------------------------------------------------------------------------   Decide which frames should be child 'childNum''s next assignment,   given the state/history of all children is childState[].   The lowest numbered frame which needs yet to be encoded is frame   number 'startFrame' and 'lastFrameInStream' is the highest.   The allocation always starts at the lowest numbered frame that   hasn't yet been allocated and is sequential.  We return as   *startFrameP the frame number of the first frame in the allocation   and as *nFramesP the number of frames.   If 'goalTimeSpecified' is true, we try to make the assignment take   'goalTimeArg' seconds.  If 'goalTimeSpecified' is not true, we choose   a goal time ourselves, which is based on how long we think it will   take for all the children to finish all the remaining frames.-----------------------------------------------------------------------------*/    float goalTime;        /* Number of seconds we want the assignment to take.  We size the           assignment to try to meet this goal.        */    unsigned int nFrames;    float avgFps;    if (!goalTimeSpecified) {        goalTime = taperedGoalTime(childState,                                   schedulerP->numFramesInJob -                                    schedulerP->nextFrame);            pm_message("MASTER: ASSIGNING %s %.2f seconds of work",                   machineName[childNum], goalTime);    } else        goalTime = goalTimeArg;        if (childState[childNum].numSeconds != 0)        avgFps = (float)childState[childNum].numFrames /             childState[childNum].numSeconds;    else        avgFps = 0.1;       /* arbitrary small value */    nFrames = MAX(1u, (unsigned int)(goalTime * avgFps + 0.5));        nFrames = MIN(nFrames,                   schedulerP->numFramesInJob - schedulerP->nextFrame);    if (forceIalign)        extendToEndOfPattern(&nFrames, schedulerP->nextFrame,                             framePatternLen, schedulerP->numFramesInJob);    *startFrameP = schedulerP->nextFrame;    *nFramesP = nFrames;    schedulerP->nextFrame += nFrames;}static voidstartChildren(struct scheduler *   const schedulerP,              const char *         const encoderName,              const char *         const masterHostName,              int                  const masterPortNum,              const char *         const paramFileName,              boolean              const parallelPerfect,              boolean              const forceIalign,              unsigned int         const framePatternLen,              unsigned int         const parallelTestFrames,              boolean              const beNice,              int                  const masterSocket,              int                  const combinePortNum,              int                  const decodePortNum,              int *                const ioPortNum,              unsigned int *       const numIoServersP,              struct childState ** const childStateP) {/*----------------------------------------------------------------------------   Start up the children.  Tell them to work for the master at   'masterHostName':'masterPortNum'.   Start I/O servers (as processes on this system) as required and return   the port numbers of the TCP ports on which they listen as   ioPortNum[] and the number of them as *numIoServersP.   Give each of the children some initial work to do.  This may be just   a small amount for timing purposes.   We access and manipulate the various global variables that represent   the state of the children, and the scheduler structure.-----------------------------------------------------------------------------*/    struct childState * childState;  /* malloc'ed */    unsigned int childNum;    unsigned int numIoServers;    unsigned int childrenLeftCurrentIoServer;        /* The number of additional children we can hook up to the           current I/O server before reaching our maximum children per           I/O server.  0 if there is no current I/O server.        */    MALLOCARRAY_NOFAIL(childState, schedulerP->numMachines);    childrenLeftCurrentIoServer = 0;  /* No current I/O server yet */        numIoServers = 0;  /* None created yet */    for (childNum = 0; childNum < schedulerP->numMachines; ++childNum) {        char command[1024];        unsigned int startFrame;        unsigned int nFrames;        childState[childNum].fps        = -1.0;  /* illegal value as flag */        childState[childNum].numSeconds = 0;        allocateInitialFrames(schedulerP, parallelPerfect, forceIalign,                              framePatternLen, parallelTestFrames,                              childNum, &startFrame, &nFrames);        if (nFrames == 0) {            childState[childNum].finished = TRUE;            machineDebug("MASTER: No more frames; not starting child '%s'",                         machineName[childNum]);        } else {            childState[childNum].finished   = FALSE;                    if (remote[childNum]) {                if (childrenLeftCurrentIoServer == 0) {                    startIoServer(encoderName, schedulerP->numMachines,                                   masterHostName, masterPortNum, masterSocket,                                  paramFileName, &ioPortNum[numIoServers++]);                                        childrenLeftCurrentIoServer = SOMAXCONN;                }                --childrenLeftCurrentIoServer;            }             snprintf(command, sizeof(command),                     "%s %s -l %s %s "                     "%s %s -child %s %d %d %d %d %d %d "                     "-frames %d %d %s",                     rsh,                     machineName[childNum], userName[childNum],                     beNice ? "nice" : "",                     executable[childNum],                     debugMachines ? "-debug_machines" : "",                     masterHostName, masterPortNum,                      remote[childNum] ? ioPortNum[numIoServers-1] : 0,                     combinePortNum, decodePortNum, childNum,                     remote[childNum] ? 1 : 0,                     startFrame, startFrame + nFrames - 1,                     remote[childNum] ?                          remoteParamFile[childNum] : paramFileName            );                    machineDebug("MASTER: Starting child server "                         "with shell command '%s'", command);            safe_fork(command);            machineDebug("MASTER: Frames %d-%d assigned to new child %s",                         startFrame, startFrame + nFrames - 1,                          machineName[childNum]);        }        childState[childNum].startFrame = startFrame;        childState[childNum].lastNumFrames = nFrames;        childState[childNum].numFrames = childState[childNum].lastNumFrames;    }    *childStateP   = childState;    *numIoServersP = numIoServers;}static voidnoteFrameDone(const char * const combineHostName,              int          const combinePortNum,              unsigned int const frameStart,               unsigned int const frameEnd) {/*----------------------------------------------------------------------------   Tell the Combine server that frames 'frameStart' through 'frameEnd'   are done.-----------------------------------------------------------------------------*/    int const negativeTwo = -2;    int clientSocket;    time_t  tempTimeStart, tempTimeEnd;    const char * error;    struct hostent * hostEntP;    time(&tempTimeStart);    hostEntP = NULL;    ConnectToSocket(combineHostName, combinePortNum, &hostEntP,                    &clientSocket, &error);        if (error)        errorExit("MASTER: Can't connect to Combine server to tell it frames "                  "are done.  %s", error);    WriteInt(clientSocket, negativeTwo);    WriteInt(clientSocket, frameStart);    WriteInt(clientSocket, frameEnd);    close(clientSocket);    time(&tempTimeEnd);    IOtime += (tempTimeEnd-tempTimeStart);}static voidfeedTheChildren(struct scheduler * const schedulerP,                struct childState        childState[],                int                const masterSocket,                const char *       const combineHostName,                int                const combinePortNum,                bool               const forceIalign,                unsigned int       const framePatternLen,                bool               const goalTimeSpecified,                unsigned int       const goalTime) {/*----------------------------------------------------------------------------   Listen for children to tell us they have finished their assignments   and give them new assignments, until all the frames have been assigned   and all the children have finished.   As children finish assignments, inform the combine server at   'combineHostName':'combinePortNum' of such.   Note that the children got initial assigments when they were created.   So the first thing we do is wait for them to finish those.-----------------------------------------------------------------------------*/    unsigned int numFinished;        /* Number of child machines that have been excused because there           is no more work for them.        */    unsigned int framesDone;    numFinished = 0;    framesDone = 0;    while (numFinished != schedulerP->numMachines) {        int                 otherSock;        int                 childNum;        int                 seconds;        float               framesPerSecond;        struct childState * csP;        const char *        error;        unsigned int nextFrame;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -