⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 main.c

📁 Path MPICH-V for MPICH the MPI Implementation
💻 C
字号:
/*  MPICH-V2  Copyright (C) 2002, 2003 Groupe Cluster et Grid, LRI, Universite de Paris Sud  This file is part of MPICH-V2.  MPICH-V2 is free software; you can redistribute it and/or modify  it under the terms of the GNU General Public License as published by  the Free Software Foundation; either version 2 of the License, or  (at your option) any later version.  MPICH-V2 is distributed in the hope that it will be useful,  but WITHOUT ANY WARRANTY; without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  GNU General Public License for more details.  You should have received a copy of the GNU General Public License  along with MPICH-V2; if not, write to the Free Software  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  $Id: main.c,v 1.43 2006/01/23 20:39:47 herault Exp $*//** @file main.c the main of the daemon: parse args and call select loop */#include <stdlib.h>#include <unistd.h>#include <signal.h>#include <string.h>#include "config.h"#include "debug.h"#include "localmpi.h"#include "select.h"#include "checkpoint.h"#include "chl.h"#include "redirect.h"#include <netinet/in.h>#include <sys/socket.h>#include <arpa/inet.h>#include <sys/time.h>#include <sys/resource.h>#include <sys/times.h>#include <unistd.h>#include <netdb.h>char *get_current_dir_name(void);#include "cmdline_node.h"static int STARTTIME;static void display_additional_stats(void){  struct rusage myself;  struct rusage children;  getrusage(RUSAGE_SELF, &myself);  getrusage(RUSAGE_CHILDREN, &children);#define timeval_tofloat(tv) ((float)tv.tv_sec + (float)tv.tv_usec/1000000.0)  printf("                myself    children   total\n");  printf("page reclaims = %10ld %10ld %10ld\n", myself.ru_minflt, children.ru_minflt, myself.ru_minflt+children.ru_minflt);  printf("page faults   = %10ld %10ld %10ld\n", myself.ru_majflt, children.ru_majflt, myself.ru_majflt+children.ru_majflt);  printf("user time     = %5.4f %5.4f %5.4f\n", timeval_tofloat(myself.ru_utime), timeval_tofloat(children.ru_utime),	 timeval_tofloat(myself.ru_utime) + timeval_tofloat(children.ru_utime));  printf("system time   = %5.4f %5.4f %5.4f\n", timeval_tofloat(myself.ru_stime), timeval_tofloat(children.ru_stime),	 timeval_tofloat(myself.ru_stime) + timeval_tofloat(children.ru_stime));  printf("absolute time = %lu\n", times(NULL) - STARTTIME);#undef timeval_tofloat}int main(int argc,char *argv[]) {  pid_t spid;  int RFmpi, WTmpi;  struct sigaction sigHandler;  struct gengetopt_args_info orig, *result;  struct sockaddr_in el;  struct sockaddr_in cs;  struct sockaddr_in sc;  struct sockaddr_in remote;  int sock_to_dispatcher;  int listen_sock;  int  exeargc, exeargs;  char **exeargv;  char *command;  char myid[64];  socklen_t rlen;  int i;  char *env_value;  int invoking;    sigHandler.sa_handler = gestSignal;  sigemptyset(&(sigHandler.sa_mask));  sigHandler.sa_flags = 0;  if(sigaction(SIGTERM , &sigHandler , NULL) != 0) {      printe("bad signal handler");      return(-1);  }  signal(SIGINT, gestSignal);  //  signal(SIGSEGV, gestSignal);    cmdline_parser_init(&orig);  if( cmdline_parser(argc, argv, &orig) != 0 )    {      cmdline_parser_print_help();      exit(0);    }  if(orig.help_given)    {      cmdline_parser_print_help();      exit(0);    }  /* handle init_exchange_node here */  sock_to_dispatcher = socket(AF_INET, SOCK_STREAM, 0);  if(sock_to_dispatcher < 0)    {      perror("socket");      exit(1);    }  remote.sin_family = AF_INET;  remote.sin_port = htons(orig.runtime_port_arg);  inet_aton(orig.runtime_ip_arg, &remote.sin_addr);  if(connect(sock_to_dispatcher, (struct sockaddr*)&remote, sizeof(struct sockaddr_in)) < 0)    {      fprintf(stderr, "connect to %s:%d error: %s\n", 	      inet_ntoa(remote.sin_addr), ntohs(remote.sin_port), strerror(errno));      exit(1);    }  daemon(1, 0);  result = (struct gengetopt_args_info*)malloc(sizeof(struct gengetopt_args_info));#define set_intparam(name) do { \    if(orig.name##_given) \       {\         result->name##_arg = orig.name##_arg; \         recv_intparam(sock_to_dispatcher, #name); \       }\    else\         result->name##_arg = recv_intparam(sock_to_dispatcher, #name);\  } while(0)#define set_strparam(name) do { \    if(orig.name##_given) \      {\         result->name##_arg = strdup(orig.name##_arg); \         free(recv_strparam(sock_to_dispatcher, #name)); \      }\    else\         result->name##_arg = recv_strparam(sock_to_dispatcher, #name);\  } while(0)#define set_flagparam(name) do { \    if(orig.name##_flag) \      {\         result->name##_flag = orig.name##_flag; \         recv_intparam(sock_to_dispatcher, #name); \      }\    else\         result->name##_flag = recv_intparam(sock_to_dispatcher, #name);\  } while(0)  result->runtime_ip_arg = strdup(orig.runtime_ip_arg);  result->runtime_port_arg = orig.runtime_port_arg;  /* !! PLEASE !!   * for sanity reasons, keep this always synchronized with    *     node.ggo        -- self explanatory   * and mpirun/vrun.c   -- search for the !! PLEASE(2) !! commentary   */    set_intparam(n_procs);  set_intparam(jobid);  set_intparam(port);  set_strparam(working_dir);  set_strparam(tmp_dir);  set_strparam(helpers_dir);  set_strparam(ckpt_server_ip);  set_intparam(ckpt_server_port);  set_flagparam(ckpt_use_local_copy);  set_strparam(ckpt_scheduler_ip);  set_intparam(ckpt_scheduler_port);  set_strparam(event_logger_ip);  set_intparam(event_logger_port);  set_intparam(rank);  set_flagparam(restart);  set_strparam(command);  set_flagparam(additional_stats);  set_strparam(master_sched);  set_intparam(stdio_port);  set_intparam(stder_port);  set_strparam(debug);  free(recv_strparam(sock_to_dispatcher, "EOF"));  redirect_stdio(orig.runtime_ip_arg, result->stdio_port_arg, result->stder_port_arg);  cmdline_parser_free(&orig);  sprintf(myid, "daemon-%d", result->rank_arg);  chdir(result->working_dir_arg);#if 1  {    char fname[16];    int fd;    FILE *f;    sprintf(fname, "/tmp/chvXXXXXX");    fd = mkstemp(fname);    f = fdopen(fd, "w");    fprintf(f, "running, daemon, wd = %s, debug = %s, myid = %s\n",	    result->working_dir_arg, result->debug_arg, myid);#define fprintf_intparam(i)  fprintf(f, "  "#i" : %d\n", result->i##_arg)#define fprintf_strparam(s)  fprintf(f, "  "#s" : %s\n", result->s##_arg)#define fprintf_flagparam(p) fprintf(f, "  "#p" : %s\n", result->p##_flag?"yes":"no");    fprintf_intparam(n_procs);    fprintf_intparam(jobid);    fprintf_intparam(port);    fprintf_strparam(working_dir);    fprintf_strparam(tmp_dir);    fprintf_strparam(helpers_dir);    fprintf_strparam(ckpt_server_ip);    fprintf_intparam(ckpt_server_port);    fprintf_flagparam(ckpt_use_local_copy);    fprintf_strparam(ckpt_scheduler_ip);    fprintf_intparam(ckpt_scheduler_port);    fprintf_strparam(event_logger_ip);    fprintf_intparam(event_logger_port);    fprintf_intparam(rank);    fprintf_flagparam(restart);    fprintf_strparam(command);    fprintf_flagparam(additional_stats);    fprintf_strparam(master_sched);    fprintf_intparam(stdio_port);    fprintf_strparam(debug);    fclose(f);    close(fd);  }#endif  initDebug(myid, result->debug_arg);  /* this is used by libckpt, but it may be used by others.   * 1. Since we use execvp for executing the command, PATH expansion is performed   *    We thus add result->helpers_dir_arg to the PATH   * 2. It is also a good place to put all libraries -- we know libraries are NOT necessarily executable.   */  env_value = getenv("PATH");  if(env_value)    {      env_value = strdup(env_value);      env_value = (char*)realloc(env_value, strlen(env_value)+16+strlen(result->helpers_dir_arg));      memmove(env_value+strlen(result->helpers_dir_arg)+1, env_value, strlen(env_value)+1);      memcpy(env_value, result->helpers_dir_arg, strlen(result->helpers_dir_arg));      env_value[strlen(result->helpers_dir_arg)] = ':';    }  else    env_value = strdup(result->helpers_dir_arg);  setenv("PATH", env_value, 1);  free(env_value);  env_value = getenv("LD_LIBRARY_PATH");  if(env_value)    {      env_value = strdup(env_value);      env_value = (char*)realloc(env_value, strlen(env_value)+16+strlen(result->helpers_dir_arg));      memmove(env_value+strlen(result->helpers_dir_arg)+1, env_value, strlen(env_value)+1);      memcpy(env_value, result->helpers_dir_arg, strlen(result->helpers_dir_arg));      env_value[strlen(result->helpers_dir_arg)] = ':';    }  else    env_value = strdup(result->helpers_dir_arg);  setenv("LD_LIBRARY_PATH", env_value, 1);  free(env_value);  /*    * End Of Environment games   */  remote.sin_family = AF_INET;  remote.sin_port = htons(result->port_arg);  remote.sin_addr.s_addr = INADDR_ANY;  listen_sock = socket(AF_INET, SOCK_STREAM, 0);  if(listen_sock < 0)    qerror("socket creation");  if(bind(listen_sock, (struct sockaddr*)&remote, sizeof(struct sockaddr_in)) < 0)    qerror("socket binding");    if(listen(listen_sock, 5) < 0)    qerror("socket listening");  rlen = sizeof(struct sockaddr_in);  if(getsockname(listen_sock, (struct sockaddr*)&remote, &rlen) < 0)    qerror("get socket name");    result->port_arg = ntohs(remote.sin_port);  if( send(sock_to_dispatcher, &remote.sin_port, sizeof(short), 0) != sizeof(short) )    qerror("sending port to dispatcher");  printi("init", "vdaemon (pwd : %s)", get_current_dir_name() );  if(result->additional_stats_flag)    {      STARTTIME = times(NULL);      atexit(display_additional_stats);    }    cs.sin_family = AF_INET;  if( !inet_aton(result->ckpt_server_ip_arg, &cs.sin_addr) )    printe("checkpoint server (%s) is not a valid ip address", result->ckpt_server_ip_arg);  cs.sin_port = htons(result->ckpt_server_port_arg);  sc.sin_family = AF_INET;  if( !inet_aton(result->ckpt_scheduler_ip_arg, &sc.sin_addr) )    printe("checkpoint scheduler (%s) is not a valid ip address", result->ckpt_scheduler_ip_arg);  sc.sin_port = htons(result->ckpt_scheduler_port_arg);  el.sin_family = AF_INET;  if( !inet_aton(result->event_logger_ip_arg, &el.sin_addr) )    printe("event logger (%s) is not a valid ip address", result->event_logger_ip_arg);  el.sin_port = htons(result->event_logger_port_arg);  initstruct(result->jobid_arg, result->rank_arg, result->n_procs_arg, el, cs, sc, result->ckpt_use_local_copy_flag);  localMPI_regeneratePipes(result->jobid_arg, result->rank_arg);  exeargc = 0;   command = strdup(result->command_arg);  for(i=0; command[i]; i++)    if(command[i] == ' ')      exeargc++;  /* safety of additionnal arguments from checkpoint and localmpi */  exeargs = exeargc+16;  exeargv = (char**)calloc(exeargs, sizeof(char*));  exeargc = 0;  while( (exeargv[exeargc] = strtok(exeargc==0?command:NULL, " ")) )     exeargc++;  if(result->restart_flag)   {     printi("init", "vdaemon: set next call be a restart");     invoking = 0;  }  else  {    printi("init", "vdaemon: set next call be an invoke");    invoking = 1;  }  localMPI_setCommand(&exeargc, &exeargv, result->jobid_arg, result->rank_arg, result->n_procs_arg, result->debug_arg);  exeargv[exeargc] = NULL;  spid = ckptForkExec(exeargv, exeargc, exeargs, result->jobid_arg, result->rank_arg, invoking);  if( spid > 0 )    {      WTmpi = localMPI_OpenWriteToDriver(result->jobid_arg, result->rank_arg);      RFmpi = localMPI_OpenReadFromDriver(result->jobid_arg, result->rank_arg);            initcomm(RFmpi, WTmpi, listen_sock, sock_to_dispatcher);      daemonselect();      localMPI_removePipes(result->jobid_arg, result->rank_arg);      printi("finalized", "Now, returning 0");    }    return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -