⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vrun.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
📖 第 1 页 / 共 4 页
字号:
	      printi("servers_monitor", "select interrupted (%s)", strerror(errno));	      continue;	    }	}      for(i = 0; i < c->nb_cs; i++)	{	  if( (c->cs[i].sock != -1) && FD_ISSET(c->cs[i].sock, &rfs) )	    {	      ioctl(c->cs[i].sock, FIONREAD, &r);	      on_server_error("checkpoint server number %d", c->cs[i].sock);	    }	  else	    if(c->cs[i].sock != -1)	      FD_SET(c->cs[i].sock, &rfs);	}      for(i = 0; i < c->nb_el; i++)	{	  if( (c->el[i].sock != -1) && FD_ISSET(c->el[i].sock, &rfs) )	    {	      ioctl(c->el[i].sock, FIONREAD, &r);	      on_server_error("event logger", c->el[i].sock);	    }	  else	    if(c->el[i].sock != -1)	      FD_SET(c->el[i].sock, &rfs);	}      if(c->sc)	{	  if( (c->sc->sock != -1) && FD_ISSET(c->sc->sock, &rfs) )	    {	      ioctl(c->sc->sock, FIONREAD, &r);	      on_server_error("checkpoint scheduler", c->sc->sock);	    }	  else	    if(c->sc->sock != -1)	      FD_SET(c->sc->sock, &rfs);	}    }  printi("servers_monitor", "no more servers to monitor, returning.");#undef on_server_error  return NULL;}static int machine_greater(const struct machine *a, const struct machine *b){  return ( (a->missed < b->missed) || (a->used < b->used) );}static int lock_maximal_machine(struct machine ** array, int size){  int i_max, i_max_base, i;    /* first find a free machine   * We do it as long as necessary. An extreme case would be that   * all the other threads have taken control of all the machines    * (thus, there are less machiens than nodes). This may run forever   * in a very unstable network. We take the chance.   */  i_max = 0;  for(;;)    {      if( pthread_mutex_trylock( &array[i_max]->lock ) == EBUSY )	{	  i_max = (i_max + 1) % size;	  continue;	}      /* victory ! */      break;    }  i_max_base = i_max;  /* now, we compare with all the other free */  for(i = 1; i < size; i++)    {      if( pthread_mutex_trylock( &array[(i+i_max_base) % size]->lock ) == EBUSY )	continue;      if( machine_greater( array[(i+i_max_base) % size], array[i_max] ) )	{	  pthread_mutex_unlock(&array[i_max]->lock);	  i_max = (i + i_max_base) % size;	}      else	pthread_mutex_unlock(&array[(i+i_max_base) % size]->lock);    }  /* array[i_max] is locked. */  return i_max;}/** * initial exchange for establishing the connection with a server. *   s : the socket for talking *   c : the context... *   port : IN/OUT value.  *          IN  : the port to use (0) for autoselect.  *          OUT : the value chosen. */static int init_exchange_node(struct service *s, struct context *c, in_port_t *port){  int r;  in_port_t p;  /* !! PLEASE(2) !!   * for sanity reasons, keep this always synchronized with    *     ../daemon/node.ggo -- self explanatory   * and ../daemon/main.c   -- search for the !! PLEASE !! commentary   */  p = *port;  send_intparam(s->sock, "n_procs", c->args->n_procs_arg);  send_intparam(s->sock, "jobid", c->args->jobid_arg);  send_intparam(s->sock, "port", p);  send_strparam(s->sock, "working_dir", c->args->working_dir_arg);  send_strparam(s->sock, "tmp_dir", c->args->tmp_dir_arg);  send_strparam(s->sock, "helpers_dir", c->args->helpers_dir_arg);  send_strparam(s->sock, "ckpt_server_ip", 		s->ncs?inet_ntoa(s->ncs->addr.sin_addr):"0.0.0.0");  send_intparam(s->sock, "ckpt_server_port",		s->ncs?ntohs(s->ncs->addr.sin_port):0);  send_intparam(s->sock, "ckpt_use_local_copy", c->args->ckpt_use_local_copy_flag);  send_strparam(s->sock, "ckpt_scheduler_ip", 		s->nsc?inet_ntoa(s->nsc->addr.sin_addr):"0.0.0.0");  send_intparam(s->sock, "ckpt_scheduler_port",		s->nsc?ntohs(s->nsc->addr.sin_port):0);  send_strparam(s->sock, "event_logger_ip", 		s->nel?inet_ntoa(s->nel->addr.sin_addr):"0.0.0.0");  send_intparam(s->sock, "event_logger_port",		s->nel?ntohs(s->nel->addr.sin_port):0);  send_intparam(s->sock, "rank", s->nrank);  send_intparam(s->sock, "restart", (s->nstatus == STATUS_RESTART));  send_strparam(s->sock, "command", c->command);  send_intparam(s->sock, "additional_stats", 0);  send_strparam(s->sock, "master_sched", c->args->master_sched_given?c->args->master_sched_arg:"");  send_intparam(s->sock, "stdio_port", ntohs(c->stdio.sin_port));  send_intparam(s->sock, "stder_port", ntohs(c->stder.sin_port));  send_strparam(s->sock, "debug", c->args->debug_given?c->args->debug_arg:"");  r = send_strparam(s->sock, "EOF", "EOF");  if(r < 0)    return r;  printi("node", "waiting for the port (%d bytes)", sizeof(in_port_t));  if( (r = urecv(s->sock, &p, sizeof(in_port_t), MSG_WAITALL)) != sizeof(in_port_t))    {       printe("error while receiving port number (%d bytes received)", r);       return -1;    }    printi("node", "port negociated : %u", ntohs(p));  *port = p;  return 0;}static int allsamestatus(struct context *c, int s){  int i;  int r = 1;  for(i = 0; i < c->nb_nodes; i++)    {      pthread_mutex_lock(&c->nodes[i].lock);      r &= c->nodes[i].nstatus == s;      pthread_mutex_unlock(&c->nodes[i].lock);    }  return r;}static void *stdio_thread(void *_c){  struct context *c = (struct context *)_c;  struct pollfd *polls;  int nbpolls, maxpolls;  int i, fd, rd;  socklen_t rlen;  struct sockaddr_in remote;  char *buffer = NULL;  int blen = 0;#ifdef DEBUG  char dbg_select[128];#endif    maxpolls = c->nb_nodes + c->nb_cs + c->nb_el + 5;  nbpolls  = 0;  polls = (struct pollfd *)calloc(maxpolls, sizeof(struct pollfd));  polls[nbpolls].fd = c->stdio_sock;  polls[nbpolls].events = POLLIN;  nbpolls++;  polls[nbpolls].fd = c->stdio_comm[0];  polls[nbpolls].events = POLLIN;  nbpolls++;  for(;;)    {      printi("stdio_poll", "testing the status");      if( allsamestatus(c, STATUS_EXITED) )	break;#ifdef DEBUG      snprintf(dbg_select, 128, "polls=[");      for(i = 0; i < nbpolls; i++)	snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "%d:%02x:%02x ", 		 polls[i].fd, polls[i].events, polls[i].revents);      snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "]");      printi("stdio_select", "polling %s", dbg_select);#endif      rlen = poll(polls, nbpolls, -1);      pthread_testcancel();      if(rlen < 0)	{	  printe("error in polling of stdio_thread");	  continue;	}            i = 0;      if( polls[i++].revents & POLLIN )	{	  printi("stdio_select", "connection socket signals new connection");	  rlen = sizeof(remote);	  fd = accept(c->stdio_sock, (struct sockaddr*)&remote, &rlen);	  printi("stdio_select", "connection accepted : new socket is %d = %s:%d", 		 fd, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));	  if(fd < 0)	    printe("stdio_thread accepting connection");	  else	    {	      polls[nbpolls].fd = fd;	      polls[nbpolls].events = POLLIN;	      polls[nbpolls].revents = 0;	      nbpolls++;	    }	}      if( polls[i++].revents & POLLIN )	{	  printi("stdio_select", "Other thread requests to check the status. Will do that next pass");	  read(c->stdio_comm[0], &rd, sizeof(int));	}      for(; i<nbpolls; i++)	{	  if( polls[i].revents & POLLIN )	    {	      if(ioctl(polls[i].fd, FIONREAD, &rlen) < 0)		{		  printe("stdio_thread getting size of data to read");		  goto error_on_nc;		}	      	      if(rlen == 0)		{		  printi("stdio", "stdio number %d/%d disconnected", i, polls[i].fd);		  goto error_on_nc;		}	      	      if(rlen > blen)		{		  blen = rlen;		  buffer = (char*)realloc(buffer, blen);		}	      	      rd = urecv(polls[i].fd, buffer, rlen, 0);	      	      if(rd < rlen)		{		  printe("stdio receiving %d bytes of data (received %d)", rlen, rd);		  goto error_on_nc;		}	      	      rlen = write(1, buffer, rd);	      continue;	      	    error_on_nc:	      printi("stdio_select", "closing socket %d/%d", i, polls[i].fd);	      close(polls[i].fd);	      	      memcpy(&polls[i], &polls[nbpolls-1], sizeof(struct pollfd));	      i--;	      nbpolls--;	    }	}    }  printi("finalized_stdio", "bye!");  return NULL;}static void *stder_thread(void *_c){  struct context *c = (struct context *)_c;  struct pollfd *polls;  int nbpolls, maxpolls;  int i, fd, rd;  socklen_t rlen;  struct sockaddr_in remote;  char *buffer = NULL;  int blen = 0;#ifdef DEBUG  char dbg_select[128];#endif    maxpolls = c->nb_nodes + c->nb_cs + c->nb_el + 5;  nbpolls  = 0;  polls = (struct pollfd *)calloc(maxpolls, sizeof(struct pollfd));  polls[nbpolls].fd = c->stder_sock;  polls[nbpolls].events = POLLIN;  nbpolls++;  polls[nbpolls].fd = c->stder_comm[0];  polls[nbpolls].events = POLLIN;  nbpolls++;  for(;;)    {      printi("stder_poll", "testing the status");      if( allsamestatus(c, STATUS_EXITED) )	break;#ifdef DEBUG      snprintf(dbg_select, 128, "polls=[");      for(i = 0; i < nbpolls; i++)	snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "%d:%02x:%02x ", 		 polls[i].fd, polls[i].events, polls[i].revents);      snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "]");      printi("stder_select", "polling %s", dbg_select);#endif      rlen = poll(polls, nbpolls, -1);      pthread_testcancel();      if(rlen < 0)	{	  printe("error in polling of stder_thread");	  continue;	}            i = 0;      if( polls[i++].revents & POLLIN )	{	  printi("stder_select", "connection socket signals new connection");	  rlen = sizeof(remote);	  fd = accept(c->stder_sock, (struct sockaddr*)&remote, &rlen);	  printi("stder_select", "connection accepted : new socket is %d = %s:%d", 		 fd, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port));	  if(fd < 0)	    printe("stder_thread accepting connection");	  else	    {	      polls[nbpolls].fd = fd;	      polls[nbpolls].events = POLLIN;	      polls[nbpolls].revents = 0;	      nbpolls++;	    }	}      if( polls[i++].revents & POLLIN )	{	  printi("stder_select", "Other thread requests to check the status. Will do that next pass");	  read(c->stder_comm[0], &rd, sizeof(int));	}      for(; i<nbpolls; i++)	{	  if( polls[i].revents & POLLIN )	    {	      if(ioctl(polls[i].fd, FIONREAD, &rlen) < 0)		{		  printe("stder_thread getting size of data to read");		  goto error_on_nc;		}	      	      if(rlen == 0)		{		  printi("stder", "stder number %d/%d disconnected", i, polls[i].fd);		  goto error_on_nc;		}	      	      if(rlen > blen)		{		  blen = rlen;		  buffer = (char*)realloc(buffer, blen);		}	      	      rd = urecv(polls[i].fd, buffer, rlen, 0);	      	      if(rd < rlen)		{		  printe("stder receiving %d bytes of data (received %d)", rlen, rd);		  goto error_on_nc;		}	      	      rlen = write(2, buffer, rd);	      continue;	      	    error_on_nc:	      printi("stder_select", "closing socket %d/%d", i, polls[i].fd);	      close(polls[i].fd);	      	      memcpy(&polls[i], &polls[nbpolls-1], sizeof(struct pollfd));	      i--;	      nbpolls--;	    }	}    }  printi("finalized_stder", "bye!");  return NULL;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -