⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 4 页
字号:
/*
 * nodes_monitor - thread body monitoring a set of compute nodes.
 *
 * _nmp is a struct node_monitor_parameter_t* carrying the node subset
 * (nmp->nodes / nmp->nb_nodes) and the global context (nmp->c).
 * Loop structure: (1) (re)spawn every node whose socket is closed and
 * which has not exited, (2) acknowledge nodes in STATUS_EXITING,
 * (3) select() on all live node sockets, (4) drain readable sockets,
 * detecting broken connections and FINALIZE_MSG markers.
 * Returns NULL when every monitored node reaches STATUS_EXITED.
 */
static void *nodes_monitor(void *_nmp)
{
  int i, r, u, n, c, p, ack, maxfd = -1, exited;
  struct node_monitor_parameter_t *nmp = (struct node_monitor_parameter_t*)_nmp;
  fd_set rfs;
  char *buffer = NULL;   /* receive buffer, grown lazily via realloc */
  int bsize = 0;         /* current allocated size of buffer */
  printi("nodes_monitor", "node_monitor %d is handling %d nodes", pthread_self(), nmp->nb_nodes);
  FD_ZERO(&rfs);
  for(;;)
    {
      /* Here, I spawn all non-running nodes in  round-robin */
      for(i = 0; i < nmp->nb_nodes; i++)
	{
	  /* snapshot the exit flag under the node lock */
	  pthread_mutex_lock(&nmp->nodes[i]->lock);
	  exited = (nmp->nodes[i]->nstatus == STATUS_EXITED);
	  pthread_mutex_unlock(&nmp->nodes[i]->lock);
	  if( (nmp->nodes[i]->sock == -1) && !exited )
	    {
	      printi("nodes_monitor", "node %d (monitored by %d) is not exited but not alive.",
		     nmp->nodes[i]->nrank, pthread_self());
	      /* retry until spawn() succeeds on some unstable machine */
	      for(;;)
		{
		  /* lock_maximal_machine returns the index of the chosen
		   * machine in nmp->c->unstables, with its lock held */
		  u = lock_maximal_machine(nmp->c->unstables, nmp->c->nb_unstables);
		  printi("nodes", "spawning node %d", i);
		  if( nmp->nodes[i]->nstatus == STATUS_RESTART )
		    printi("nodes", "restarting node of rank %d", nmp->nodes[i]->nrank);
		  if ( spawn(nmp->c, nmp->c->unstables[u], 0, VDAEMON,
			     nmp->nodes[i], init_exchange_node) )
		    {
		      printi("nodes_monitor", "choose %s for rank %d",
			     nmp->c->unstables[u]->name, nmp->nodes[i]->nrank);
		      pthread_mutex_unlock( &nmp->c->unstables[u]->lock );

		      /* Now, we give the array of nodes to the newly connected
		       * node. We _have_ to put synchronization barrier
		       * in order to ensure that a node at least (exactly one,
		       * in fact) will have a complete list of all other nodes.
		       */
		      for(n = 0; n < nmp->c->nb_nodes; n++)
			{
			  pthread_mutex_lock(&nmp->c->nodes[n].lock);
			  send_intparam(nmp->nodes[i]->sock,
					"node_rank", n);
			  send_strparam(nmp->nodes[i]->sock,
					"node_ip",
					inet_ntoa(nmp->c->nodes[n].addr.sin_addr));
			  send_intparam(nmp->nodes[i]->sock,
					"node_port",
					ntohs(nmp->c->nodes[n].addr.sin_port));
			  pthread_mutex_unlock(&nmp->c->nodes[n].lock);
			}
		      /* "EOF" terminates the node list; failure here means
		       * the fresh connection already died */
		      if( send_strparam(nmp->nodes[i]->sock, "EOF", "EOF") < 0 )
			{
			  /* Argh, big failure */
			  close(nmp->nodes[i]->sock);
			  nmp->nodes[i]->sock = -1;
			  memset(&nmp->nodes[i]->addr, 0, sizeof(struct sockaddr_in));
			  continue;  /* pick another machine and retry */
			}
		      /* starting from now, we consider that the node started at least one */
		      pthread_mutex_lock(&nmp->nodes[i]->lock);
		      nmp->nodes[i]->nstatus = STATUS_RESTART;
		      pthread_mutex_unlock(&nmp->nodes[i]->lock);
		      FD_SET(nmp->nodes[i]->sock, &rfs);
		      if(nmp->nodes[i]->sock > maxfd)
			maxfd = nmp->nodes[i]->sock;
		      break;
		    }
		  else
		    {
		      printi("nodes_monitor", "for an unknown reason, %s did not fit for node of rank %d",
			     nmp->c->unstables[u]->name, nmp->nodes[i]->nrank);
		      pthread_mutex_unlock( &nmp->c->unstables[u]->lock );
		    }
		}
	    }
	}
      /* acknowledge every node that asked to exit.
       * NOTE(review): on this path `ack` may still be uninitialized (it is
       * only assigned in the FINALIZE_MSG branch below) -- the peer seems to
       * use the write as a wake-up, but confirm the value is ignored. */
      for(i = 0; i < nmp->nb_nodes; i++)
	{
	  pthread_mutex_lock(&nmp->nodes[i]->lock);
	  if( nmp->nodes[i]->nstatus == STATUS_EXITING )
	    {
	      printi("finalize", "sending acknowledge of exit to %d", nmp->nodes[i]->nrank);
	      usend(nmp->nodes[i]->sock, &ack, sizeof(int), 0);
	      nmp->nodes[i]->nstatus = STATUS_WAITING_FOR_CLOSE;
	    }
	  pthread_mutex_unlock(&nmp->nodes[i]->lock);
	}
      /* block in select() until some node socket becomes readable;
       * retry on EAGAIN/EINTR, abort the run on any other error */
      while( maxfd > -1 )
	{
	  /* and now, the select, which detects deads and ends */
	  printi("nodes_monitor", "%d enters in select (maxfd = %d)", pthread_self(), maxfd);
	  r = select(maxfd+1, &rfs, NULL, NULL, NULL);
	  if(r < 0)
	    {
	      if(errno == EAGAIN)
		printi("nodes_monitor", "we received a AGAIN error. Looping");
	      else if(errno == EINTR)
		printi("nodes_monitor", "we receveid some signal. Looping");
	      else
		qerror("monitor %d : select returned with error", pthread_self());
	    }
	  else
	    break;
	}
      for(i = 0; i < nmp->nb_nodes; i++)
	{
	  if(nmp->nodes[i]->sock != -1)
	    {
	      if( FD_ISSET(nmp->nodes[i]->sock, &rfs) )
		{
		  /* FIONREAD tells how many bytes are pending; 0 (or an
		   * untouched -1 on ioctl failure) means the peer closed */
		  r=-1;
		  ioctl(nmp->nodes[i]->sock, FIONREAD, &r);
		  if(r <= 0)
		    {
		      pthread_mutex_lock(&nmp->nodes[i]->lock);
		      if( nmp->nodes[i]->nstatus == STATUS_WAITING_FOR_CLOSE )
			{
			  nmp->nodes[i]->nstatus = STATUS_EXITED;
			  printi("finalize", "node number %d enter STATUS_EXITED", nmp->nodes[i]->nrank);
			}
		      pthread_mutex_unlock(&nmp->nodes[i]->lock);
		      printi("finalize", "requesting the stdio and err thread to check the status, in case stdio descriptor closed before dispatcher fd");
		      /* NOTE(review): write() return values are ignored here */
		      write(nmp->c->stdio_comm[1], &r, sizeof(int));
		      write(nmp->c->stder_comm[1], &r, sizeof(int));
		      /* `err:` is also the target of the recv-failure goto
		       * below: tear the socket down and recompute maxfd */
		    err:
		      printi("nodes_monitor", "node %d broke connection", i);
		      close(nmp->nodes[i]->sock);
		      FD_CLR(nmp->nodes[i]->sock, &rfs);
		      printi("nodes_monitor", "%d removes %d from the rfs", pthread_self(), nmp->nodes[i]->sock);
		      nmp->nodes[i]->sock = -1;
		      /* recompute maxfd */
		      maxfd = -1;
		      for(p = 0; p < nmp->nb_nodes; p++)
			if( nmp->nodes[p]->sock > maxfd )
			  maxfd = nmp->nodes[p]->sock;
		      printi("nodes_monitor", "%d's maxfd = %d", pthread_self(), maxfd);
		      pthread_mutex_lock(&nmp->nodes[i]->lock);
		      memset(&nmp->nodes[i]->addr, 0, sizeof(struct sockaddr_in));
		      pthread_mutex_unlock(&nmp->nodes[i]->lock);
		    }
		  else
		    {
		      /* data pending: grow the buffer to r+1 so the
		       * terminating NUL below always fits */
		      if(bsize < r+1)
			{
			  bsize = r+1;
			  buffer = (char*)realloc(buffer, bsize);
			}
		      r = urecv(nmp->nodes[i]->sock, buffer, r, 0);
		      if(r < 0)
			goto err;
		      printi("nodes_monitor", "received %d bytes", r);
		      /* scan the stream for an in-band control word; a
		       * non-printable byte is treated as the start of a
		       * network-order int message.
		       * NOTE(review): the (int*) cast may be misaligned --
		       * presumably tolerated on the target platforms. */
		      for(c = 0; c < r; c++)
			{
			  if( !isprint(buffer[c]) )
			    {
			      int *msg = (int*)(buffer + c);
			      if( ntohl(*msg) == FINALIZE_MSG )
				{
				  printi("nodes_monitor", "process %d of rank %d has finished",
					 i, nmp->nodes[i]->nrank);
				  pthread_mutex_lock(&nmp->nodes[i]->lock);
				  nmp->nodes[i]->nstatus = STATUS_FINISHED;
				  pthread_mutex_unlock(&nmp->nodes[i]->lock);

				  /* when the last rank finishes, move every
				   * rank to EXITING and ack those that are
				   * still connected */
				  if(allsamestatus(nmp->c, STATUS_FINISHED))
				    {
				      printi("finalize", "all rank has the status FINISHED. Passing to status EXITING");
				      for(p = 0; p < nmp->c->nb_nodes; p++)
					{
					  pthread_mutex_lock(&nmp->c->nodes[p].lock);
					  nmp->c->nodes[p].nstatus = STATUS_EXITING;
					  if(nmp->c->nodes[p].sock != -1)
					    {
					      ack=1;
					      usend(nmp->c->nodes[p].sock, &ack, sizeof(int), 0);
					      nmp->c->nodes[p].nstatus = STATUS_WAITING_FOR_CLOSE;
					    }
					  pthread_mutex_unlock(&nmp->c->nodes[p].lock);
					}
				    }
				  break;
				}
			    }
			}
		      /* print everything up to the control word (or the
		       * whole chunk) as forwarded node output */
		      buffer[c] = 0;
		      printf("%s", buffer);
		    }
		}
	      else
		/* socket alive but not readable this round: re-arm it,
		 * since select() cleared it from the set */
		FD_SET(nmp->nodes[i]->sock, &rfs);
	    }
	}
      if(allsamestatus(nmp->c, STATUS_EXITED))
	{
	  printi("finalize", "all rank handled by %d has the status EXITED. Game Over",
		 pthread_self());
	  break;
	}
    }
  printi("nodes_monitor", "nodes monitor number %d returns (no more node to monitor)",
	 pthread_self());
  return NULL;
}

/*
 * arguments_set_default - fill in unset gengetopt options.
 *
 * working_dir defaults to the current directory; helpers_dir defaults
 * to the absolute path of the directory containing the executable
 * (cmd, i.e. argv[0]), resolved by fchdir()ing into it; jobid defaults
 * to the launcher's pid.  Exits the process if the executable's
 * directory cannot be opened or entered.
 */
static void arguments_set_default(struct gengetopt_args_info *args,
				  char *cmd)
{
  if(!args->working_dir_given)
    args->working_dir_arg = get_current_dir_name();
  if(!args->helpers_dir_given)
    {
      char *cwd = get_current_dir_name();
      char *c, *dirname = strdup(cmd);
      int fd;
      /* strip the trailing path component of cmd to get its directory */
      for(c = dirname + strlen(dirname); (c > dirname) && (*c != '/'); c--) ;
      if(c == dirname)
	{
	  /* no '/' in cmd: it was found via PATH or is relative; use "." */
	  dirname[0] = '.';
	  dirname[1] = 0;
	}
      else
	*c = 0;
      fd = open(dirname, O_RDONLY);
      if(fd < 0)
	{
	  fprintf(stderr, "%s is not a directory, or acces permission error (%s)\n",
		  dirname, strerror(errno));
	  exit(0);
	}
      /* enter the directory so get_current_dir_name() yields its
       * absolute path */
      if(fchdir(fd) < 0)
	{
	  fprintf(stderr, "could not determine absolute path of %s (%s)\n",
		  dirname, strerror(errno));
	  exit(0);
	}
      free(dirname);
      close(fd);

      args->helpers_dir_arg = get_current_dir_name();
      /* restore the original working directory.
       * NOTE(review): chdir() return value is ignored */
      chdir(cwd);
      free(cwd);
    }
  if(!args->jobid_given)
    args->jobid_arg = getpid();
}

/*
 * main - vrun launcher entry point.
 *
 * Parses options, builds the run context, assembles the command line,
 * installs SIGCHLD/SIGPIPE handling, starts the stdio/stderr/accept/
 * server-monitor threads, distributes servers over nodes round-robin,
 * spawns one nodes_monitor thread per node, then joins everything and
 * tears the sockets down.
 */
int main(int argc, char *argv[])
{
  struct gengetopt_args_info args;
  struct context *c;
  pthread_t accept_list_id, stdio_id, stder_id, servers_monitor_id;
  struct node_monitor_parameter_t **nmps;
  int i, cmdlen;
  cmdline_parser_init (&args);
  if(cmdline_parser(argc, argv, &args) != 0)
    {
      extended_help();
      exit(0);
    }
  if(args.help_given)
    {
      extended_help();
      exit(0);
    }
  if(args.inputs_num == 0)
    {
      printf("\nNo command given\n\n");
      extended_help();
      exit(0);
    }
  arguments_set_default(&args, argv[0]);
  if(args.debug_given)
    initDebug("vrun", args.debug_arg);
  else
    initDebug("vrun", "");
  c = build_context(&args);
  if(c == NULL)
    exit(0);
  /* join the positional arguments into a single space-separated
   * command string (cmdlen = total length + separators + slack) */
  cmdlen = 16;
  for(i = 0; i < args.inputs_num; i++)
    cmdlen += 1 + strlen(args.inputs[i]);
  c->command = (char*)calloc(1, cmdlen);
  for(i = 0; i < args.inputs_num; i++)
    sprintf(c->command+strlen(c->command), "%s ", args.inputs[i]);
  c->command[strlen(c->command)-1] = 0;  /* drop the trailing space */
  /* SIGCHLD interrupts blocking syscalls (so select() sees dead
   * children); SIGPIPE is ignored so socket writes fail with EPIPE */
  siginterrupt(SIGCHLD, 1);
  signal(SIGCHLD, on_child_exit);
  siginterrupt(SIGPIPE, 0);
  signal(SIGPIPE, SIG_IGN);
  pthread_create(&stdio_id, NULL, stdio_thread, (void*)c);
  pthread_create(&stder_id, NULL, stder_thread, (void*)c);
  pthread_create(&accept_list_id, NULL, accept_list_thread, (void*)c);
  spawn_servers(c);
  pthread_create(&servers_monitor_id, NULL, servers_monitor, (void*)c);
  /*
   * now, let's distribute the servers among the nodes
   * simple round-robin strategy.
   */
  for(i =0; i < c->nb_nodes; i++)
    {
      if( c->sc )
	c->nodes[i].nsc = c->sc;
      if( c->nb_cs > 0 )
	c->nodes[i].ncs = &(c->cs[i % c->nb_cs]);
      if( c->nb_el > 0 )
	c->nodes[i].nel = &(c->el[i % c->nb_el]);
    }
  /*
   * no need to spawn the nodes before creating the monitors:
   * the monitors will spawn them anyway
   */
  /* NOTE(review): this calloc over-allocates -- the array holds
   * pointers, so sizeof(struct node_monitor_parameter_t *) would be
   * the right element size (harmless, but wasteful) */
  nmps = (struct node_monitor_parameter_t **)calloc(c->nb_nodes, sizeof(struct node_monitor_parameter_t));
  /* one monitor thread per node, each handling exactly one node */
  for(i = 0; i < c->nb_nodes; i++)
    {
      nmps[i] = (struct node_monitor_parameter_t *)calloc(1, sizeof(struct node_monitor_parameter_t));
      nmps[i]->nb_nodes = 1;
      nmps[i]->nodes = (struct service**)calloc(1, sizeof(struct service*));
      nmps[i]->nodes[0] = &c->nodes[i];
      nmps[i]->c = c;
      pthread_create(&nmps[i]->monitor, NULL, nodes_monitor, (void*)nmps[i]);
    }
  /* the nodes will complete before the servers (hopefully) */
  for(i = 0; i < c->nb_nodes; i++)
    pthread_join(nmps[i]->monitor, NULL);
  printi("finalized", "no more child. Cleaning up");
  pthread_cancel(servers_monitor_id);
  for(i = 0; i < c->nb_cs; i++)
    if( c->cs[i].sock != -1)
      {
	close(c->cs[i].sock);
	c->cs[i].sock = -1;
      }
  for(i = 0; i < c->nb_el; i++)
    if(c->el[i].sock != -1)
      {
	close(c->el[i].sock);
	c->el[i].sock = -1;
      }
  if(c->sc && c->sc->sock != -1)
    {
      close(c->sc->sock);
      c->sc->sock = -1;
    }
  printi("finalized", "waiting for servers_monitor");
  pthread_join(servers_monitor_id, NULL);
  // Just to avoid some apparent buffer overflows in pthread_cancel ?!?
  // pthread_cancel(accept_list_id);
  close(c->listen_sock);
  printi("finalized", "waiting for accept_list");
  // pthread_join(accept_list_id, NULL);
  printi("finalized", "waiting for stdio");
  pthread_join(stdio_id, NULL);
  close(c->stdio_sock);
  printi("finalized", "waiting for stder");
  pthread_join(stder_id, NULL);
  close(c->stder_sock);
  // Just to avoid some apparent buffer overflows in pthread_cancel ?!?
  /* NOTE(review): everything after this return is intentionally(?) dead
   * code -- the resource-freeing block below never runs; the process
   * exit reclaims the memory instead */
  return 0;

  printi("finalized", "freeing resources");
  for(i = 0 ; i < c->nb_stables; i++)
    free(c->stables[i]);
  free(c->stables);
  for(i = 0 ; i < c->nb_unstables; i++)
    free(c->unstables[i]);
  free(c->unstables);
  for(i = 0; i < c->nb_nodes; i++)
    {
      free(nmps[i]->nodes);
      free(nmps[i]);
    }
  free(nmps);
  if(c->el) free(c->el);
  if(c->cs) free(c->cs);
  if(c->sc) free(c->sc);
  free(c);
  cmdline_parser_free (&args);
  return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -