⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 4 页
字号:
  slen = sizeof(struct sockaddr_in);  if(getsockname(c->stder_sock, (struct sockaddr*)&server_stder, &slen) < 0)    qerror("get stder_sock name");  memcpy(&c->stder, &c->myself, sizeof(struct sockaddr_in));  c->stder.sin_port = server_stder.sin_port;  printi("init", "STDERR vrun bound on %s:%d", inet_ntoa(c->stder.sin_addr), ntohs(c->stder.sin_port));			  if( socketpair(AF_UNIX, SOCK_DGRAM, 0, c->stder_comm) == -1 )    qerror("stder_comm creation");#ifndef NO_CHECKPOINT  if( args->n_cs_given )    c->nb_cs = args->n_cs_arg;  else    c->nb_cs = 1 + (args->n_procs_arg / 5);  c->cs = (struct service*)calloc(c->nb_cs, sizeof(struct service));  for(i = 0; i < c->nb_cs; i++)    {      c->cs[i].kind = KIND_SERVER;      c->cs[i].sock = -1;      pthread_mutex_init(&c->cs[i].lock, NULL);    }  c->sc = (struct service*)calloc(1, sizeof(struct service));  c->sc->kind = KIND_SERVER;  c->sc->sock = -1;  pthread_mutex_init(&c->sc->lock, NULL);#endif#ifndef NO_EVENTLOGGER  c->nb_el = args->n_el_arg;  c->el = (struct service*)calloc(c->nb_el, sizeof(struct service));  for(i = 0; i < c->nb_el; i++)    {      c->el[i].kind = KIND_SERVER;      c->el[i].sock = -1;      pthread_mutex_init(&c->el[i].lock, NULL);    }#endif    pthread_mutex_init(&c->accept_list_lock, NULL);  pthread_cond_init(&c->accept_list_cond, NULL);  c->nb_nodes = args->n_procs_arg;  c->nodes = (struct service*)calloc(c->nb_nodes, sizeof(struct service));  for(i = 0; i < c->nb_nodes; i++)    {      c->nodes[i].kind = KIND_NODE;      c->nodes[i].nrank = i;      c->nodes[i].nstatus = STATUS_INITIALSTART;      c->nodes[i].sock = -1;      pthread_mutex_init(&c->nodes[i].lock, NULL);    }  return c;}static int send_intparam(int s, char *name, int value){  char line[strlen(name)+20];  sprintf(line, "%s=%d\n", name, value);  if(usend(s, line, strlen(line), 0) != strlen(line))    {      printe("error while sending %s (%s)", name, line);      return -1;    }  return 0;}static int send_strparam(int s, char *name, char *value){  char line[strlen(name)+strlen(value)+4];  sprintf(line, "%s=%s\n", name, value);  if(usend(s, line, strlen(line), 0) != strlen(line))    {      printe("error while sending %s (%s)", name, line);      return -1;    }    return 0;}/** * initial exchange for establishing the connection with a server. *   s : the socket for talking *   c : the context... *   port : IN/OUT value.  *          IN  : the port to use (0) for autoselect.  *          OUT : the value chosen. */static int init_exchange_server(struct service *s, struct context *c, in_port_t *port){  int r;  in_port_t p;  /* !! PLEASE !!   * for sanity reasons, keep this always synchronized with    *     server.ggo -- self explanatory   * and server.c   -- search for the !! PLEASE !! commentary   */  p = *port;  send_intparam(s->sock, "n_procs", c->args->n_procs_arg);  send_intparam(s->sock, "jobid", c->args->jobid_arg);  send_intparam(s->sock, "port", p);  send_strparam(s->sock, "working_dir", c->args->working_dir_arg);  send_strparam(s->sock, "tmp_dir", c->args->tmp_dir_arg);  send_strparam(s->sock, "helpers_dir", c->args->helpers_dir_arg);  send_strparam(s->sock, "master_sched", c->args->master_sched_given?c->args->master_sched_arg:"");  send_intparam(s->sock, "ckpt_timeout", c->args->ckpt_timeout_given?c->args->ckpt_timeout_arg:DEFAULT_CKPT_FREQ);  send_strparam(s->sock, "ckpt_strategy", c->args->ckpt_strategy_arg);  send_strparam(s->sock, "el_strategy", c->args->el_strategy_arg);  send_intparam(s->sock, "stdio_port", ntohs(c->stdio.sin_port));  send_intparam(s->sock, "stder_port", ntohs(c->stder.sin_port));  send_strparam(s->sock, "debug", c->args->debug_given?c->args->debug_arg:"");  r = send_strparam(s->sock, "EOF", "EOF");  if(r < 0)    return r;  printi("server", "waiting for the port (%d bytes)", sizeof(in_port_t));  if( (r = urecv(s->sock, &p, sizeof(in_port_t), MSG_WAITALL)) != sizeof(in_port_t))    {       perror("recv");      printe("error while receiving port number (%d bytes received)", r);       return -1;    }    printi("server", "port negociated : %u", ntohs(p));  *port = p;  return 0;}static void on_child_exit(int s){  siginterrupt(SIGCHLD, 1);  signal(SIGCHLD, on_child_exit);  return;}static int accept_list_pop(struct context *c, in_addr_t addr, struct sockaddr_in *remote){  int i;  int ret;  printi("accept_list", "%d want to pop a connection from %s", pthread_self(), 	 inet_ntoa(*(struct in_addr*)&addr));  pthread_mutex_lock(&c->accept_list_lock); test_again:  printi("accept_list", "%d is searching", pthread_self());  for(i = 0; i < c->accept_list_size; i++)    {      if( !memcmp(&addr, &c->accept_list[i].remote.sin_addr, sizeof(in_addr_t)) )	break;    }  if(i == c->accept_list_size)    {      printi("accept_list", "%d did not found. It is waiting", pthread_self());      pthread_cond_wait(&c->accept_list_cond, &c->accept_list_lock);      goto test_again;    }  ret = c->accept_list[i].fd;  memcpy(remote, &c->accept_list[i].remote, sizeof(struct sockaddr_in));  printi("accept_list", "%d found on %d/%d : %s:%d on fd %d", pthread_self(), i, c->accept_list_size, 	 inet_ntoa(remote->sin_addr), ntohs(remote->sin_port), ret);  memcpy(&c->accept_list[i], &c->accept_list[c->accept_list_size-1], sizeof(struct accept_list_elt));  c->accept_list_size--;  pthread_mutex_unlock(&c->accept_list_lock);  return ret;}typedef int (*init_exchange_t)(struct service *s, struct context *, in_port_t *);/* spawn return 1 if spawning succeeded *              0 if spawning failed * LOCK ON M IS ASSUMED TO BE TAKEN */static int spawn(struct context *c, struct machine *m, short suggested_port,		 char *cmdline, struct service *s, init_exchange_t init){  in_port_t port;  struct sockaddr_in remote;  int status;  char command[strlen(c->args->rsh_arg)+strlen(m->name)+	       strlen(c->args->working_dir_arg)+	       strlen(c->args->helpers_dir_arg)+	       strlen(cmdline)+128];    sprintf(command, "%s %s %s/%s --runtime-ip=%s --runtime-port=%d", 	  c->args->rsh_arg, m->name, c->args->helpers_dir_arg, cmdline,	  inet_ntoa(c->myself.sin_addr), ntohs(c->myself.sin_port));    if(fork())    {      if( (s->sock = accept_list_pop(c, m->ip, &remote)) < 0 )	{	  printi("spawn", "error while accepting connection from server %s (%s)", cmdline, strerror(errno));	  m->missed++;	  printi("spawn", "waiting for son to die");	  wait(&status);	  printi("spawn", "returning with fail");	  return 0;	}      printi("spawn", "waiting for son to die");      wait(&status);      printi("spawn", "son is dead, checking status");      if(WIFEXITED(status) && (WEXITSTATUS(status)==0))	{	  printi("spawn", "everything did go fine, trying to exchange");	  port = suggested_port;	  // Hack : port is sent in ASCII and received in BINARY	  // thus, here it is in machine order	  if( init(s, c, &port) < 0 )	    {	      printi("spawn", "error while handshaking");	      goto err_close;	    }	  // and here, it is in network order	  pthread_mutex_lock(&s->lock);	  memcpy(&s->addr, &remote, sizeof(struct sockaddr_in));	  s->addr.sin_port = port;	  pthread_mutex_unlock(&s->lock);	  m->used++;	  printi("spawn", "%s is now used %d times and free to be used again", m->name, m->used);	  return 1;	}      /* else */      printi("spawn", "%s %s with error %d (%s)", cmdline, 	     WIFEXITED(status)?"exited":"did not exit", WEXITSTATUS(status), strerror(errno));    err_close:      close(s->sock);      s->sock = -1;      memset(&s->addr, 0, sizeof(struct sockaddr_in));      m->missed++;      printi("spawn", "%s missed (it missed %d times since begining). Returning with fail", m->name, m->missed);      return 0;    }  else    {      /* son */      char *cmd = strdup(command);      char **argv;      int argc = 2;      int i;      for(i = 0; cmd[i]; i++)	if(cmd[i] == ' ')	  argc++;      argv = (char**)malloc(argc * sizeof(char *));      argv[0] = strtok(cmd, " ");      for(i = 1; i < argc; i++)	argv[i] = strtok(NULL, " ");      printi("spawn", "execing : %s", command);      execv(argv[0], argv);      printe("exec(%s)", command);      exit(1);    }}static void extended_help(){  cmdline_parser_print_help();  printf("  COMMAND is mandatory. If you have command arguments, precede COMMAND by '--'.\n");  #ifdef NO_CHECKPOINT  printf("  checkpoint server and checkpoint scheduler options are silently ignored\n");#endif#ifdef NO_EVENTLOGGER  printf("  event logger options are silently ignored\n");#endif}static void spawn_servers(struct context *c){  int i, j, k;  /* hack : spawn_servers is monothread   *  (it could be multi-thread. Evaluate the performance gain ?)   *  and we _should_ not see missed machines in the stables array   *  so, we don't sort the array inline, and we assume that no array    *  indice will change in this function. Also, we don't take locks.   */  i = 0;  if( c->sc )    {      for(; i < c->nb_stables; i++)	{	  printi("servers", "spawning sc");	  if( spawn(c, c->stables[i], 		    c->args->sc_port_given ? c->args->sc_port_arg : 0, 		    CSCHED, c->sc, init_exchange_server) )	    {	      printi("servers", "sc is %s", c->stables[i]->name);	      break;	    }	  printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name);	}      if( i == c->nb_stables )	{	  printq("There is no responding stable machines! (%d tried)", c->nb_stables);	}    }  for(j = 0; j < c->nb_cs; j++)    {      for(k = 0; k < 2*c->nb_stables; k++)	{	  i = (i + 1) % c->nb_stables;	  printi("servers", "spawning cs %d", j);	  if( spawn(c, c->stables[i], 		    c->args->cs_port_given ? c->args->cs_port_arg : 0,		    CHKPTSERV, &(c->cs[j]), init_exchange_server) )	    {	      printi("servers", "cs[%d] is %s", j, c->stables[i]->name);	      break;	    }	  printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name);	}      if(k == 2*c->nb_stables)	{	  printq("There is not enough responding stable machines for launching CS#%d", j);	}    }  for(j = 0; j < c->nb_el; j++)    {      for(k = 0; k < 2*c->nb_stables; k++)	{	  printi("servers", "spawning el %d", j);	  i = (i + 1) % c->nb_stables;	  if( spawn(c, c->stables[i], 		    c->args->el_port_given ? c->args->el_port_arg : 0,		    EVENTLOGGER, &(c->el[j]), init_exchange_server) )	    {	      printi("servers", "el[%d] is %s", j, c->stables[i]->name);	      break;	    }	  printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name);	}      if(k == 2*c->nb_stables)	{	  printq("There is not enough responding stable machines for launching CS#%d", j);	}    }    /* context update: if stable is a reverse copy of unstable, or if   * stables machines are also in the unstable list, update the load counter   */  for(i = 0; (i < c->nb_stables); i++)    {      for(j = 0; j < c->nb_unstables; j++)	if(!strcmp( c->stables[i]->name, c->unstables[j]->name ) )	  c->unstables[j]->used += c->stables[i]->used;    }}static int recompute_servers_set(void *_c, fd_set *set){  struct context *c = (struct context*)_c;  int maxfd, i;  FD_ZERO(set);  maxfd = -1;  for(i = 0; i < c->nb_cs; i++)    if(c->cs[i].sock != -1)      {	FD_SET(c->cs[i].sock, set);	if( maxfd < c->cs[i].sock )	  maxfd = c->cs[i].sock;      }  for(i = 0; i < c->nb_el; i++)    if(c->el[i].sock != -1)      {	FD_SET(c->el[i].sock, set);	if( maxfd < c->el[i].sock )	  maxfd = c->el[i].sock;      }  if(c->sc && (c->sc->sock != -1))    {      FD_SET(c->sc->sock, set);      if( maxfd < c->sc->sock )	maxfd = c->sc->sock;    }  printi("select", "maxfd = %d", maxfd);  return maxfd;}static void *servers_monitor(void *_c){#define on_server_error(serverid, s) do { \  if(r == 0)\       printw(serverid " closed the connection. It died most certainly. Expect errors", i); \  else \       printw("data on control port of " serverid " (%d bytes to read). Closing connection. Expect errors", r); \  close(s); \  s = -1; \  maxfd = recompute_servers_set(c, &rfs); \} while(0)   struct context *c = (struct context*)_c;  fd_set rfs;  int i, maxfd, r;  maxfd = recompute_servers_set(c, &rfs);  for(;;)    {      if(maxfd == -1)	break;      r = select(maxfd+1, &rfs, NULL, NULL, NULL);      if(r == -1)	{	  if( (errno == EINTR) || (errno == EAGAIN) )	    {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -