📄 vrun.c
字号:
#include "config.h"
#include "debug.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>   /* realloc, calloc, free */
#include <errno.h>    /* errno, EINTR, EAGAIN */
#include <pthread.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <ctype.h>
#include <poll.h>
/* #include "vrun.h" */

#define FINALIZE_MSG 1
#define DEFAULT_CKPT_FREQ 5

extern int h_errno;
char *get_current_dir_name(void);

#include "cmdline_vrun.h"

/* Names of the auxiliary programs. */
#define CHKPTSERV "checkpointserver"
#define EVENTLOGGER "eventlogger"
#define CSCHED "csched"
#define VDAEMON "vdaemon"

/*
 * One remote peer. The `u.node` members are only meaningful for node
 * entries; NOTE(review): `kind` is presumably KIND_NODE or KIND_SERVER --
 * confirm against the code that fills it in.
 */
struct service {
    struct sockaddr_in addr;
    int sock;
    pthread_mutex_t lock;
    unsigned char kind;
    union {
        struct { } server;
        struct {
            int _rank;
            int _status;
            struct service *_cs;
            struct service *_sc;
            struct service *_el;
        } node;
    } u;
};

/* Shorthand accessors for the node-specific members of struct service. */
#define nrank u.node._rank
#define nstatus u.node._status
#define ncs u.node._cs
#define nsc u.node._sc
#define nel u.node._el

#define KIND_NODE 1
#define KIND_SERVER 2

#define STATUS_INITIALSTART 0
#define STATUS_RESTART 1
#define STATUS_FINISHED 2
#define STATUS_EXITING 3
#define STATUS_WAITING_FOR_CLOSE 4
#define STATUS_EXITED 5

/* One entry of a machine file: host name and its resolved IPv4 address. */
struct machine {
    char *name;
    in_addr_t ip;
    int used;
    int missed;
    pthread_mutex_t lock;
};

/* One accepted connection queued by accept_list_thread(). */
struct accept_list_elt {
    struct sockaddr_in remote;
    int fd;
};

/* Global run-time state of the launcher. */
struct context {
    int listen_sock;
    int stdio_sock;
    int stder_sock;
    int stdio_comm[2];
    int stder_comm[2];
    struct gengetopt_args_info *args;
    struct sockaddr_in myself;
    struct sockaddr_in stdio;
    struct sockaddr_in stder;
    char *command;

    /* Queue of accepted connections, protected by accept_list_lock. */
    pthread_mutex_t accept_list_lock;
    pthread_cond_t accept_list_cond;
    int accept_list_size;      /* number of valid entries */
    int accept_list_malloced;  /* number of allocated entries */
    struct accept_list_elt *accept_list;

    /* Machine lists filled by parse_machine_file(). */
    int nb_stables;
    struct machine **stables;
    int nb_unstables;
    struct machine **unstables;

    int nb_cs;
    struct service *cs;
    struct service *sc;
    int nb_el;
    struct service *el;
    int nb_nodes;
    struct service *nodes;
};

struct node_monitor_parameter_t {
    int nb_nodes;
    struct service **nodes;
    struct context *c;
    pthread_t monitor;
};

/**
 * Send exactly `size` bytes from `buf` on `sock`.
 *
 * Retries when send() fails with EINTR or EAGAIN. MSG_NOSIGNAL is always
 * OR-ed into `flag`.
 *
 * @return `size` on success, or the negative send() result on any other error.
 */
static int usend(int sock, void *buf, int size, int flag)
{
    /* Byte cursor: arithmetic on void * is a GNU extension, not ISO C. */
    const char *data = buf;
    int n, written = 0;

    while (written < size) {
        n = send(sock, data + written, size - written, MSG_NOSIGNAL | flag);
        if (n < 0) {
            if ((errno == EINTR) || (errno == EAGAIN))
                continue;
#ifdef DEBUG
            printe("Writing %d/%d bytes on %d before error", written, size, sock);
#endif
            return n;
        }
        written += n;
    }
    return written;
}

/**
 * Receive exactly `size` bytes into `buf` from `sock`.
 *
 * MSG_WAITALL is always OR-ed into `flag`. When recv() returns 0, the read is
 * retried once with O_ASYNC/O_NONBLOCK cleared on the descriptor, because a 0
 * in the original mode is not trusted to mean "peer disconnected"; the
 * original descriptor flags are restored afterwards.
 *
 * @return `size` on success, 0 on a confirmed disconnection, -1 on error.
 */
static int urecv(int sock, void *buf, int size, int flag)
{
    char *data = buf;
    int n, rd = 0;

    while (rd < size) {
        n = recv(sock, data + rd, size - rd, MSG_WAITALL | flag);
        if (n == 0) {
            /** change mode to synchronous, since this 0 may be not disconnect */
            int fdflags = fcntl(sock, F_GETFL);
            int nflags = (fdflags & (~(O_ASYNC|O_NONBLOCK))) | (O_SYNC);
            int saved_errno;

            do {
                fcntl(sock, F_SETFL, nflags);
                n = recv(sock, data + rd, size - rd, MSG_WAITALL | flag);
                /* Capture errno before fcntl() gets a chance to touch it. */
                saved_errno = errno;
                /** back to old mode */
                fcntl(sock, F_SETFL, fdflags);
            } while ((n < 0) && (saved_errno == EINTR));

            if (n == 0) {
                /** this is a disconnection (sure) */
                return 0;
            }
            if (n < 0) {
                errno = saved_errno;
                printe("SyncReading %d/%d bytes on fd %d before error", rd, size, sock);
                return -1;
            }
        }
        if (n < 0) {
            if ((errno == EINTR) || (errno == EAGAIN))
                continue;
            printe("SyncReading %d/%d bytes on fd %d before error", rd, size, sock);
            return -1;
        }
        rd += n;
    }
    return rd;
}

/**
 * Thread body: accept connections on c->listen_sock forever and append each
 * one (peer address + descriptor) to c->accept_list, broadcasting
 * c->accept_list_cond after every append.
 *
 * @param _c  the struct context shared with the rest of the program.
 * @return never returns normally (the loop is infinite).
 */
static void *accept_list_thread(void *_c)
{
    struct context *c = (struct context *)_c;
    struct sockaddr_in remote;
    struct accept_list_elt *grown;
    int r;
    socklen_t rlen;
    int failed = 0;

    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    for (;;) {
        rlen = sizeof(struct sockaddr_in);
        /* Zero the whole address so nothing indeterminate is ever read. */
        memset(&remote, 0, sizeof remote);
        r = accept(c->listen_sock, (struct sockaddr *)&remote, &rlen);
        pthread_testcancel();

        if (r < 0) {
            printe("accept failed (%d times) in the accept_list_thread", failed);
            /* NOTE(review): printq is presumably fatal -- confirm in debug.h. */
            if (failed++ > 100)
                printq("maximum failed reached. Fatal error.");
            continue;
        }

        /* pthread_t is not an int: print it through an explicit cast. */
        printi("accept_list", "%lu accepted a connection with %s:%d on %d",
               (unsigned long)pthread_self(), inet_ntoa(remote.sin_addr),
               ntohs(remote.sin_port), r);

        pthread_mutex_lock(&c->accept_list_lock);
        if (c->accept_list_size + 1 > c->accept_list_malloced) {
            /* Grow through a temporary so the old array survives a failure. */
            grown = realloc(c->accept_list,
                            (size_t)(c->accept_list_size + 1) * sizeof *grown);
            if (grown == NULL) {
                pthread_mutex_unlock(&c->accept_list_lock);
                printe("out of memory growing the accept_list, dropping connection on %d", r);
                close(r);
                continue;
            }
            c->accept_list = grown;
            c->accept_list_malloced = 1 + c->accept_list_size;
        }
        memcpy(&c->accept_list[c->accept_list_size].remote, &remote,
               sizeof(struct sockaddr_in));
        c->accept_list[c->accept_list_size].fd = r;
        c->accept_list_size++;
        pthread_cond_broadcast(&c->accept_list_cond);
        printi("accept", "appended to the accept_list");
        pthread_mutex_unlock(&c->accept_list_lock);
    }
    return NULL;
}

/*
 * Append a new machine named `name`, with the first address of `h`, to the
 * array *list of *count entries. The caller must have checked that `h`
 * carries an IPv4 address. Returns 0 on success, -1 on allocation failure
 * (in which case *list and *count are left unchanged).
 */
static int append_machine(struct machine ***list, int *count,
                          const char *name, const struct hostent *h)
{
    struct machine **grown;
    struct machine *m;

    m = calloc(1, sizeof *m);
    if (m == NULL)
        return -1;

    m->name = strdup(name);
    if (m->name == NULL) {
        free(m);
        return -1;
    }

    grown = realloc(*list, (size_t)(*count + 1) * sizeof *grown);
    if (grown == NULL) {
        free(m->name);
        free(m);
        return -1;
    }

    memcpy(&m->ip, h->h_addr_list[0], sizeof m->ip);
    m->missed = 0;
    pthread_mutex_init(&m->lock, NULL);

    grown[*count] = m;
    *list = grown;
    (*count)++;
    return 0;
}

/**
 * Read a machine file and record every resolvable host in the context.
 *
 * Each line is cut at the first ':' or newline; what precedes it is taken as
 * a host name and resolved with gethostbyname(). Unresolvable names are
 * logged and skipped.
 *
 * @param c       context whose machine list is extended.
 * @param f       open machine file, read until EOF (not closed here).
 * @param stable  non-zero: append to c->stables; zero: append to c->unstables.
 */
static void parse_machine_file(struct context *c, FILE *f, int stable)
{
    int i;
    int rc;
    char line[1024];
    struct hostent *h;

    while (fgets(line, sizeof line, f)) {
        for (i = 0; line[i] && (line[i] != ':') && (line[i] != '\n'); i++)
            ;
        line[i] = 0;

        h = gethostbyname(line);
        if (h == NULL) {
            printi("init", "%s is not solvable by gethostbyname (%s) -- ignored", line, hstrerror(h_errno));
            continue;
        }
        /* struct machine stores an in_addr_t: refuse anything that is not
         * exactly one IPv4 address instead of overflowing the field. */
        if ((h->h_addrtype != AF_INET) ||
            (h->h_length != (int)sizeof(in_addr_t)) ||
            (h->h_addr_list[0] == NULL)) {
            printi("init", "%s does not resolve to an IPv4 address -- ignored", line);
            continue;
        }

        if (stable)
            rc = append_machine(&c->stables, &c->nb_stables, line, h);
        else
            rc = append_machine(&c->unstables, &c->nb_unstables, line, h);

        if (rc < 0)
            printe("out of memory while recording machine %s -- ignored", line);
    }
}

/*
 * build_context: allocates the run context from the parsed command-line
 * arguments. Only its declarations are reproduced here, unchanged; the
 * assignment begun on the last line continues on the following source line.
 */
static struct context *build_context(struct gengetopt_args_info *args)
{
    struct context *c;
    char myname[512];
    struct hostent *h;
    FILE *f;
    int i;
    socklen_t slen;
    struct sockaddr_in server, server_stdio, server_stder;
    f =
fopen(args->machines_file_arg, "r"); if(f == NULL) { perror(args->machines_file_arg); return NULL; } c = (struct context*)calloc(1, sizeof(struct context)); c->args = args; parse_machine_file(c, f, 0); fclose(f); if(args->stable_machines_file_given) { f = fopen(args->stable_machines_file_arg, "r"); if(f == NULL) { perror(args->stable_machines_file_arg); printe(" unable to open %s", args->stable_machines_file_arg); printw(" (ignored: unstable machines are used as stable machines)"); goto err; } else { parse_machine_file(c, f, 1); fclose(f); } } else { err: /* we use the reverse unstables machines as stables */ c->stables = (struct machine**)calloc(c->nb_unstables, sizeof(struct machine*)); for(i = 0; i < c->nb_unstables; i++) { c->stables[c->nb_unstables-i-1] = (struct machine*)calloc(1, sizeof(struct machine)); memcpy(c->stables[c->nb_unstables-i-1], c->unstables[i], sizeof(struct machine)); } c->nb_stables = c->nb_unstables; } if(gethostname(myname, 512) < 0) qerror("gethostname"); printi("init", "I am %s", myname); if( (h = gethostbyname(myname)) == NULL ) { printe("gethostbyname(%s) failed (%s)", myname, hstrerror(h_errno)); return NULL; } memcpy(&c->myself.sin_addr, h->h_addr, h->h_length); c->listen_sock = socket(AF_INET, SOCK_STREAM, 0); if(c->listen_sock < 0) qerror("listen_sock creation"); if( args->runtime_port_given ) { memset(&server, 0, sizeof(struct sockaddr_in)); server.sin_addr.s_addr = INADDR_ANY; server.sin_family = AF_INET; server.sin_port = htons( args->runtime_port_arg ); } else { memset(&server, 0, sizeof(struct sockaddr_in)); server.sin_addr.s_addr = INADDR_ANY; server.sin_family = AF_INET; } if(bind(c->listen_sock, (struct sockaddr*)&server, sizeof(struct sockaddr_in)) < 0) qerror("listen_sock binding"); if(listen(c->listen_sock, 5) < 0) qerror("listen_sock listening"); slen = sizeof(struct sockaddr_in); if(getsockname(c->listen_sock, (struct sockaddr*)&server, &slen) < 0) qerror("get listen_sock name"); c->myself.sin_port = server.sin_port; 
printi("init", "vrun bound on %s:%d", inet_ntoa(c->myself.sin_addr), ntohs(server.sin_port)); c->stdio_sock = socket(AF_INET, SOCK_STREAM, 0); if(c->stdio_sock < 0) qerror("stdio_sock creation"); if( args->stdio_port_given ) { memset(&server_stdio, 0, sizeof(struct sockaddr_in)); server_stdio.sin_addr.s_addr = INADDR_ANY; server_stdio.sin_family = AF_INET; server_stdio.sin_port = htons( args->stdio_port_arg ); } else { memset(&server_stdio, 0, sizeof(struct sockaddr_in)); server_stdio.sin_addr.s_addr = INADDR_ANY; server_stdio.sin_family = AF_INET; } if(bind(c->stdio_sock, (struct sockaddr*)&server_stdio, sizeof(struct sockaddr_in)) < 0) qerror("stdio_sock binding"); if(listen(c->stdio_sock, 5) < 0) qerror("stdio_sock listening"); slen = sizeof(struct sockaddr_in); if(getsockname(c->stdio_sock, (struct sockaddr*)&server_stdio, &slen) < 0) qerror("get stdio_sock name"); memcpy(&c->stdio, &c->myself, sizeof(struct sockaddr_in)); c->stdio.sin_port = server_stdio.sin_port; printi("init", "STDIO vrun bound on %s:%d", inet_ntoa(c->stdio.sin_addr), ntohs(c->stdio.sin_port)); if( socketpair(AF_UNIX, SOCK_DGRAM, 0, c->stdio_comm) == -1 ) qerror("stdio_comm creation"); c->stder_sock = socket(AF_INET, SOCK_STREAM, 0); if(c->stder_sock < 0) qerror("stder_sock creation"); if( args->stder_port_given ) { memset(&server_stder, 0, sizeof(struct sockaddr_in)); server_stder.sin_addr.s_addr = INADDR_ANY; server_stder.sin_family = AF_INET; server_stder.sin_port = htons( args->stder_port_arg ); } else { memset(&server_stder, 0, sizeof(struct sockaddr_in)); server_stder.sin_addr.s_addr = INADDR_ANY; server_stder.sin_family = AF_INET; } if(bind(c->stder_sock, (struct sockaddr*)&server_stder, sizeof(struct sockaddr_in)) < 0) qerror("stder_sock binding"); if(listen(c->stder_sock, 5) < 0) qerror("stder_sock listening");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -