📄 vrun.c
字号:
printi("servers_monitor", "select interrupted (%s)", strerror(errno)); continue; } } for(i = 0; i < c->nb_cs; i++) { if( (c->cs[i].sock != -1) && FD_ISSET(c->cs[i].sock, &rfs) ) { ioctl(c->cs[i].sock, FIONREAD, &r); on_server_error("checkpoint server number %d", c->cs[i].sock); } else if(c->cs[i].sock != -1) FD_SET(c->cs[i].sock, &rfs); } for(i = 0; i < c->nb_el; i++) { if( (c->el[i].sock != -1) && FD_ISSET(c->el[i].sock, &rfs) ) { ioctl(c->el[i].sock, FIONREAD, &r); on_server_error("event logger", c->el[i].sock); } else if(c->el[i].sock != -1) FD_SET(c->el[i].sock, &rfs); } if(c->sc) { if( (c->sc->sock != -1) && FD_ISSET(c->sc->sock, &rfs) ) { ioctl(c->sc->sock, FIONREAD, &r); on_server_error("checkpoint scheduler", c->sc->sock); } else if(c->sc->sock != -1) FD_SET(c->sc->sock, &rfs); } } printi("servers_monitor", "no more servers to monitor, returning.");#undef on_server_error return NULL;}static int machine_greater(const struct machine *a, const struct machine *b){ return ( (a->missed < b->missed) || (a->used < b->used) );}static int lock_maximal_machine(struct machine ** array, int size){ int i_max, i_max_base, i; /* first find a free machine * We do it as long as necessary. An extreme case would be that * all the other threads have taken control of all the machines * (thus, there are less machiens than nodes). This may run forever * in a very unstable network. We take the chance. */ i_max = 0; for(;;) { if( pthread_mutex_trylock( &array[i_max]->lock ) == EBUSY ) { i_max = (i_max + 1) % size; continue; } /* victory ! */ break; } i_max_base = i_max; /* now, we compare with all the other free */ for(i = 1; i < size; i++) { if( pthread_mutex_trylock( &array[(i+i_max_base) % size]->lock ) == EBUSY ) continue; if( machine_greater( array[(i+i_max_base) % size], array[i_max] ) ) { pthread_mutex_unlock(&array[i_max]->lock); i_max = (i + i_max_base) % size; } else pthread_mutex_unlock(&array[(i+i_max_base) % size]->lock); } /* array[i_max] is locked. */ return i_max;}/** * initial exchange for establishing the connection with a server. * s : the socket for talking * c : the context... * port : IN/OUT value. * IN : the port to use (0) for autoselect. * OUT : the value chosen. */static int init_exchange_node(struct service *s, struct context *c, in_port_t *port){ int r; in_port_t p; /* !! PLEASE(2) !! * for sanity reasons, keep this always synchronized with * ../daemon/node.ggo -- self explanatory * and ../daemon/main.c -- search for the !! PLEASE !! commentary */ p = *port; send_intparam(s->sock, "n_procs", c->args->n_procs_arg); send_intparam(s->sock, "jobid", c->args->jobid_arg); send_intparam(s->sock, "port", p); send_strparam(s->sock, "working_dir", c->args->working_dir_arg); send_strparam(s->sock, "tmp_dir", c->args->tmp_dir_arg); send_strparam(s->sock, "helpers_dir", c->args->helpers_dir_arg); send_strparam(s->sock, "ckpt_server_ip", s->ncs?inet_ntoa(s->ncs->addr.sin_addr):"0.0.0.0"); send_intparam(s->sock, "ckpt_server_port", s->ncs?ntohs(s->ncs->addr.sin_port):0); send_intparam(s->sock, "ckpt_use_local_copy", c->args->ckpt_use_local_copy_flag); send_strparam(s->sock, "ckpt_scheduler_ip", s->nsc?inet_ntoa(s->nsc->addr.sin_addr):"0.0.0.0"); send_intparam(s->sock, "ckpt_scheduler_port", s->nsc?ntohs(s->nsc->addr.sin_port):0); send_strparam(s->sock, "event_logger_ip", s->nel?inet_ntoa(s->nel->addr.sin_addr):"0.0.0.0"); send_intparam(s->sock, "event_logger_port", s->nel?ntohs(s->nel->addr.sin_port):0); send_intparam(s->sock, "rank", s->nrank); send_intparam(s->sock, "restart", (s->nstatus == STATUS_RESTART)); send_strparam(s->sock, "command", c->command); send_intparam(s->sock, "additional_stats", 0); send_strparam(s->sock, "master_sched", c->args->master_sched_given?c->args->master_sched_arg:""); send_intparam(s->sock, "stdio_port", ntohs(c->stdio.sin_port)); send_intparam(s->sock, "stder_port", ntohs(c->stder.sin_port)); send_strparam(s->sock, "debug", c->args->debug_given?c->args->debug_arg:""); r = send_strparam(s->sock, "EOF", "EOF"); if(r < 0) return r; printi("node", "waiting for the port (%d bytes)", sizeof(in_port_t)); if( (r = urecv(s->sock, &p, sizeof(in_port_t), MSG_WAITALL)) != sizeof(in_port_t)) { printe("error while receiving port number (%d bytes received)", r); return -1; } printi("node", "port negociated : %u", ntohs(p)); *port = p; return 0;}static int allsamestatus(struct context *c, int s){ int i; int r = 1; for(i = 0; i < c->nb_nodes; i++) { pthread_mutex_lock(&c->nodes[i].lock); r &= c->nodes[i].nstatus == s; pthread_mutex_unlock(&c->nodes[i].lock); } return r;}static void *stdio_thread(void *_c){ struct context *c = (struct context *)_c; struct pollfd *polls; int nbpolls, maxpolls; int i, fd, rd; socklen_t rlen; struct sockaddr_in remote; char *buffer = NULL; int blen = 0;#ifdef DEBUG char dbg_select[128];#endif maxpolls = c->nb_nodes + c->nb_cs + c->nb_el + 5; nbpolls = 0; polls = (struct pollfd *)calloc(maxpolls, sizeof(struct pollfd)); polls[nbpolls].fd = c->stdio_sock; polls[nbpolls].events = POLLIN; nbpolls++; polls[nbpolls].fd = c->stdio_comm[0]; polls[nbpolls].events = POLLIN; nbpolls++; for(;;) { printi("stdio_poll", "testing the status"); if( allsamestatus(c, STATUS_EXITED) ) break;#ifdef DEBUG snprintf(dbg_select, 128, "polls=["); for(i = 0; i < nbpolls; i++) snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "%d:%02x:%02x ", polls[i].fd, polls[i].events, polls[i].revents); snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "]"); printi("stdio_select", "polling %s", dbg_select);#endif rlen = poll(polls, nbpolls, -1); pthread_testcancel(); if(rlen < 0) { printe("error in polling of stdio_thread"); continue; } i = 0; if( polls[i++].revents & POLLIN ) { printi("stdio_select", "connection socket signals new connection"); rlen = sizeof(remote); fd = accept(c->stdio_sock, (struct sockaddr*)&remote, &rlen); printi("stdio_select", "connection accepted : new socket is %d = %s:%d", fd, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port)); if(fd < 0) printe("stdio_thread accepting connection"); else { polls[nbpolls].fd = fd; polls[nbpolls].events = POLLIN; polls[nbpolls].revents = 0; nbpolls++; } } if( polls[i++].revents & POLLIN ) { printi("stdio_select", "Other thread requests to check the status. Will do that next pass"); read(c->stdio_comm[0], &rd, sizeof(int)); } for(; i<nbpolls; i++) { if( polls[i].revents & POLLIN ) { if(ioctl(polls[i].fd, FIONREAD, &rlen) < 0) { printe("stdio_thread getting size of data to read"); goto error_on_nc; } if(rlen == 0) { printi("stdio", "stdio number %d/%d disconnected", i, polls[i].fd); goto error_on_nc; } if(rlen > blen) { blen = rlen; buffer = (char*)realloc(buffer, blen); } rd = urecv(polls[i].fd, buffer, rlen, 0); if(rd < rlen) { printe("stdio receiving %d bytes of data (received %d)", rlen, rd); goto error_on_nc; } rlen = write(1, buffer, rd); continue; error_on_nc: printi("stdio_select", "closing socket %d/%d", i, polls[i].fd); close(polls[i].fd); memcpy(&polls[i], &polls[nbpolls-1], sizeof(struct pollfd)); i--; nbpolls--; } } } printi("finalized_stdio", "bye!"); return NULL;}static void *stder_thread(void *_c){ struct context *c = (struct context *)_c; struct pollfd *polls; int nbpolls, maxpolls; int i, fd, rd; socklen_t rlen; struct sockaddr_in remote; char *buffer = NULL; int blen = 0;#ifdef DEBUG char dbg_select[128];#endif maxpolls = c->nb_nodes + c->nb_cs + c->nb_el + 5; nbpolls = 0; polls = (struct pollfd *)calloc(maxpolls, sizeof(struct pollfd)); polls[nbpolls].fd = c->stder_sock; polls[nbpolls].events = POLLIN; nbpolls++; polls[nbpolls].fd = c->stder_comm[0]; polls[nbpolls].events = POLLIN; nbpolls++; for(;;) { printi("stder_poll", "testing the status"); if( allsamestatus(c, STATUS_EXITED) ) break;#ifdef DEBUG snprintf(dbg_select, 128, "polls=["); for(i = 0; i < nbpolls; i++) snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "%d:%02x:%02x ", polls[i].fd, polls[i].events, polls[i].revents); snprintf(dbg_select + strlen(dbg_select), 128-strlen(dbg_select), "]"); printi("stder_select", "polling %s", dbg_select);#endif rlen = poll(polls, nbpolls, -1); pthread_testcancel(); if(rlen < 0) { printe("error in polling of stder_thread"); continue; } i = 0; if( polls[i++].revents & POLLIN ) { printi("stder_select", "connection socket signals new connection"); rlen = sizeof(remote); fd = accept(c->stder_sock, (struct sockaddr*)&remote, &rlen); printi("stder_select", "connection accepted : new socket is %d = %s:%d", fd, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port)); if(fd < 0) printe("stder_thread accepting connection"); else { polls[nbpolls].fd = fd; polls[nbpolls].events = POLLIN; polls[nbpolls].revents = 0; nbpolls++; } } if( polls[i++].revents & POLLIN ) { printi("stder_select", "Other thread requests to check the status. Will do that next pass"); read(c->stder_comm[0], &rd, sizeof(int)); } for(; i<nbpolls; i++) { if( polls[i].revents & POLLIN ) { if(ioctl(polls[i].fd, FIONREAD, &rlen) < 0) { printe("stder_thread getting size of data to read"); goto error_on_nc; } if(rlen == 0) { printi("stder", "stder number %d/%d disconnected", i, polls[i].fd); goto error_on_nc; } if(rlen > blen) { blen = rlen; buffer = (char*)realloc(buffer, blen); } rd = urecv(polls[i].fd, buffer, rlen, 0); if(rd < rlen) { printe("stder receiving %d bytes of data (received %d)", rlen, rd); goto error_on_nc; } rlen = write(2, buffer, rd); continue; error_on_nc: printi("stder_select", "closing socket %d/%d", i, polls[i].fd); close(polls[i].fd); memcpy(&polls[i], &polls[nbpolls-1], sizeof(struct pollfd)); i--; nbpolls--; } } } printi("finalized_stder", "bye!"); return NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -