⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallel.c

📁 linux下将各类格式图片转换工具
💻 C
📖 第 1 页 / 共 5 页
字号:
                      bool *               const doneP,                      unsigned char **     const bigBufferP,                      unsigned int *       const bigBufferSizeP) {    int       otherSock;    int command;    const char * error;        AcceptConnection(serverSocket, &otherSock, &error);    if (error)        errorExit("I/O SERVER: Failed to accept next connection.  %s", error);    ReadInt(otherSock, &command);    switch (command) {    case -1:        *doneP = TRUE;        break;    case -2:        decodedFrameToDisk(otherSock);        break;    case -3:        decodedFrameFromDisk(otherSock);        break;    case -4:        routeFromSocketToDisk(otherSock, bigBufferP, bigBufferSizeP);        break;    default:        readFrameWriteToSocket(inputSourceP, otherSock, command);    }        close(otherSock);} voidIoServer(struct inputSource * const inputSourceP,         const char *         const parallelHostName,          int                  const portNum) {/*----------------------------------------------------------------------------   Execute an I/O server.   An I/O server is the partner on the master machine of a child process   on a "remote" system.  "Remote" here doesn't just mean on another system.   It means on a system that isn't even in the same cluster -- specifically,   a system that doesn't have access to the same filesystem as the master.   The child process passes frame contents between it and the master via   the I/O server.-----------------------------------------------------------------------------*/    int       ioPortNum;    int       serverSocket;    boolean   done;    unsigned char   *bigBuffer;        /* A work buffer that we keep around permanently.  We increase           its size as needed, but never shrink it.        */    unsigned int bigBufferSize;        /* The current allocated size of bigBuffer[] */    const char * error;    bigBufferSize = 0;  /* Start with no buffer */    bigBuffer = NULL;        /* once we get IO port num, should transmit it to parallel server */    CreateListeningSocket(&serverSocket, &ioPortNum, &error);    if (error)        errorExit("Unable to create socket on which to listen for "                  "reports from children.  %s", error);    if (debugSockets)        fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);    TransmitPortNum(parallelHostName, portNum, ioPortNum);    if (separateConversion)        SetFileType(ioConversion);  /* for reading */    else        SetFileType(inputConversion);    done = FALSE;  /* initial value */    while (!done)        processNextConnection(serverSocket, inputSourceP,                              &done, &bigBuffer, &bigBufferSize);    close(serverSocket);    if ( debugSockets ) {        fprintf(stdout, "====I/O SERVER:  Shutting Down\n");    }}/*===========================================================================* * * SendRemoteFrame * *  called by a slave to the I/O server; sends an encoded frame *  to the server to be sent to disk * * RETURNS: nothing * * SIDE EFFECTS:    none * *===========================================================================*/voidSendRemoteFrame(int const frameNumber, BitBucket * const bb) {    int const negativeFour = -4;    int clientSocket;    time_t  tempTimeStart, tempTimeEnd;    const char * error;    time(&tempTimeStart);    ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,                    &clientSocket, &error);    if (error)        errorExit("CHILD: Can't connect to I/O server to deliver results.  %s",                  error);    WriteInt(clientSocket, negativeFour);    WriteInt(clientSocket, frameNumber);        if (frameNumber != -1) {        /* send number of bytes */                WriteInt(clientSocket, (bb->totalbits+7)>>3);            /* now send the bytes themselves */        Bitio_WriteToSocket(bb, clientSocket);    }    close(clientSocket);    time(&tempTimeEnd);    IOtime += (tempTimeEnd-tempTimeStart);}voidGetRemoteFrame(MpegFrame * const frameP,                int         const frameNumber) {/*----------------------------------------------------------------------------   Get a frame from the I/O server.      This is intended for use by a child.-----------------------------------------------------------------------------*/    int           clientSocket;    const char * error;    Fsize_Note(frameNumber, yuvWidth, yuvHeight);    if (debugSockets) {        fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",                getenv("HOST"), frameNumber);        fflush(stdout);    }    ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,                    &clientSocket, &error);    if (error)        errorExit("CHILD: Can't connect to I/O server to get a frame.  %s",                   error);    WriteInt(clientSocket, frameNumber);    if (frameNumber != -1) {        if (separateConversion) {            unsigned char buffer[1024];            /* This is by design the exact size of the data per message (except               the last message for a frame) the I/O server sends.            */            int numBytes;  /* Number of data bytes in message */            FILE * filePtr = pm_tmpfile();            /* read in stuff, write to file, perform local conversion */            do {                ReadInt(clientSocket, &numBytes);                if (numBytes > sizeof(buffer))                    errorExit("Invalid message received: numBytes = %d, "                              "which is greater than %d\n",                               numBytes, sizeof(numBytes));                ReadBytes(clientSocket, buffer, numBytes);                fwrite(buffer, 1, numBytes, filePtr);            } while ( numBytes == sizeof(buffer) );            fflush(filePtr);            rewind(filePtr);            ReadFrameFile(frameP, filePtr, slaveConversion);            fclose(filePtr);        } else {            Frame_AllocYCC(frameP);            if (debugSockets) {                fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",                        getenv("HOST"), frameNumber);                fflush(stdout);            }            readYUVOrig(clientSocket, yuvWidth, yuvHeight, frameP);        }    }    WriteInt(clientSocket, 0);    close(clientSocket);    if ( debugSockets ) {        fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",                getenv("HOST"), frameNumber);        fflush(stdout);    }}struct combineControl {    unsigned int numFrames;};static voidgetAndProcessACombineConnection(int const outputServerSocket) {    int          otherSock;    int          command;    const char * error;    AcceptConnection(outputServerSocket, &otherSock, &error);    if (error)        errorExit("COMBINE SERVER: "                  "Failed to accept next connection.  %s", error);        ReadInt(otherSock, &command);        if (command == -2) {        /* this is notification from non-remote process that a           frame is done.            */        int frameStart, frameEnd;        ReadInt(otherSock, &frameStart);        ReadInt(otherSock, &frameEnd);                    machineDebug("COMBINE_SERVER: Frames %d - %d done",                     frameStart, frameEnd);        {            unsigned int i;            for (i = frameStart; i <= frameEnd; ++i)                frameDone[i] = TRUE;        }    } else        errorExit("COMBINE SERVER: Unrecognized command %d received.",                  command);    close(otherSock);}#define READ_ATTEMPTS 5 /* number of times (seconds) to retry an input file */static voidopenInputFile(const char * const fileName,              FILE **      const inputFilePP) {    FILE * inputFileP;    unsigned int attempts;        inputFileP = NULL;    attempts = 0;    while (!inputFileP && attempts < READ_ATTEMPTS) {        inputFileP = fopen(fileName, "rb");        if (inputFileP == NULL) {            pm_message("ERROR  Couldn't read frame file '%s' errno = %d (%s)"                       "attempt %d",                        fileName, errno, strerror(errno), attempts);            sleep(1);        }        ++attempts;    }    if (inputFileP == NULL)        pm_error("Unable to open file '%s' after %d attempts.",                  fileName, attempts);    *inputFilePP = inputFileP;}static voidwaitForOutputFile(void *        const inputHandle,                  unsigned int  const frameNumber,                  FILE **       const ifPP) {/*----------------------------------------------------------------------------   Keep handling output events until we get the specified frame number.   Open the file it's in and return the stream handle.-----------------------------------------------------------------------------*/    struct combineControl * const combineControlP = (struct combineControl *)        inputHandle;    if (frameNumber >= combineControlP->numFrames)        *ifPP = NULL;    else {        const char * fileName;        while (!frameDone[frameNumber]) {            machineDebug("COMBINE_SERVER: Waiting for frame %u done",                          frameNumber);            getAndProcessACombineConnection(outputServerSocket);        }        machineDebug("COMBINE SERVER: Wait for frame %u over", frameNumber);        asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber);        openInputFile(fileName, ifPP);        strfree(fileName);    }}static voidunlinkFile(void *       const inputHandle,           unsigned int const frameNumber) {    if (!keepTempFiles) {        const char * fileName;        asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber);        unlink(fileName);        strfree(fileName);    }}voidCombineServer(int          const numFrames,               const char * const masterHostName,               int          const masterPortNum) {/*----------------------------------------------------------------------------   Execute a combine server.   This handles combination of frames.-----------------------------------------------------------------------------*/  int    combinePortNum;  FILE * ofp;  const char * error;  struct combineControl combineControl;    /* once we get Combine port num, should transmit it to parallel server */    CreateListeningSocket(&outputServerSocket, &combinePortNum, &error);  if (error)      errorExit("Unable to create socket on which to listen.  %s", error);  machineDebug("COMBINE SERVER: LISTENING ON PORT %d", combinePortNum);    TransmitPortNum(masterHostName, masterPortNum, combinePortNum);    MALLOCARRAY_NOFAIL(frameDone, numFrames);  {      unsigned int i;      for (i = 0; i < numFrames; ++i)          frameDone[i] = FALSE;  }  ofp = pm_openw(outputFileName);    combineControl.numFrames = numFrames;  FramesToMPEG(ofp, &combineControl, &waitForOutputFile, &unlinkFile);  machineDebug("COMBINE SERVER: Shutting down");    /* tell Master server we are done */  TransmitPortNum(masterHostName, masterPortNum, combinePortNum);    close(outputServerSocket);  fclose(ofp);}/*=====================* * MASTER SERVER STUFF * *=====================*/static voidstartCombineServer(const char * const encoderName,                   unsigned int const numMachines,                   const char * const masterHostName,                   int          const masterPortNum,                   unsigned int const numInputFiles,                   const char * const paramFileName,                   int          const masterSocket,                   int *        const combinePortNumP) {    char         command[1024];    int          otherSock;    const char * error;    snprintf(command, sizeof(command),              "%s %s -max_machines %d -output_server %s %d %d %s",             encoderName,              debugMachines ? "-debug_machines" : "",             numMachines, masterHostName, masterPortNum,              numInputFiles, paramFileName);    machineDebug("MASTER: Starting combine server with shell command '%s'",                 command);    safe_fork(command);        machineDebug("MASTER: Listening for connection back from "                 "new Combine server");    AcceptConnection(masterSocket, &otherSock, &error);    if (error)        errorExit("MASTER SERVER: "

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -