📄 main.c
字号:
/* MPICH-V2 Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud This file is part of MPICH-V2. MPICH-V2 is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. MPICH-V2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with MPICH-V2; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA $Id: main.c,v 1.43 2006/01/23 20:39:47 herault Exp $*//** @file main.c the main of the daemon: parse args and call select loop */#include <stdlib.h>#include <unistd.h>#include <signal.h>#include <string.h>#include "config.h"#include "debug.h"#include "localmpi.h"#include "select.h"#include "checkpoint.h"#include "chl.h"#include "redirect.h"#include <netinet/in.h>#include <sys/socket.h>#include <arpa/inet.h>#include <sys/time.h>#include <sys/resource.h>#include <sys/times.h>#include <unistd.h>#include <netdb.h>char *get_current_dir_name(void);#include "cmdline_node.h"static int STARTTIME;static void display_additional_stats(void){ struct rusage myself; struct rusage children; getrusage(RUSAGE_SELF, &myself); getrusage(RUSAGE_CHILDREN, &children);#define timeval_tofloat(tv) ((float)tv.tv_sec + (float)tv.tv_usec/1000000.0) printf(" myself children total\n"); printf("page reclaims = %10ld %10ld %10ld\n", myself.ru_minflt, children.ru_minflt, myself.ru_minflt+children.ru_minflt); printf("page faults = %10ld %10ld %10ld\n", myself.ru_majflt, children.ru_majflt, myself.ru_majflt+children.ru_majflt); printf("user time = %5.4f %5.4f %5.4f\n", timeval_tofloat(myself.ru_utime), timeval_tofloat(children.ru_utime), timeval_tofloat(myself.ru_utime) + timeval_tofloat(children.ru_utime)); printf("system time = %5.4f %5.4f %5.4f\n", timeval_tofloat(myself.ru_stime), timeval_tofloat(children.ru_stime), timeval_tofloat(myself.ru_stime) + timeval_tofloat(children.ru_stime)); printf("absolute time = %lu\n", times(NULL) - STARTTIME);#undef timeval_tofloat}int main(int argc,char *argv[]) { pid_t spid; int RFmpi, WTmpi; struct sigaction sigHandler; struct gengetopt_args_info orig, *result; struct sockaddr_in el; struct sockaddr_in cs; struct sockaddr_in sc; struct sockaddr_in remote; int sock_to_dispatcher; int listen_sock; int exeargc, exeargs; char **exeargv; char *command; char myid[64]; socklen_t rlen; int i; char *env_value; int invoking; sigHandler.sa_handler = gestSignal; sigemptyset(&(sigHandler.sa_mask)); sigHandler.sa_flags = 0; if(sigaction(SIGTERM , &sigHandler , NULL) != 0) { printe("bad signal handler"); return(-1); } signal(SIGINT, gestSignal); // signal(SIGSEGV, gestSignal); cmdline_parser_init(&orig); if( cmdline_parser(argc, argv, &orig) != 0 ) { cmdline_parser_print_help(); exit(0); } if(orig.help_given) { cmdline_parser_print_help(); exit(0); } /* handle init_exchange_node here */ sock_to_dispatcher = socket(AF_INET, SOCK_STREAM, 0); if(sock_to_dispatcher < 0) { perror("socket"); exit(1); } remote.sin_family = AF_INET; remote.sin_port = htons(orig.runtime_port_arg); inet_aton(orig.runtime_ip_arg, &remote.sin_addr); if(connect(sock_to_dispatcher, (struct sockaddr*)&remote, sizeof(struct sockaddr_in)) < 0) { fprintf(stderr, "connect to %s:%d error: %s\n", inet_ntoa(remote.sin_addr), ntohs(remote.sin_port), strerror(errno)); exit(1); } daemon(1, 0); result = (struct gengetopt_args_info*)malloc(sizeof(struct gengetopt_args_info));#define set_intparam(name) do { \ if(orig.name##_given) \ {\ result->name##_arg = orig.name##_arg; \ recv_intparam(sock_to_dispatcher, #name); \ }\ else\ result->name##_arg = recv_intparam(sock_to_dispatcher, #name);\ } while(0)#define set_strparam(name) do { \ if(orig.name##_given) \ {\ result->name##_arg = strdup(orig.name##_arg); \ free(recv_strparam(sock_to_dispatcher, #name)); \ }\ else\ result->name##_arg = recv_strparam(sock_to_dispatcher, #name);\ } while(0)#define set_flagparam(name) do { \ if(orig.name##_flag) \ {\ result->name##_flag = orig.name##_flag; \ recv_intparam(sock_to_dispatcher, #name); \ }\ else\ result->name##_flag = recv_intparam(sock_to_dispatcher, #name);\ } while(0) result->runtime_ip_arg = strdup(orig.runtime_ip_arg); result->runtime_port_arg = orig.runtime_port_arg; /* !! PLEASE !! * for sanity reasons, keep this always synchronized with * node.ggo -- self explanatory * and mpirun/vrun.c -- search for the !! PLEASE(2) !! commentary */ set_intparam(n_procs); set_intparam(jobid); set_intparam(port); set_strparam(working_dir); set_strparam(tmp_dir); set_strparam(helpers_dir); set_strparam(ckpt_server_ip); set_intparam(ckpt_server_port); set_flagparam(ckpt_use_local_copy); set_strparam(ckpt_scheduler_ip); set_intparam(ckpt_scheduler_port); set_strparam(event_logger_ip); set_intparam(event_logger_port); set_intparam(rank); set_flagparam(restart); set_strparam(command); set_flagparam(additional_stats); set_strparam(master_sched); set_intparam(stdio_port); set_intparam(stder_port); set_strparam(debug); free(recv_strparam(sock_to_dispatcher, "EOF")); redirect_stdio(orig.runtime_ip_arg, result->stdio_port_arg, result->stder_port_arg); cmdline_parser_free(&orig); sprintf(myid, "daemon-%d", result->rank_arg); chdir(result->working_dir_arg);#if 1 { char fname[16]; int fd; FILE *f; sprintf(fname, "/tmp/chvXXXXXX"); fd = mkstemp(fname); f = fdopen(fd, "w"); fprintf(f, "running, daemon, wd = %s, debug = %s, myid = %s\n", result->working_dir_arg, result->debug_arg, myid);#define fprintf_intparam(i) fprintf(f, " "#i" : %d\n", result->i##_arg)#define fprintf_strparam(s) fprintf(f, " "#s" : %s\n", result->s##_arg)#define fprintf_flagparam(p) fprintf(f, " "#p" : %s\n", result->p##_flag?"yes":"no"); fprintf_intparam(n_procs); fprintf_intparam(jobid); fprintf_intparam(port); fprintf_strparam(working_dir); fprintf_strparam(tmp_dir); fprintf_strparam(helpers_dir); fprintf_strparam(ckpt_server_ip); fprintf_intparam(ckpt_server_port); fprintf_flagparam(ckpt_use_local_copy); fprintf_strparam(ckpt_scheduler_ip); fprintf_intparam(ckpt_scheduler_port); fprintf_strparam(event_logger_ip); fprintf_intparam(event_logger_port); fprintf_intparam(rank); fprintf_flagparam(restart); fprintf_strparam(command); fprintf_flagparam(additional_stats); fprintf_strparam(master_sched); fprintf_intparam(stdio_port); fprintf_strparam(debug); fclose(f); close(fd); }#endif initDebug(myid, result->debug_arg); /* this is used by libckpt, but it may be used by others. * 1. Since we use execvp for executing the command, PATH expansion is performed * We thus add result->helpers_dir_arg to the PATH * 2. It is also a good place to put all libraries -- we know libraries are NOT necessarily executable. */ env_value = getenv("PATH"); if(env_value) { env_value = strdup(env_value); env_value = (char*)realloc(env_value, strlen(env_value)+16+strlen(result->helpers_dir_arg)); memmove(env_value+strlen(result->helpers_dir_arg)+1, env_value, strlen(env_value)+1); memcpy(env_value, result->helpers_dir_arg, strlen(result->helpers_dir_arg)); env_value[strlen(result->helpers_dir_arg)] = ':'; } else env_value = strdup(result->helpers_dir_arg); setenv("PATH", env_value, 1); free(env_value); env_value = getenv("LD_LIBRARY_PATH"); if(env_value) { env_value = strdup(env_value); env_value = (char*)realloc(env_value, strlen(env_value)+16+strlen(result->helpers_dir_arg)); memmove(env_value+strlen(result->helpers_dir_arg)+1, env_value, strlen(env_value)+1); memcpy(env_value, result->helpers_dir_arg, strlen(result->helpers_dir_arg)); env_value[strlen(result->helpers_dir_arg)] = ':'; } else env_value = strdup(result->helpers_dir_arg); setenv("LD_LIBRARY_PATH", env_value, 1); free(env_value); /* * End Of Environment games */ remote.sin_family = AF_INET; remote.sin_port = htons(result->port_arg); remote.sin_addr.s_addr = INADDR_ANY; listen_sock = socket(AF_INET, SOCK_STREAM, 0); if(listen_sock < 0) qerror("socket creation"); if(bind(listen_sock, (struct sockaddr*)&remote, sizeof(struct sockaddr_in)) < 0) qerror("socket binding"); if(listen(listen_sock, 5) < 0) qerror("socket listening"); rlen = sizeof(struct sockaddr_in); if(getsockname(listen_sock, (struct sockaddr*)&remote, &rlen) < 0) qerror("get socket name"); result->port_arg = ntohs(remote.sin_port); if( send(sock_to_dispatcher, &remote.sin_port, sizeof(short), 0) != sizeof(short) ) qerror("sending port to dispatcher"); printi("init", "vdaemon (pwd : %s)", get_current_dir_name() ); if(result->additional_stats_flag) { STARTTIME = times(NULL); atexit(display_additional_stats); } cs.sin_family = AF_INET; if( !inet_aton(result->ckpt_server_ip_arg, &cs.sin_addr) ) printe("checkpoint server (%s) is not a valid ip address", result->ckpt_server_ip_arg); cs.sin_port = htons(result->ckpt_server_port_arg); sc.sin_family = AF_INET; if( !inet_aton(result->ckpt_scheduler_ip_arg, &sc.sin_addr) ) printe("checkpoint scheduler (%s) is not a valid ip address", result->ckpt_scheduler_ip_arg); sc.sin_port = htons(result->ckpt_scheduler_port_arg); el.sin_family = AF_INET; if( !inet_aton(result->event_logger_ip_arg, &el.sin_addr) ) printe("event logger (%s) is not a valid ip address", result->event_logger_ip_arg); el.sin_port = htons(result->event_logger_port_arg); initstruct(result->jobid_arg, result->rank_arg, result->n_procs_arg, el, cs, sc, result->ckpt_use_local_copy_flag); localMPI_regeneratePipes(result->jobid_arg, result->rank_arg); exeargc = 0; command = strdup(result->command_arg); for(i=0; command[i]; i++) if(command[i] == ' ') exeargc++; /* safety of additionnal arguments from checkpoint and localmpi */ exeargs = exeargc+16; exeargv = (char**)calloc(exeargs, sizeof(char*)); exeargc = 0; while( (exeargv[exeargc] = strtok(exeargc==0?command:NULL, " ")) ) exeargc++; if(result->restart_flag) { printi("init", "vdaemon: set next call be a restart"); invoking = 0; } else { printi("init", "vdaemon: set next call be an invoke"); invoking = 1; } localMPI_setCommand(&exeargc, &exeargv, result->jobid_arg, result->rank_arg, result->n_procs_arg, result->debug_arg); exeargv[exeargc] = NULL; spid = ckptForkExec(exeargv, exeargc, exeargs, result->jobid_arg, result->rank_arg, invoking); if( spid > 0 ) { WTmpi = localMPI_OpenWriteToDriver(result->jobid_arg, result->rank_arg); RFmpi = localMPI_OpenReadFromDriver(result->jobid_arg, result->rank_arg); initcomm(RFmpi, WTmpi, listen_sock, sock_to_dispatcher); daemonselect(); localMPI_removePipes(result->jobid_arg, result->rank_arg); printi("finalized", "Now, returning 0"); } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -