⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallel.c

📁 linux下将各类格式图片转换工具
💻 C
📖 第 1 页 / 共 5 页
字号:
/*===========================================================================* * parallel.c               *                          *  Procedures to make encoder run in parallel    *                              *===========================================================================*//* COPYRIGHT INFORMATION IS AT THE END OF THIS FILE *//*==============* * HEADER FILES * *==============*/#define _XOPEN_SOURCE 500 /* Make sure stdio.h contains pclose() *//* _ALL_SOURCE is needed on AIX to make the C library include the    socket services (e.g. define struct sockaddr)    Note that AIX standards.h actually sets feature declaration macros such   as _XOPEN_SOURCE, unless they are already set.*/#define _ALL_SOURCE/* On AIX, pm_config.h includes standards.h, which expects to be included   after feature declaration macros such as _XOPEN_SOURCE.  So we include   pm_config.h as late as possible.*/#include <stdarg.h>#include <time.h>#include <unistd.h>#include <stdio.h>#include <errno.h>#include <string.h>#include <signal.h>#include <netdb.h>#include <assert.h>#include <sys/types.h>#include <sys/times.h>#include <sys/utsname.h>#include "mallocvar.h"#include "nstring.h"#include "all.h"#include "param.h"#include "mpeg.h"#include "prototypes.h"#include "readframe.h"#include "fsize.h"#include "combine.h"#include "frames.h"#include "input.h"#include "psocket.h"#include "frametype.h"#include "parallel.h"struct childState {    boolean      finished;    unsigned int startFrame;    unsigned int numFrames;    unsigned int lastNumFrames;    unsigned int numSeconds;    float        fps;};struct scheduler {    /* This tracks the state of the subsystem that determines the assignments       for the children    */    unsigned int nextFrame;        /* The next frame that needs to be assigned to a child */    unsigned int numFramesInJob;        /* Total number of frames in the whole run of Ppmtompeg */    unsigned int numMachines;};#define MAX_IO_SERVERS  10#ifndef SOMAXCONN#define SOMAXCONN 5#endif/*==================* * CONSTANTS        * *==================*/#define TERMINATE_PID_SIGNAL    SIGTERM  /* signal used to terminate forked childs */#ifndef MAXARGS#define MAXARGS     1024   /* Max Number of arguments in safe_fork command */#endif/*==================* * STATIC VARIABLES * *==================*/static char rsh[256];static struct hostent *hostEntry = NULL;static boolean  *frameDone;static int  outputServerSocket;static int  decodeServerSocket;static boolean  parallelPerfect = FALSE;static  int current_max_forked_pid=0;/*==================* * GLOBAL VARIABLES * *==================*/extern int yuvHeight, yuvWidth;extern char statFileName[256];extern FILE *statFile;extern boolean debugMachines;extern boolean debugSockets;int parallelTestFrames = 10;int parallelTimeChunks = 60;const char *IOhostName;int ioPortNumber;int decodePortNumber;boolean niceProcesses = FALSE;boolean forceIalign = FALSE;int     machineNumber = -1;boolean remoteIO = FALSE;boolean separateConversion;time_t  IOtime = 0;extern char encoder_name[];int     ClientPid[MAX_MACHINES+4];/*=====================* * INTERNAL PROCEDURES * *=====================*/static const char *getHostName(void) {/*----------------------------------------------------------------------------   Return the host name of this system.-----------------------------------------------------------------------------*/    struct utsname utsname;    int rc;    rc = uname(&utsname);    if (rc < 0)        pm_error("Unable to find out host name.  "                 "uname() failed with errno %d (%s)", errno, strerror(errno));    return strdup(utsname.nodename);}static void GNU_PRINTF_ATTRmachineDebug(const char format[], ...) {    va_list args;    va_start(args, format);    if (debugMachines) {        const char * const hostname = getHostName();        fprintf(stderr, "%s: ---", hostname);        strfree(hostname);        vfprintf(stderr, format, args);        fputc('\n', stderr);    }    va_end(args);}static void GNU_PRINTF_ATTRerrorExit(const char format[], ...) {    const char * const hostname = getHostName();    va_list args;    va_start(args, format);    fprintf(stderr, "%s: FATAL ERROR.  ", hostname);    strfree(hostname);    vfprintf(stderr, format, args);    fputc('\n', stderr);    exit(1);    va_end(args);}static voidTransmitPortNum(const char * const hostName,                 int          const portNum,                 int          const newPortNum) {/*----------------------------------------------------------------------------   Transmit the port number 'newPortNum' to the master on port 'portNum'   of host 'hostName'.-----------------------------------------------------------------------------*/    int clientSocket;    const char * error;        ConnectToSocket(hostName, portNum, &hostEntry, &clientSocket, &error);        if (error)        errorExit("Can't connect in order to transmit port number.  %s",                  error);    WriteInt(clientSocket, newPortNum);        close(clientSocket);}static voidreadYUVDecoded(int          const socketFd,               unsigned int const Fsize_x,               unsigned int const Fsize_y,               MpegFrame *  const frameP) {        unsigned int y;        for (y = 0; y < Fsize_y; ++y)         /* Y */        ReadBytes(socketFd,                   (unsigned char *)frameP->decoded_y[y], Fsize_x);        for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */        ReadBytes(socketFd,                   (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1));        for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */        ReadBytes(socketFd,                   (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));}static voidwriteYUVDecoded(int          const socketFd,                unsigned int const Fsize_x,                unsigned int const Fsize_y,                MpegFrame *  const frameP) {        unsigned int y;        for (y = 0; y < Fsize_y; ++y)         /* Y */        WriteBytes(socketFd,                   (unsigned char *)frameP->decoded_y[y], Fsize_x);        for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */        WriteBytes(socketFd,                    (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1));        for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */        WriteBytes(socketFd,                    (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));}static voidwriteYUVOrig(int          const socketFd,             unsigned int const Fsize_x,             unsigned int const Fsize_y,             MpegFrame *  const frameP) {        unsigned int y;        for (y = 0; y < Fsize_y; ++y)         /* Y */        WriteBytes(socketFd,                   (unsigned char *)frameP->orig_y[y], Fsize_x);        for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */        WriteBytes(socketFd,                    (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1));        for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */        WriteBytes(socketFd,                    (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));}static voidreadYUVOrig(int          const socketFd,            unsigned int const Fsize_x,            unsigned int const Fsize_y,            MpegFrame *  const frameP) {        unsigned int y;        for (y = 0; y < Fsize_y; ++y)         /* Y */        ReadBytes(socketFd,                   (unsigned char *)frameP->orig_y[y], Fsize_x);        for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */        ReadBytes(socketFd,                   (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1));        for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */        ReadBytes(socketFd,                   (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));}/*===========================================================================* * * EndIOServer * *  called by the master process -- tells the I/O server to commit *  suicide * * RETURNS: nothing * * SIDE EFFECTS:    none * *===========================================================================*/static void  EndIOServer(){  /* send signal to IO server:  -1 as frame number */  GetRemoteFrame(NULL, -1);}/*===========================================================================* * * NotifyDecodeServerReady * *  called by a slave to the Decode Server to tell it a decoded frame *  is ready and waiting * * RETURNS: nothing * * SIDE EFFECTS:    none * *===========================================================================*/voidNotifyDecodeServerReady(int const id) {    int   clientSocket;    time_t  tempTimeStart, tempTimeEnd;    const char * error;        time(&tempTimeStart);        ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket,                    &error);        if (error)        errorExit("CHILD: Can't connect to decode server to tell it a frame "                "is ready.  %s", error);        WriteInt(clientSocket, id);        close(clientSocket);        time(&tempTimeEnd);    IOtime += (tempTimeEnd-tempTimeStart);}/*===========================================================================* * * WaitForDecodedFrame * *  blah blah blah * * RETURNS: nothing * * SIDE EFFECTS:    none * *===========================================================================*/void  WaitForDecodedFrame(id)int id;{  int const negativeTwo = -2;  int   clientSocket;  int     ready;  const char * error;  /* wait for a decoded frame */  if ( debugSockets ) {    fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);  }  ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket,                  &error);  if (error)      errorExit("CHILD: Can't connect to decode server "                "to get decoded frame.  %s",                error);  /* first, tell DecodeServer we're waiting for this frame */  WriteInt(clientSocket, negativeTwo);  WriteInt(clientSocket, id);  ReadInt(clientSocket, &ready);  if ( ! ready ) {    int     waitSocket;    int     waitPort;    int     otherSock;    const char * error;    /* it's not ready; set up a connection and wait for decode server */    CreateListeningSocket(&waitSocket, &waitPort, &error);    if (error)        errorExit("Unable to create socket on which to listen for "                  "decoded frame.  %s", error);    /* tell decode server where we are */    WriteInt(clientSocket, machineNumber);    WriteInt(clientSocket, waitPort);    close(clientSocket);    if ( debugSockets ) {      fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);      fflush(stdout);    }    AcceptConnection(waitSocket, &otherSock, &error);    if (error)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -