📄 parallel.c
字号:
/*===========================================================================* * parallel.c * * Procedures to make encoder run in parallel * *===========================================================================*//* COPYRIGHT INFORMATION IS AT THE END OF THIS FILE *//*==============* * HEADER FILES * *==============*/#define _XOPEN_SOURCE 500 /* Make sure stdio.h contains pclose() *//* _ALL_SOURCE is needed on AIX to make the C library include the socket services (e.g. define struct sockaddr) Note that AIX standards.h actually sets feature declaration macros such as _XOPEN_SOURCE, unless they are already set.*/#define _ALL_SOURCE/* On AIX, pm_config.h includes standards.h, which expects to be included after feature declaration macros such as _XOPEN_SOURCE. So we include pm_config.h as late as possible.*/#include <stdarg.h>#include <time.h>#include <unistd.h>#include <stdio.h>#include <errno.h>#include <string.h>#include <signal.h>#include <netdb.h>#include <assert.h>#include <sys/types.h>#include <sys/times.h>#include <sys/utsname.h>#include "mallocvar.h"#include "nstring.h"#include "all.h"#include "param.h"#include "mpeg.h"#include "prototypes.h"#include "readframe.h"#include "fsize.h"#include "combine.h"#include "frames.h"#include "input.h"#include "psocket.h"#include "frametype.h"#include "parallel.h"struct childState { boolean finished; unsigned int startFrame; unsigned int numFrames; unsigned int lastNumFrames; unsigned int numSeconds; float fps;};struct scheduler { /* This tracks the state of the subsystem that determines the assignments for the children */ unsigned int nextFrame; /* The next frame that needs to be assigned to a child */ unsigned int numFramesInJob; /* Total number of frames in the whole run of Ppmtompeg */ unsigned int numMachines;};#define MAX_IO_SERVERS 10#ifndef SOMAXCONN#define SOMAXCONN 5#endif/*==================* * CONSTANTS * *==================*/#define TERMINATE_PID_SIGNAL SIGTERM /* signal used to terminate forked childs */#ifndef MAXARGS#define MAXARGS 1024 /* Max Number of arguments in safe_fork command */#endif/*==================* * STATIC VARIABLES * *==================*/static char rsh[256];static struct hostent *hostEntry = NULL;static boolean *frameDone;static int outputServerSocket;static int decodeServerSocket;static boolean parallelPerfect = FALSE;static int current_max_forked_pid=0;/*==================* * GLOBAL VARIABLES * *==================*/extern int yuvHeight, yuvWidth;extern char statFileName[256];extern FILE *statFile;extern boolean debugMachines;extern boolean debugSockets;int parallelTestFrames = 10;int parallelTimeChunks = 60;const char *IOhostName;int ioPortNumber;int decodePortNumber;boolean niceProcesses = FALSE;boolean forceIalign = FALSE;int machineNumber = -1;boolean remoteIO = FALSE;boolean separateConversion;time_t IOtime = 0;extern char encoder_name[];int ClientPid[MAX_MACHINES+4];/*=====================* * INTERNAL PROCEDURES * *=====================*/static const char *getHostName(void) {/*---------------------------------------------------------------------------- Return the host name of this system.-----------------------------------------------------------------------------*/ struct utsname utsname; int rc; rc = uname(&utsname); if (rc < 0) pm_error("Unable to find out host name. " "uname() failed with errno %d (%s)", errno, strerror(errno)); return strdup(utsname.nodename);}static void GNU_PRINTF_ATTRmachineDebug(const char format[], ...) { va_list args; va_start(args, format); if (debugMachines) { const char * const hostname = getHostName(); fprintf(stderr, "%s: ---", hostname); strfree(hostname); vfprintf(stderr, format, args); fputc('\n', stderr); } va_end(args);}static void GNU_PRINTF_ATTRerrorExit(const char format[], ...) { const char * const hostname = getHostName(); va_list args; va_start(args, format); fprintf(stderr, "%s: FATAL ERROR. ", hostname); strfree(hostname); vfprintf(stderr, format, args); fputc('\n', stderr); exit(1); va_end(args);}static voidTransmitPortNum(const char * const hostName, int const portNum, int const newPortNum) {/*---------------------------------------------------------------------------- Transmit the port number 'newPortNum' to the master on port 'portNum' of host 'hostName'.-----------------------------------------------------------------------------*/ int clientSocket; const char * error; ConnectToSocket(hostName, portNum, &hostEntry, &clientSocket, &error); if (error) errorExit("Can't connect in order to transmit port number. %s", error); WriteInt(clientSocket, newPortNum); close(clientSocket);}static voidreadYUVDecoded(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { unsigned int y; for (y = 0; y < Fsize_y; ++y) /* Y */ ReadBytes(socketFd, (unsigned char *)frameP->decoded_y[y], Fsize_x); for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ ReadBytes(socketFd, (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1)); for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ ReadBytes(socketFd, (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));}static voidwriteYUVDecoded(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { unsigned int y; for (y = 0; y < Fsize_y; ++y) /* Y */ WriteBytes(socketFd, (unsigned char *)frameP->decoded_y[y], Fsize_x); for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ WriteBytes(socketFd, (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1)); for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ WriteBytes(socketFd, (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));}static voidwriteYUVOrig(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { unsigned int y; for (y = 0; y < Fsize_y; ++y) /* Y */ WriteBytes(socketFd, (unsigned char *)frameP->orig_y[y], Fsize_x); for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ WriteBytes(socketFd, (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1)); for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ WriteBytes(socketFd, (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));}static voidreadYUVOrig(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { unsigned int y; for (y = 0; y < Fsize_y; ++y) /* Y */ ReadBytes(socketFd, (unsigned char *)frameP->orig_y[y], Fsize_x); for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ ReadBytes(socketFd, (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1)); for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ ReadBytes(socketFd, (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));}/*===========================================================================* * * EndIOServer * * called by the master process -- tells the I/O server to commit * suicide * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/static void EndIOServer(){ /* send signal to IO server: -1 as frame number */ GetRemoteFrame(NULL, -1);}/*===========================================================================* * * NotifyDecodeServerReady * * called by a slave to the Decode Server to tell it a decoded frame * is ready and waiting * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/voidNotifyDecodeServerReady(int const id) { int clientSocket; time_t tempTimeStart, tempTimeEnd; const char * error; time(&tempTimeStart); ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to decode server to tell it a frame " "is ready. %s", error); WriteInt(clientSocket, id); close(clientSocket); time(&tempTimeEnd); IOtime += (tempTimeEnd-tempTimeStart);}/*===========================================================================* * * WaitForDecodedFrame * * blah blah blah * * RETURNS: nothing * * SIDE EFFECTS: none * *===========================================================================*/void WaitForDecodedFrame(id)int id;{ int const negativeTwo = -2; int clientSocket; int ready; const char * error; /* wait for a decoded frame */ if ( debugSockets ) { fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id); } ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to decode server " "to get decoded frame. %s", error); /* first, tell DecodeServer we're waiting for this frame */ WriteInt(clientSocket, negativeTwo); WriteInt(clientSocket, id); ReadInt(clientSocket, &ready); if ( ! ready ) { int waitSocket; int waitPort; int otherSock; const char * error; /* it's not ready; set up a connection and wait for decode server */ CreateListeningSocket(&waitSocket, &waitPort, &error); if (error) errorExit("Unable to create socket on which to listen for " "decoded frame. %s", error); /* tell decode server where we are */ WriteInt(clientSocket, machineNumber); WriteInt(clientSocket, waitPort); close(clientSocket); if ( debugSockets ) { fprintf(stdout, "SLAVE: WAITING ON SOCKET %d\n", waitPort); fflush(stdout); } AcceptConnection(waitSocket, &otherSock, &error); if (error)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -