⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 4 页
字号:
#include "config.h"
#include "debug.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>   /* errno, EINTR, EAGAIN used by usend()/urecv() */
#include <stdio.h>
#include <stdlib.h>  /* calloc, realloc, free */
#include <pthread.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <ctype.h>
#include <poll.h>
/* #include "vrun.h" */

#define FINALIZE_MSG 1
#define DEFAULT_CKPT_FREQ     5

/*
 * Only declare h_errno by hand when <netdb.h> did not already provide it as
 * a macro: in that case the declaration below would be macro-expanded into
 * something else than a plain variable declaration.
 */
#ifndef h_errno
extern int h_errno;
#endif

char *get_current_dir_name(void);

#include "cmdline_vrun.h"

/* Names of the auxiliary programs (presumably the binaries vrun launches --
 * their use is not visible in this part of the file). */
#define CHKPTSERV "checkpointserver"
#define EVENTLOGGER "eventlogger"
#define CSCHED "csched"
#define VDAEMON "vdaemon"

/*
 * One remote peer known to vrun: its address, the socket connected to it,
 * a per-peer lock and a kind tag. The `node` member of the union carries the
 * extra fields accessed through the nrank/nstatus/ncs/nsc/nel shorthands.
 */
struct service {
  struct sockaddr_in addr;
  int sock;
  pthread_mutex_t lock;
  unsigned char kind;          /* presumably KIND_NODE or KIND_SERVER */
  union {
    struct { } server;         /* no extra data (empty struct: compiler extension) */
    struct {
      int _rank;
      int _status;             /* presumably one of the STATUS_* values below */
      struct service *_cs;
      struct service *_sc;
      struct service *_el;
    } node;
  } u;
};

/* Shorthands for the node-specific members of struct service. */
#define nrank    u.node._rank
#define nstatus  u.node._status
#define ncs      u.node._cs
#define nsc      u.node._sc
#define nel      u.node._el

#define KIND_NODE   1
#define KIND_SERVER 2

#define STATUS_INITIALSTART 0
#define STATUS_RESTART      1
#define STATUS_FINISHED     2
#define STATUS_EXITING      3
#define STATUS_WAITING_FOR_CLOSE 4
#define STATUS_EXITED       5

/* One entry of a machine file: host name, resolved IPv4 address, counters
 * and a per-machine lock. */
struct machine {
  char *name;                  /* strdup()ed host name */
  in_addr_t ip;                /* first address returned by gethostbyname() */
  int used;
  int missed;
  pthread_mutex_t lock;
};

/* One connection accepted by accept_list_thread() and not yet consumed. */
struct accept_list_elt {
  struct sockaddr_in remote;
  int fd;
};

/* Global state of a vrun instance. */
struct context {
  int listen_sock;
  int stdio_sock;
  int stder_sock;
  int stdio_comm[2];
  int stder_comm[2];
  struct gengetopt_args_info *args;
  struct sockaddr_in myself;
  struct sockaddr_in stdio;
  struct sockaddr_in stder;
  char *command;

  /* Queue of accepted connections, protected by accept_list_lock and
   * signalled through accept_list_cond. */
  pthread_mutex_t accept_list_lock;
  pthread_cond_t accept_list_cond;
  int accept_list_size;        /* number of valid entries */
  int accept_list_malloced;    /* number of allocated entries */
  struct accept_list_elt *accept_list;

  int nb_stables;
  struct machine **stables;
  int nb_unstables;
  struct machine **unstables;

  int nb_cs;
  struct service *cs;
  struct service *sc;
  int nb_el;
  struct service *el;
  int nb_nodes;
  struct service *nodes;
};

struct node_monitor_parameter_t {
  int nb_nodes;
  struct service **nodes;
  struct context *c;
  pthread_t monitor;
};

/*
 * Send exactly `size` bytes from `buf` on `sock`.
 *
 * send() is retried when it fails with EINTR or EAGAIN and is always called
 * with MSG_NOSIGNAL or'ed into `flag`.
 *
 * Returns the number of bytes written (== size) on success, or the negative
 * value returned by send() on any other error.
 */
static int usend(int sock, void *buf, int size, int flag)
{
    /* Byte cursor: arithmetic on void * is not standard C. */
    const char *bytes = (const char *)buf;
    int written = 0;

    while (written < size) {
        int n = send(sock, bytes + written, size - written, MSG_NOSIGNAL | flag);

        if (n < 0) {
            if ((errno == EINTR) || (errno == EAGAIN))
                continue;
#ifdef DEBUG
            printe( "Writing %d/%d bytes on %d  before error",
                    written, size, sock);
#endif
            return n;
        }
        written += n;
    }
    return written;
}

/*
 * Receive exactly `size` bytes into `buf` from `sock`.
 *
 * recv() is always called with MSG_WAITALL or'ed into `flag`. When recv()
 * returns 0, the descriptor is temporarily switched out of O_NONBLOCK/O_ASYNC
 * mode and the read is attempted once more (retrying on EINTR); only a second
 * 0 is treated as a disconnection.
 *
 * Returns `size` on success, 0 on disconnection, -1 on error.
 */
static int urecv(int sock, void *buf, int size, int flag )
{
    char *bytes = (char *)buf;
    int rd = 0;

    while (rd < size) {
        int n = recv(sock, bytes + rd, size - rd, MSG_WAITALL | flag);

        if (n == 0) {
            /* Change mode to synchronous, since this 0 may not be a disconnect. */
            int fdflags = fcntl(sock, F_GETFL);
            int nflags  = (fdflags & (~(O_ASYNC|O_NONBLOCK))) | (O_SYNC);
            int saved_errno;

            do {
                fcntl(sock, F_SETFL, nflags);
                n = recv(sock, bytes + rd, size - rd, MSG_WAITALL | flag);
                /* Capture errno before fcntl() gets a chance to change it. */
                saved_errno = errno;
                /* Back to the old mode. */
                fcntl(sock, F_SETFL, fdflags);
            } while ((n < 0) && (saved_errno == EINTR));

            if (n == 0) {
                /* This is a disconnection (sure). */
                return 0;
            }
            if (n < 0) {
                errno = saved_errno;
                printe("SyncReading %d/%d bytes on fd %d before error", rd, size, sock);
                return -1;
            }
        }

        if (n < 0) {
            if ((errno == EINTR) || (errno == EAGAIN))
                continue;
            printe("SyncReading %d/%d bytes on fd %d before error", rd, size, sock);
            return -1;
        }
        rd += n;
    }
    return rd;
}

/*
 * Thread body: accept connections on c->listen_sock forever and append each
 * of them (peer address + descriptor) to c->accept_list, waking up every
 * waiter on c->accept_list_cond.
 *
 * _c is the struct context * of the running vrun. The function only leaves
 * its loop through cancellation (or through printq(), presumably fatal,
 * after more than 100 failed accept() calls).
 *
 * NOTE(review): the thread enables asynchronous cancellation and takes
 * accept_list_lock; a cancellation delivered while the lock is held would
 * leave it locked -- confirm how the canceller deals with this.
 */
static void *accept_list_thread(void *_c)
{
    struct context *c = (struct context *)_c;
    struct sockaddr_in remote;
    socklen_t rlen;
    int failed = 0;
    int r;

    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    for (;;) {
        /* Start from a fully defined address so that nothing uninitialized
         * is ever logged or queued. */
        memset(&remote, 0, sizeof(remote));
        remote.sin_addr.s_addr = INADDR_ANY;
        rlen = sizeof(struct sockaddr_in);

        r = accept(c->listen_sock, (struct sockaddr*)&remote, &rlen);
        pthread_testcancel();

        if (r < 0) {
            printe("accept failed (%d times) in the accept_list_thread", failed);
            if (failed++ > 100)
                printq("maximum failed reached. Fatal error.");
            continue;
        }

        /* pthread_t is not an int: print it as unsigned long (assumes an
         * integral pthread_t, as on Linux). */
        printi("accept_list", "%lu accepted a connection with %s:%d on %d",
               (unsigned long)pthread_self(),
               inet_ntoa(remote.sin_addr), ntohs(remote.sin_port), r);

        pthread_mutex_lock(&c->accept_list_lock);

        if (c->accept_list_size + 1 > c->accept_list_malloced) {
            int wanted = 1 + c->accept_list_size;
            struct accept_list_elt *grown =
                realloc(c->accept_list, wanted * sizeof(struct accept_list_elt));

            if (grown == NULL) {
                /* Keep the existing list intact and drop this connection. */
                pthread_mutex_unlock(&c->accept_list_lock);
                printe("unable to grow the accept_list to %d entries, connection on %d dropped",
                       wanted, r);
                close(r);
                continue;
            }
            c->accept_list = grown;
            c->accept_list_malloced = wanted;
        }

        memcpy(&c->accept_list[c->accept_list_size].remote, &remote,
               sizeof(struct sockaddr_in));
        c->accept_list[c->accept_list_size].fd = r;
        c->accept_list_size++;

        pthread_cond_broadcast(&c->accept_list_cond);
        printi("accept", "appended to the accept_list");
        pthread_mutex_unlock(&c->accept_list_lock);
    }
    return NULL;
}

/*
 * Append a new machine called `name`, whose address is the first one of `h`,
 * at the end of the array *list holding *count entries.
 *
 * The caller must have checked that `h` carries an address of exactly
 * sizeof(in_addr_t) bytes. Returns 0 on success; returns -1 and leaves
 * *list and *count untouched when memory cannot be allocated.
 */
static int append_machine(struct machine ***list, int *count,
                          const char *name, const struct hostent *h)
{
    struct machine **grown;
    struct machine *m;

    m = calloc(1, sizeof(struct machine));
    if (m == NULL)
        return -1;

    m->name = strdup(name);
    if (m->name == NULL) {
        free(m);
        return -1;
    }

    grown = realloc(*list, (*count + 1) * sizeof(struct machine*));
    if (grown == NULL) {
        free(m->name);
        free(m);
        return -1;
    }

    memcpy(&m->ip, h->h_addr, sizeof(m->ip));
    m->missed = 0;
    pthread_mutex_init(&m->lock, NULL);

    grown[*count] = m;
    *list = grown;
    (*count)++;
    return 0;
}

/*
 * Read a machine file from `f` and append one struct machine per usable line
 * to c->stables (when `stable` is non-zero) or to c->unstables (otherwise).
 *
 * The host name of a line is everything before the first ':' or newline.
 * Blank lines are skipped. Names that cannot be resolved, or that do not
 * resolve to an IPv4 address, are reported and ignored, as are machines for
 * which memory cannot be allocated.
 */
static void parse_machine_file(struct context *c, FILE *f, int stable)
{
    char line[1024];
    struct machine ***list = stable ? &c->stables : &c->unstables;
    int *count = stable ? &c->nb_stables : &c->nb_unstables;

    while (fgets(line, sizeof(line), f)) {
        struct hostent *h;
        int i;

        /* Keep only the host name: cut at the first ':' or end of line. */
        for (i = 0; line[i] && (line[i] != ':') && (line[i] != '\n'); i++)
            ;
        line[i] = 0;

        if (line[0] == 0)
            continue;

        h = gethostbyname(line);
        if (h == NULL) {
            printi("init", "%s is not solvable by gethostbyname (%s) -- ignored",
                   line, hstrerror(h_errno));
            continue;
        }

        /* struct machine stores an in_addr_t: never copy more than that. */
        if ((h->h_addrtype != AF_INET) || (h->h_length != (int)sizeof(in_addr_t))) {
            printi("init", "%s does not resolve to an IPv4 address -- ignored", line);
            continue;
        }

        if (append_machine(list, count, line, h) < 0)
            printe("out of memory while registering machine %s -- ignored", line);
    }
}

/*
 * Build the vrun context from the parsed command line: load the machine
 * files, find the local address and create the listening sockets.
 *
 * Only the beginning of this function is visible in this excerpt; its
 * statements are reproduced unchanged.
 *
 * NOTE(review): the calloc() results below are used unchecked.
 * NOTE(review): the `return NULL` after a failed gethostbyname(myname) does
 * not release `c` nor the machine lists.
 * NOTE(review): the fallback below memcpy()s whole struct machine objects,
 * which duplicates the pthread_mutex_t and shares the `name` pointer between
 * the stable and the unstable entry -- confirm that this is intended.
 */
static struct context *build_context(struct gengetopt_args_info *args)
{
  struct context *c;
  char myname[512];
  struct hostent *h;
  FILE *f;
  int i;
  socklen_t slen;
  struct sockaddr_in server, server_stdio, server_stder;

  f = fopen(args->machines_file_arg, "r");
  if(f == NULL)
    {
      perror(args->machines_file_arg);
      return NULL;
    }

  c = (struct context*)calloc(1, sizeof(struct context));
  c->args = args;

  parse_machine_file(c, f, 0);
  fclose(f);

  if(args->stable_machines_file_given)
    {
      f = fopen(args->stable_machines_file_arg, "r");
      if(f == NULL)
	{
	  perror(args->stable_machines_file_arg);
	  printe(" unable to open %s", args->stable_machines_file_arg);
	  printw(" (ignored: unstable machines are used as stable machines)");
	  goto err;
	}
      else
	{
	  parse_machine_file(c, f, 1);
	  fclose(f);
	}
    }
  else
    {
    err:
      /* we use the reverse unstables machines as stables */
      c->stables = (struct machine**)calloc(c->nb_unstables, sizeof(struct machine*));
      for(i = 0; i < c->nb_unstables; i++)
	{
	  c->stables[c->nb_unstables-i-1] = (struct machine*)calloc(1, sizeof(struct machine));
	  memcpy(c->stables[c->nb_unstables-i-1], c->unstables[i], sizeof(struct machine));
	}
      c->nb_stables = c->nb_unstables;
    }

  if(gethostname(myname, 512) < 0)
    qerror("gethostname");
  printi("init", "I am %s", myname);

  if( (h = gethostbyname(myname)) == NULL )
    {
      printe("gethostbyname(%s) failed (%s)", myname, hstrerror(h_errno));
      return NULL;
    }
  memcpy(&c->myself.sin_addr, h->h_addr, h->h_length);

  /* Main listening socket: fixed port when requested, otherwise port 0 is
   * left in `server` and the bound port is read back with getsockname(). */
  c->listen_sock = socket(AF_INET, SOCK_STREAM, 0);
  if(c->listen_sock < 0)
    qerror("listen_sock creation");
  if( args->runtime_port_given )
    {
      memset(&server, 0, sizeof(struct sockaddr_in));
      server.sin_addr.s_addr = INADDR_ANY;
      server.sin_family = AF_INET;
      server.sin_port = htons( args->runtime_port_arg );
    }
  else
    {
      memset(&server, 0, sizeof(struct sockaddr_in));
      server.sin_addr.s_addr = INADDR_ANY;
      server.sin_family = AF_INET;
    }
  if(bind(c->listen_sock, (struct sockaddr*)&server, sizeof(struct sockaddr_in)) < 0)
    qerror("listen_sock binding");
  if(listen(c->listen_sock, 5) < 0)
    qerror("listen_sock listening");
  slen = sizeof(struct sockaddr_in);
  if(getsockname(c->listen_sock, (struct sockaddr*)&server, &slen) < 0)
    qerror("get listen_sock name");
  c->myself.sin_port = server.sin_port;
  printi("init", "vrun bound on %s:%d", inet_ntoa(c->myself.sin_addr), ntohs(server.sin_port));

  /* Same sequence for the stdio socket. */
  c->stdio_sock = socket(AF_INET, SOCK_STREAM, 0);
  if(c->stdio_sock < 0)
    qerror("stdio_sock creation");
  if( args->stdio_port_given )
    {
      memset(&server_stdio, 0, sizeof(struct sockaddr_in));
      server_stdio.sin_addr.s_addr = INADDR_ANY;
      server_stdio.sin_family = AF_INET;
      server_stdio.sin_port = htons( args->stdio_port_arg );
    }
  else
    {
      memset(&server_stdio, 0, sizeof(struct sockaddr_in));
      server_stdio.sin_addr.s_addr = INADDR_ANY;
      server_stdio.sin_family = AF_INET;
    }
  if(bind(c->stdio_sock, (struct sockaddr*)&server_stdio, sizeof(struct sockaddr_in)) < 0)
    qerror("stdio_sock binding");
  if(listen(c->stdio_sock, 5) < 0)
    qerror("stdio_sock listening");
  slen = sizeof(struct sockaddr_in);
  if(getsockname(c->stdio_sock, (struct sockaddr*)&server_stdio, &slen) < 0)
    qerror("get stdio_sock name");
  memcpy(&c->stdio, &c->myself, sizeof(struct sockaddr_in));
  c->stdio.sin_port = server_stdio.sin_port;
  printi("init", "STDIO vrun bound on %s:%d", inet_ntoa(c->stdio.sin_addr), ntohs(c->stdio.sin_port));
  if( socketpair(AF_UNIX, SOCK_DGRAM, 0, c->stdio_comm) == -1 )
    qerror("stdio_comm creation");

  /* Same sequence for the stder socket. */
  c->stder_sock = socket(AF_INET, SOCK_STREAM, 0);
  if(c->stder_sock < 0)
    qerror("stder_sock creation");
  if( args->stder_port_given )
    {
      memset(&server_stder, 0, sizeof(struct sockaddr_in));
      server_stder.sin_addr.s_addr = INADDR_ANY;
      server_stder.sin_family = AF_INET;
      server_stder.sin_port = htons( args->stder_port_arg );
    }
  else
    {
      memset(&server_stder, 0, sizeof(struct sockaddr_in));
      server_stder.sin_addr.s_addr = INADDR_ANY;
      server_stder.sin_family = AF_INET;
    }
  if(bind(c->stder_sock, (struct sockaddr*)&server_stder, sizeof(struct sockaddr_in)) < 0)
    qerror("stder_sock binding");
  if(listen(c->stder_sock, 5) < 0)
    qerror("stder_sock listening");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -