📄 vrun.c
字号:
slen = sizeof(struct sockaddr_in); if(getsockname(c->stder_sock, (struct sockaddr*)&server_stder, &slen) < 0) qerror("get stder_sock name"); memcpy(&c->stder, &c->myself, sizeof(struct sockaddr_in)); c->stder.sin_port = server_stder.sin_port; printi("init", "STDERR vrun bound on %s:%d", inet_ntoa(c->stder.sin_addr), ntohs(c->stder.sin_port)); if( socketpair(AF_UNIX, SOCK_DGRAM, 0, c->stder_comm) == -1 ) qerror("stder_comm creation");#ifndef NO_CHECKPOINT if( args->n_cs_given ) c->nb_cs = args->n_cs_arg; else c->nb_cs = 1 + (args->n_procs_arg / 5); c->cs = (struct service*)calloc(c->nb_cs, sizeof(struct service)); for(i = 0; i < c->nb_cs; i++) { c->cs[i].kind = KIND_SERVER; c->cs[i].sock = -1; pthread_mutex_init(&c->cs[i].lock, NULL); } c->sc = (struct service*)calloc(1, sizeof(struct service)); c->sc->kind = KIND_SERVER; c->sc->sock = -1; pthread_mutex_init(&c->sc->lock, NULL);#endif#ifndef NO_EVENTLOGGER c->nb_el = args->n_el_arg; c->el = (struct service*)calloc(c->nb_el, sizeof(struct service)); for(i = 0; i < c->nb_el; i++) { c->el[i].kind = KIND_SERVER; c->el[i].sock = -1; pthread_mutex_init(&c->el[i].lock, NULL); }#endif pthread_mutex_init(&c->accept_list_lock, NULL); pthread_cond_init(&c->accept_list_cond, NULL); c->nb_nodes = args->n_procs_arg; c->nodes = (struct service*)calloc(c->nb_nodes, sizeof(struct service)); for(i = 0; i < c->nb_nodes; i++) { c->nodes[i].kind = KIND_NODE; c->nodes[i].nrank = i; c->nodes[i].nstatus = STATUS_INITIALSTART; c->nodes[i].sock = -1; pthread_mutex_init(&c->nodes[i].lock, NULL); } return c;}static int send_intparam(int s, char *name, int value){ char line[strlen(name)+20]; sprintf(line, "%s=%d\n", name, value); if(usend(s, line, strlen(line), 0) != strlen(line)) { printe("error while sending %s (%s)", name, line); return -1; } return 0;}static int send_strparam(int s, char *name, char *value){ char line[strlen(name)+strlen(value)+4]; sprintf(line, "%s=%s\n", name, value); if(usend(s, line, strlen(line), 0) != strlen(line)) { printe("error while sending %s (%s)", name, line); return -1; } return 0;}/** * initial exchange for establishing the connection with a server. * s : the socket for talking * c : the context... * port : IN/OUT value. * IN : the port to use (0) for autoselect. * OUT : the value chosen. */static int init_exchange_server(struct service *s, struct context *c, in_port_t *port){ int r; in_port_t p; /* !! PLEASE !! * for sanity reasons, keep this always synchronized with * server.ggo -- self explanatory * and server.c -- search for the !! PLEASE !! commentary */ p = *port; send_intparam(s->sock, "n_procs", c->args->n_procs_arg); send_intparam(s->sock, "jobid", c->args->jobid_arg); send_intparam(s->sock, "port", p); send_strparam(s->sock, "working_dir", c->args->working_dir_arg); send_strparam(s->sock, "tmp_dir", c->args->tmp_dir_arg); send_strparam(s->sock, "helpers_dir", c->args->helpers_dir_arg); send_strparam(s->sock, "master_sched", c->args->master_sched_given?c->args->master_sched_arg:""); send_intparam(s->sock, "ckpt_timeout", c->args->ckpt_timeout_given?c->args->ckpt_timeout_arg:DEFAULT_CKPT_FREQ); send_strparam(s->sock, "ckpt_strategy", c->args->ckpt_strategy_arg); send_strparam(s->sock, "el_strategy", c->args->el_strategy_arg); send_intparam(s->sock, "stdio_port", ntohs(c->stdio.sin_port)); send_intparam(s->sock, "stder_port", ntohs(c->stder.sin_port)); send_strparam(s->sock, "debug", c->args->debug_given?c->args->debug_arg:""); r = send_strparam(s->sock, "EOF", "EOF"); if(r < 0) return r; printi("server", "waiting for the port (%d bytes)", sizeof(in_port_t)); if( (r = urecv(s->sock, &p, sizeof(in_port_t), MSG_WAITALL)) != sizeof(in_port_t)) { perror("recv"); printe("error while receiving port number (%d bytes received)", r); return -1; } printi("server", "port negociated : %u", ntohs(p)); *port = p; return 0;}static void on_child_exit(int s){ siginterrupt(SIGCHLD, 1); signal(SIGCHLD, on_child_exit); return;}static int accept_list_pop(struct context *c, in_addr_t addr, struct sockaddr_in *remote){ int i; int ret; printi("accept_list", "%d want to pop a connection from %s", pthread_self(), inet_ntoa(*(struct in_addr*)&addr)); pthread_mutex_lock(&c->accept_list_lock); test_again: printi("accept_list", "%d is searching", pthread_self()); for(i = 0; i < c->accept_list_size; i++) { if( !memcmp(&addr, &c->accept_list[i].remote.sin_addr, sizeof(in_addr_t)) ) break; } if(i == c->accept_list_size) { printi("accept_list", "%d did not found. It is waiting", pthread_self()); pthread_cond_wait(&c->accept_list_cond, &c->accept_list_lock); goto test_again; } ret = c->accept_list[i].fd; memcpy(remote, &c->accept_list[i].remote, sizeof(struct sockaddr_in)); printi("accept_list", "%d found on %d/%d : %s:%d on fd %d", pthread_self(), i, c->accept_list_size, inet_ntoa(remote->sin_addr), ntohs(remote->sin_port), ret); memcpy(&c->accept_list[i], &c->accept_list[c->accept_list_size-1], sizeof(struct accept_list_elt)); c->accept_list_size--; pthread_mutex_unlock(&c->accept_list_lock); return ret;}typedef int (*init_exchange_t)(struct service *s, struct context *, in_port_t *);/* spawn return 1 if spawning succeeded * 0 if spawning failed * LOCK ON M IS ASSUMED TO BE TAKEN */static int spawn(struct context *c, struct machine *m, short suggested_port, char *cmdline, struct service *s, init_exchange_t init){ in_port_t port; struct sockaddr_in remote; int status; char command[strlen(c->args->rsh_arg)+strlen(m->name)+ strlen(c->args->working_dir_arg)+ strlen(c->args->helpers_dir_arg)+ strlen(cmdline)+128]; sprintf(command, "%s %s %s/%s --runtime-ip=%s --runtime-port=%d", c->args->rsh_arg, m->name, c->args->helpers_dir_arg, cmdline, inet_ntoa(c->myself.sin_addr), ntohs(c->myself.sin_port)); if(fork()) { if( (s->sock = accept_list_pop(c, m->ip, &remote)) < 0 ) { printi("spawn", "error while accepting connection from server %s (%s)", cmdline, strerror(errno)); m->missed++; printi("spawn", "waiting for son to die"); wait(&status); printi("spawn", "returning with fail"); return 0; } printi("spawn", "waiting for son to die"); wait(&status); printi("spawn", "son is dead, checking status"); if(WIFEXITED(status) && (WEXITSTATUS(status)==0)) { printi("spawn", "everything did go fine, trying to exchange"); port = suggested_port; // Hack : port is sent in ASCII and received in BINARY // thus, here it is in machine order if( init(s, c, &port) < 0 ) { printi("spawn", "error while handshaking"); goto err_close; } // and here, it is in network order pthread_mutex_lock(&s->lock); memcpy(&s->addr, &remote, sizeof(struct sockaddr_in)); s->addr.sin_port = port; pthread_mutex_unlock(&s->lock); m->used++; printi("spawn", "%s is now used %d times and free to be used again", m->name, m->used); return 1; } /* else */ printi("spawn", "%s %s with error %d (%s)", cmdline, WIFEXITED(status)?"exited":"did not exit", WEXITSTATUS(status), strerror(errno)); err_close: close(s->sock); s->sock = -1; memset(&s->addr, 0, sizeof(struct sockaddr_in)); m->missed++; printi("spawn", "%s missed (it missed %d times since begining). Returning with fail", m->name, m->missed); return 0; } else { /* son */ char *cmd = strdup(command); char **argv; int argc = 2; int i; for(i = 0; cmd[i]; i++) if(cmd[i] == ' ') argc++; argv = (char**)malloc(argc * sizeof(char *)); argv[0] = strtok(cmd, " "); for(i = 1; i < argc; i++) argv[i] = strtok(NULL, " "); printi("spawn", "execing : %s", command); execv(argv[0], argv); printe("exec(%s)", command); exit(1); }}static void extended_help(){ cmdline_parser_print_help(); printf(" COMMAND is mandatory. If you have command arguments, precede COMMAND by '--'.\n"); #ifdef NO_CHECKPOINT printf(" checkpoint server and checkpoint scheduler options are silently ignored\n");#endif#ifdef NO_EVENTLOGGER printf(" event logger options are silently ignored\n");#endif}static void spawn_servers(struct context *c){ int i, j, k; /* hack : spawn_servers is monothread * (it could be multi-thread. Evaluate the performance gain ?) * and we _should_ not see missed machines in the stables array * so, we don't sort the array inline, and we assume that no array * indice will change in this function. Also, we don't take locks. */ i = 0; if( c->sc ) { for(; i < c->nb_stables; i++) { printi("servers", "spawning sc"); if( spawn(c, c->stables[i], c->args->sc_port_given ? c->args->sc_port_arg : 0, CSCHED, c->sc, init_exchange_server) ) { printi("servers", "sc is %s", c->stables[i]->name); break; } printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name); } if( i == c->nb_stables ) { printq("There is no responding stable machines! (%d tried)", c->nb_stables); } } for(j = 0; j < c->nb_cs; j++) { for(k = 0; k < 2*c->nb_stables; k++) { i = (i + 1) % c->nb_stables; printi("servers", "spawning cs %d", j); if( spawn(c, c->stables[i], c->args->cs_port_given ? c->args->cs_port_arg : 0, CHKPTSERV, &(c->cs[j]), init_exchange_server) ) { printi("servers", "cs[%d] is %s", j, c->stables[i]->name); break; } printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name); } if(k == 2*c->nb_stables) { printq("There is not enough responding stable machines for launching CS#%d", j); } } for(j = 0; j < c->nb_el; j++) { for(k = 0; k < 2*c->nb_stables; k++) { printi("servers", "spawning el %d", j); i = (i + 1) % c->nb_stables; if( spawn(c, c->stables[i], c->args->el_port_given ? c->args->el_port_arg : 0, EVENTLOGGER, &(c->el[j]), init_exchange_server) ) { printi("servers", "el[%d] is %s", j, c->stables[i]->name); break; } printw("%s is in the stable machine list, but is not responding...", c->stables[i]->name); } if(k == 2*c->nb_stables) { printq("There is not enough responding stable machines for launching CS#%d", j); } } /* context update: if stable is a reverse copy of unstable, or if * stables machines are also in the unstable list, update the load counter */ for(i = 0; (i < c->nb_stables); i++) { for(j = 0; j < c->nb_unstables; j++) if(!strcmp( c->stables[i]->name, c->unstables[j]->name ) ) c->unstables[j]->used += c->stables[i]->used; }}static int recompute_servers_set(void *_c, fd_set *set){ struct context *c = (struct context*)_c; int maxfd, i; FD_ZERO(set); maxfd = -1; for(i = 0; i < c->nb_cs; i++) if(c->cs[i].sock != -1) { FD_SET(c->cs[i].sock, set); if( maxfd < c->cs[i].sock ) maxfd = c->cs[i].sock; } for(i = 0; i < c->nb_el; i++) if(c->el[i].sock != -1) { FD_SET(c->el[i].sock, set); if( maxfd < c->el[i].sock ) maxfd = c->el[i].sock; } if(c->sc && (c->sc->sock != -1)) { FD_SET(c->sc->sock, set); if( maxfd < c->sc->sock ) maxfd = c->sc->sock; } printi("select", "maxfd = %d", maxfd); return maxfd;}static void *servers_monitor(void *_c){#define on_server_error(serverid, s) do { \ if(r == 0)\ printw(serverid " closed the connection. It died most certainly. Expect errors", i); \ else \ printw("data on control port of " serverid " (%d bytes to read). Closing connection. Expect errors", r); \ close(s); \ s = -1; \ maxfd = recompute_servers_set(c, &rfs); \} while(0) struct context *c = (struct context*)_c; fd_set rfs; int i, maxfd, r; maxfd = recompute_servers_set(c, &rfs); for(;;) { if(maxfd == -1) break; r = select(maxfd+1, &rfs, NULL, NULL, NULL); if(r == -1) { if( (errno == EINTR) || (errno == EAGAIN) ) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -