📄 vrun.c
字号:
static void *nodes_monitor(void *_nmp){ int i, r, u, n, c, p, ack, maxfd = -1, exited; struct node_monitor_parameter_t *nmp = (struct node_monitor_parameter_t*)_nmp; fd_set rfs; char *buffer = NULL; int bsize = 0; printi("nodes_monitor", "node_monitor %d is handling %d nodes", pthread_self(), nmp->nb_nodes); FD_ZERO(&rfs); for(;;) { /* Here, I spawn all non-running nodes in round-robin */ for(i = 0; i < nmp->nb_nodes; i++) { pthread_mutex_lock(&nmp->nodes[i]->lock); exited = (nmp->nodes[i]->nstatus == STATUS_EXITED); pthread_mutex_unlock(&nmp->nodes[i]->lock); if( (nmp->nodes[i]->sock == -1) && !exited ) { printi("nodes_monitor", "node %d (monitored by %d) is not exited but not alive.", nmp->nodes[i]->nrank, pthread_self()); for(;;) { u = lock_maximal_machine(nmp->c->unstables, nmp->c->nb_unstables); printi("nodes", "spawning node %d", i); if( nmp->nodes[i]->nstatus == STATUS_RESTART ) printi("nodes", "restarting node of rank %d", nmp->nodes[i]->nrank); if ( spawn(nmp->c, nmp->c->unstables[u], 0, VDAEMON, nmp->nodes[i], init_exchange_node) ) { printi("nodes_monitor", "choose %s for rank %d", nmp->c->unstables[u]->name, nmp->nodes[i]->nrank); pthread_mutex_unlock( &nmp->c->unstables[u]->lock ); /* Now, we give the array of nodes to the newly connected * node. We _have_ to put synchronization barrier * in order to ensure that a node at least (exactly one, * in fact) will have a complete list of all other nodes. */ for(n = 0; n < nmp->c->nb_nodes; n++) { pthread_mutex_lock(&nmp->c->nodes[n].lock); send_intparam(nmp->nodes[i]->sock, "node_rank", n); send_strparam(nmp->nodes[i]->sock, "node_ip", inet_ntoa(nmp->c->nodes[n].addr.sin_addr)); send_intparam(nmp->nodes[i]->sock, "node_port", ntohs(nmp->c->nodes[n].addr.sin_port)); pthread_mutex_unlock(&nmp->c->nodes[n].lock); } if( send_strparam(nmp->nodes[i]->sock, "EOF", "EOF") < 0 ) { /* Argh, big failure */ close(nmp->nodes[i]->sock); nmp->nodes[i]->sock = -1; memset(&nmp->nodes[i]->addr, 0, sizeof(struct sockaddr_in)); continue; } /* starting from now, we consider that the node started at least one */ pthread_mutex_lock(&nmp->nodes[i]->lock); nmp->nodes[i]->nstatus = STATUS_RESTART; pthread_mutex_unlock(&nmp->nodes[i]->lock); FD_SET(nmp->nodes[i]->sock, &rfs); if(nmp->nodes[i]->sock > maxfd) maxfd = nmp->nodes[i]->sock; break; } else { printi("nodes_monitor", "for an unknown reason, %s did not fit for node of rank %d", nmp->c->unstables[u]->name, nmp->nodes[i]->nrank); pthread_mutex_unlock( &nmp->c->unstables[u]->lock ); } } } } for(i = 0; i < nmp->nb_nodes; i++) { pthread_mutex_lock(&nmp->nodes[i]->lock); if( nmp->nodes[i]->nstatus == STATUS_EXITING ) { printi("finalize", "sending acknowledge of exit to %d", nmp->nodes[i]->nrank); usend(nmp->nodes[i]->sock, &ack, sizeof(int), 0); nmp->nodes[i]->nstatus = STATUS_WAITING_FOR_CLOSE; } pthread_mutex_unlock(&nmp->nodes[i]->lock); } while( maxfd > -1 ) { /* and now, the select, which detects deads and ends */ printi("nodes_monitor", "%d enters in select (maxfd = %d)", pthread_self(), maxfd); r = select(maxfd+1, &rfs, NULL, NULL, NULL); if(r < 0) { if(errno == EAGAIN) printi("nodes_monitor", "we received a AGAIN error. Looping"); else if(errno == EINTR) printi("nodes_monitor", "we receveid some signal. Looping"); else qerror("monitor %d : select returned with error", pthread_self()); } else break; } for(i = 0; i < nmp->nb_nodes; i++) { if(nmp->nodes[i]->sock != -1) { if( FD_ISSET(nmp->nodes[i]->sock, &rfs) ) { r=-1; ioctl(nmp->nodes[i]->sock, FIONREAD, &r); if(r <= 0) { pthread_mutex_lock(&nmp->nodes[i]->lock); if( nmp->nodes[i]->nstatus == STATUS_WAITING_FOR_CLOSE ) { nmp->nodes[i]->nstatus = STATUS_EXITED; printi("finalize", "node number %d enter STATUS_EXITED", nmp->nodes[i]->nrank); } pthread_mutex_unlock(&nmp->nodes[i]->lock); printi("finalize", "requesting the stdio and err thread to check the status, in case stdio descriptor closed before dispatcher fd"); write(nmp->c->stdio_comm[1], &r, sizeof(int)); write(nmp->c->stder_comm[1], &r, sizeof(int)); err: printi("nodes_monitor", "node %d broke connection", i); close(nmp->nodes[i]->sock); FD_CLR(nmp->nodes[i]->sock, &rfs); printi("nodes_monitor", "%d removes %d from the rfs", pthread_self(), nmp->nodes[i]->sock); nmp->nodes[i]->sock = -1; /* recompute maxfd */ maxfd = -1; for(p = 0; p < nmp->nb_nodes; p++) if( nmp->nodes[p]->sock > maxfd ) maxfd = nmp->nodes[p]->sock; printi("nodes_monitor", "%d's maxfd = %d", pthread_self(), maxfd); pthread_mutex_lock(&nmp->nodes[i]->lock); memset(&nmp->nodes[i]->addr, 0, sizeof(struct sockaddr_in)); pthread_mutex_unlock(&nmp->nodes[i]->lock); } else { if(bsize < r+1) { bsize = r+1; buffer = (char*)realloc(buffer, bsize); } r = urecv(nmp->nodes[i]->sock, buffer, r, 0); if(r < 0) goto err; printi("nodes_monitor", "received %d bytes", r); for(c = 0; c < r; c++) { if( !isprint(buffer[c]) ) { int *msg = (int*)(buffer + c); if( ntohl(*msg) == FINALIZE_MSG ) { printi("nodes_monitor", "process %d of rank %d has finished", i, nmp->nodes[i]->nrank); pthread_mutex_lock(&nmp->nodes[i]->lock); nmp->nodes[i]->nstatus = STATUS_FINISHED; pthread_mutex_unlock(&nmp->nodes[i]->lock); if(allsamestatus(nmp->c, STATUS_FINISHED)) { printi("finalize", "all rank has the status FINISHED. Passing to status EXITING"); for(p = 0; p < nmp->c->nb_nodes; p++) { pthread_mutex_lock(&nmp->c->nodes[p].lock); nmp->c->nodes[p].nstatus = STATUS_EXITING; if(nmp->c->nodes[p].sock != -1) { ack=1; usend(nmp->c->nodes[p].sock, &ack, sizeof(int), 0); nmp->c->nodes[p].nstatus = STATUS_WAITING_FOR_CLOSE; } pthread_mutex_unlock(&nmp->c->nodes[p].lock); } } break; } } } buffer[c] = 0; printf("%s", buffer); } } else FD_SET(nmp->nodes[i]->sock, &rfs); } } if(allsamestatus(nmp->c, STATUS_EXITED)) { printi("finalize", "all rank handled by %d has the status EXITED. Game Over", pthread_self()); break; } } printi("nodes_monitor", "nodes monitor number %d returns (no more node to monitor)", pthread_self()); return NULL;}static void arguments_set_default(struct gengetopt_args_info *args, char *cmd){ if(!args->working_dir_given) args->working_dir_arg = get_current_dir_name(); if(!args->helpers_dir_given) { char *cwd = get_current_dir_name(); char *c, *dirname = strdup(cmd); int fd; for(c = dirname + strlen(dirname); (c > dirname) && (*c != '/'); c--) ; if(c == dirname) { dirname[0] = '.'; dirname[1] = 0; } else *c = 0; fd = open(dirname, O_RDONLY); if(fd < 0) { fprintf(stderr, "%s is not a directory, or acces permission error (%s)\n", dirname, strerror(errno)); exit(0); } if(fchdir(fd) < 0) { fprintf(stderr, "could not determine absolute path of %s (%s)\n", dirname, strerror(errno)); exit(0); } free(dirname); close(fd); args->helpers_dir_arg = get_current_dir_name(); chdir(cwd); free(cwd); } if(!args->jobid_given) args->jobid_arg = getpid();}int main(int argc, char *argv[]){ struct gengetopt_args_info args; struct context *c; pthread_t accept_list_id, stdio_id, stder_id, servers_monitor_id; struct node_monitor_parameter_t **nmps; int i, cmdlen; cmdline_parser_init (&args); if(cmdline_parser(argc, argv, &args) != 0) { extended_help(); exit(0); } if(args.help_given) { extended_help(); exit(0); } if(args.inputs_num == 0) { printf("\nNo command given\n\n"); extended_help(); exit(0); } arguments_set_default(&args, argv[0]); if(args.debug_given) initDebug("vrun", args.debug_arg); else initDebug("vrun", ""); c = build_context(&args); if(c == NULL) exit(0); cmdlen = 16; for(i = 0; i < args.inputs_num; i++) cmdlen += 1 + strlen(args.inputs[i]); c->command = (char*)calloc(1, cmdlen); for(i = 0; i < args.inputs_num; i++) sprintf(c->command+strlen(c->command), "%s ", args.inputs[i]); c->command[strlen(c->command)-1] = 0; siginterrupt(SIGCHLD, 1); signal(SIGCHLD, on_child_exit); siginterrupt(SIGPIPE, 0); signal(SIGPIPE, SIG_IGN); pthread_create(&stdio_id, NULL, stdio_thread, (void*)c); pthread_create(&stder_id, NULL, stder_thread, (void*)c); pthread_create(&accept_list_id, NULL, accept_list_thread, (void*)c); spawn_servers(c); pthread_create(&servers_monitor_id, NULL, servers_monitor, (void*)c); /* * now, let's distribute the servers among the nodes * simple round-robin strategy. */ for(i =0; i < c->nb_nodes; i++) { if( c->sc ) c->nodes[i].nsc = c->sc; if( c->nb_cs > 0 ) c->nodes[i].ncs = &(c->cs[i % c->nb_cs]); if( c->nb_el > 0 ) c->nodes[i].nel = &(c->el[i % c->nb_el]); } /* * no need to spawn the nodes before creating the monitors: * the monitors will spawn them anyway */ nmps = (struct node_monitor_parameter_t **)calloc(c->nb_nodes, sizeof(struct node_monitor_parameter_t)); for(i = 0; i < c->nb_nodes; i++) { nmps[i] = (struct node_monitor_parameter_t *)calloc(1, sizeof(struct node_monitor_parameter_t)); nmps[i]->nb_nodes = 1; nmps[i]->nodes = (struct service**)calloc(1, sizeof(struct service*)); nmps[i]->nodes[0] = &c->nodes[i]; nmps[i]->c = c; pthread_create(&nmps[i]->monitor, NULL, nodes_monitor, (void*)nmps[i]); } /* the nodes will complete before the servers (hopefully) */ for(i = 0; i < c->nb_nodes; i++) pthread_join(nmps[i]->monitor, NULL); printi("finalized", "no more child. Cleaning up"); pthread_cancel(servers_monitor_id); for(i = 0; i < c->nb_cs; i++) if( c->cs[i].sock != -1) { close(c->cs[i].sock); c->cs[i].sock = -1; } for(i = 0; i < c->nb_el; i++) if(c->el[i].sock != -1) { close(c->el[i].sock); c->el[i].sock = -1; } if(c->sc && c->sc->sock != -1) { close(c->sc->sock); c->sc->sock = -1; } printi("finalized", "waiting for servers_monitor"); pthread_join(servers_monitor_id, NULL); // Just to avoid some apparent buffer overflows in pthread_cancel ?!? // pthread_cancel(accept_list_id); close(c->listen_sock); printi("finalized", "waiting for accept_list"); // pthread_join(accept_list_id, NULL); printi("finalized", "waiting for stdio"); pthread_join(stdio_id, NULL); close(c->stdio_sock); printi("finalized", "waiting for stder"); pthread_join(stder_id, NULL); close(c->stder_sock); // Just to avoid some apparent buffer overflows in pthread_cancel ?!? return 0; printi("finalized", "freeing resources"); for(i = 0 ; i < c->nb_stables; i++) free(c->stables[i]); free(c->stables); for(i = 0 ; i < c->nb_unstables; i++) free(c->unstables[i]); free(c->unstables); for(i = 0; i < c->nb_nodes; i++) { free(nmps[i]->nodes); free(nmps[i]); } free(nmps); if(c->el) free(c->el); if(c->cs) free(c->cs); if(c->sc) free(c->sc); free(c); cmdline_parser_free (&args); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -