📄 simple_pmi.c
字号:
return PMI_FAIL; } } return err;}/* ----------------------------------------------------------------------- */#ifdef USE_PMI_PORT/* * This code allows a program to contact a host/port for the PMI socket. */#include <errno.h>#if defined(HAVE_SYS_TYPES_H)#include <sys/types.h>#endif#include <sys/param.h>#include <sys/socket.h>/* sockaddr_in (Internet) */#include <netinet/in.h>/* TCP_NODELAY */#include <netinet/tcp.h>/* sockaddr_un (Unix) */#include <sys/un.h>/* defs of gethostbyname */#include <netdb.h>/* fcntl, F_GET/SETFL */#include <fcntl.h>/* This is really IP!? */#ifndef TCP#define TCP 0#endif/* stub for connecting to a specified host/port instead of using a specified fd inherited from a parent process */static int PMII_Connect_to_pm( char *hostname, int portnum ){ struct hostent *hp; struct sockaddr_in sa; int fd; int optval = 1; int q_wait = 1; hp = gethostbyname( hostname ); if (!hp) { PMIU_printf( 1, "Unable to get host entry for %s\n", hostname ); return -1; } memset( (void *)&sa, 0, sizeof(sa) ); /* POSIX might define h_addr_list only and node define h_addr */#ifdef HAVE_H_ADDR_LIST memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr_list[0], hp->h_length);#else memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr, hp->h_length);#endif sa.sin_family = hp->h_addrtype; sa.sin_port = htons( (unsigned short) portnum ); fd = socket( AF_INET, SOCK_STREAM, TCP ); if (fd < 0) { PMIU_printf( 1, "Unable to get AF_INET socket\n" ); return -1; } if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval) )) { perror( "Error calling setsockopt:" ); } /* We wait here for the connection to succeed */ if (connect( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) { switch (errno) { case ECONNREFUSED: PMIU_printf( 1, "connect failed with connection refused\n" ); /* (close socket, get new socket, try again) */ if (q_wait) close(fd); return -1; case EINPROGRESS: /* (nonblocking) - select for writing. */ break; case EISCONN: /* (already connected) */ break; case ETIMEDOUT: /* timed out */ PMIU_printf( 1, "connect failed with timeout\n" ); return -1; default: PMIU_printf( 1, "connect failed with errno %d\n", errno ); return -1; } } return fd;}static int PMII_Set_from_port( int fd, int id ){ char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE]; int err, rc; /* We start by sending a startup message to the server */ if (PMI_debug) { PMIU_printf( 1, "Writing initack to destination fd %d\n", fd ); } /* Handshake and initialize from a port */ rc = MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=initack pmiid=%d\n", id ); if (rc < 0) { return PMI_FAIL; } PMIU_printf( PMI_debug, "writing on fd %d line :%s:\n", fd, buf ); err = PMIU_writeline( fd, buf ); if (err) { PMIU_printf( 1, "Error in writeline initack\n" ); return -1; } /* cmd=initack */ buf[0] = 0; PMIU_printf( PMI_debug, "reading initack\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading initack on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp( cmd, "initack" ) ) { PMIU_printf( 1, "got unexpected input %s\n", buf ); return -1; } /* Read, in order, size, rank, and debug. Eventually, we'll want the handshake to include a version number */ /* size */ PMIU_printf( PMI_debug, "reading size\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading size on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set size=n */ PMIU_getval( "size", cmd, PMIU_MAXLINE ); PMI_size = atoi(cmd); /* rank */ PMIU_printf( PMI_debug, "reading rank\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading rank on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set rank=n */ PMIU_getval( "rank", cmd, PMIU_MAXLINE ); PMI_rank = atoi(cmd); PMIU_Set_rank( PMI_rank ); /* debug flag */ err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading debug on %d\n", fd ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set debug=n */ PMIU_getval( "debug", cmd, PMIU_MAXLINE ); PMI_debug = atoi(cmd); if (PMI_debug) { DBG_PRINTF( ("end of handshake, rank = %d, size = %d\n", PMI_rank, PMI_size )); DBG_PRINTF( ("Completed init\n" ) ); } return 0;}/* ------------------------------------------------------------------------- *//* * Singleton Init. * * MPI-2 allows processes to become MPI processes and then make MPI calls, * such as MPI_Comm_spawn, that require a process manager (this is different * than the much simpler case of allowing MPI programs to run with an * MPI_COMM_WORLD of size 1 without an mpiexec or process manager). * * The process starts when either the client or the process manager contacts * the other. If the client starts, it sends a singinit command and * waits for the server to respond with its own singinit command. * If the server start, it send a singinit command and waits for the * client to respond with its own singinit command * * client sends singinit with these required values * pmi_version=<value of PMI_VERSION> * pmi_subversion=<value of PMI_SUBVERSION> * * and these optional values * stdio=[yes|no] * authtype=[none|shared|<other-to-be-defined>] * authstring=<string> * * server sends singinit with the same required and optional values as * above. * * At this point, the protocol is now the same in both cases, and has the * following components: * * server sends singinit_info with these required fields * versionok=[yes|no] * stdio=[yes|no] * kvsname=<string> * * The client then issues the init command (see PMII_getmaxes) * * cmd=init pmi_version=<val> pmi_subversion=<val> * * and expects to receive a * * cmd=response_to_init rc=0 pmi_version=<val> pmi_subversion=<val> * * (This is the usual init sequence). * *//* ------------------------------------------------------------------------- *//* This is a special routine used to re-initialize PMI when it is in the singleton init case. That is, the executable was started without mpiexec, and PMI_Init returned as if there was only one process. Note that PMI routines should not call PMII_singinit; they should call PMIi_InitIfSingleton(), which both connects to the process mangager and sets up the initial KVS connection entry.*/static int PMII_singinit(void){ int pid, rc; int singinit_listen_sock, stdin_sock, stdout_sock, stderr_sock; char *newargv[8], charpid[8], port_c[8]; struct sockaddr_in sin; socklen_t len; /* Create a socket on which to allow an mpiexec to connect back to us */ sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(0); /* anonymous port */ singinit_listen_sock = socket(AF_INET, SOCK_STREAM, 0); rc = bind(singinit_listen_sock, (struct sockaddr *)&sin ,sizeof(sin)); len = sizeof(struct sockaddr_in); rc = getsockname( singinit_listen_sock, (struct sockaddr *) &sin, &len ); MPIU_Snprintf(port_c, sizeof(port_c), "%d",ntohs(sin.sin_port)); rc = listen(singinit_listen_sock, 5); PMIU_printf( PMI_debug_init, "Starting mpiexec with %s\n", port_c ); /* Launch the mpiexec process with the name of this port */ pid = fork(); if (pid < 0) { perror("PMII_singinit: fork failed"); exit(-1); } else if (pid == 0) { newargv[0] = "mpiexec"; newargv[1] = "-pmi_args"; newargv[2] = port_c; /* FIXME: Use a valid hostname */ newargv[3] = "default_interface"; /* default interface name, for now */ newargv[4] = "default_key"; /* default authentication key, for now */ MPIU_Snprintf(charpid, sizeof(charpid), "%d",getpid()); newargv[5] = charpid; newargv[6] = NULL; rc = execvp(newargv[0],newargv); perror("PMII_singinit: execv failed"); PMIU_printf(1, " This singleton init program attempted to access some feature\n"); PMIU_printf(1, " for which process manager support was required, e.g. spawn or universe_size.\n"); PMIU_printf(1, " But the necessary mpiexec is not in your path.\n"); return(-1); } else { char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE]; char *p; int connectStdio = 0; /* Allow one connection back from the created mpiexec program */ PMI_fd = accept_one_connection(singinit_listen_sock); if (PMI_fd < 0) { PMIU_printf( 1, "Failed to establish singleton init connection\n" ); return PMI_FAIL; } /* Execute the singleton init protocol */ rc = PMIU_readline( PMI_fd, buf, PMIU_MAXLINE ); PMIU_printf( PMI_debug_init, "Singinit: read %s\n", buf ); PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if (strcmp( cmd, "singinit" ) != 0) { PMIU_printf( 1, "unexpected command from PM: %s\n", cmd ); return PMI_FAIL; } p = PMIU_getval( "authtype", cmd, PMIU_MAXLINE ); if (p && strcmp( cmd, "none" ) != 0) { PMIU_printf( 1, "unsupported authentication method %s\n", cmd ); return PMI_FAIL; } /* p = PMIU_getval( "authstring", cmd, PMIU_MAXLINE ); */ /* If we're successful, send back our own singinit */ rc = MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=singinit pmi_version=%d pmi_subversion=%d stdio=yes authtype=none\n", PMI_VERSION, PMI_SUBVERSION ); if (rc < 0) { return PMI_FAIL; } PMIU_printf( PMI_debug_init, "GetResponse with %s\n", buf ); rc = GetResponse( buf, "singinit_info", 0 ); if (rc != 0) { PMIU_printf( 1, "GetResponse failed\n" ); return PMI_FAIL; } p = PMIU_getval( "versionok", cmd, PMIU_MAXLINE ); if (p && strcmp( cmd, "yes" ) != 0) { PMIU_printf( 1, "Process manager needs a different PMI version\n" ); return PMI_FAIL; } p = PMIU_getval( "stdio", cmd, PMIU_MAXLINE ); if (p && strcmp( cmd, "yes" ) == 0) { PMIU_printf( PMI_debug_init, "PM agreed to connect stdio\n" ); connectStdio = 1; } p = PMIU_getval( "kvsname", singinit_kvsname, sizeof(singinit_kvsname) ); PMIU_printf( PMI_debug_init, "kvsname to use is %s\n", singinit_kvsname ); if (connectStdio) { PMIU_printf( PMI_debug_init, "Accepting three connections for stdin, out, err\n" ); stdin_sock = accept_one_connection(singinit_listen_sock); dup2(stdin_sock, 0); stdout_sock = accept_one_connection(singinit_listen_sock); dup2(stdout_sock,1); stderr_sock = accept_one_connection(singinit_listen_sock); dup2(stderr_sock,2); } PMIU_printf( PMI_debug_init, "Done with singinit handshake\n" ); } return 0;}/* Promote PMI to a fully initialized version if it was started as a singleton init */static int PMIi_InitIfSingleton(void){ int rc; static int firstcall = 1; if (PMI_initialized != SINGLETON_INIT_BUT_NO_PM || !firstcall) return 0; /* We only try to init as a singleton the first time */ firstcall = 0; /* First, start (if necessary) an mpiexec, connect to it, and start the singleton init handshake */ rc = PMII_singinit(); if (rc < 0) return(-1); PMI_initialized = SINGLETON_INIT_WITH_PM; /* do this right away */ PMI_size = 1; PMI_rank = 0; PMI_debug = 0; PMI_spawned = 0; PMII_getmaxes( &PMI_kvsname_max, &PMI_keylen_max, &PMI_vallen_max ); /* FIXME: We need to support a distinct kvsname for each process group */ PMI_KVS_Put( singinit_kvsname, cached_singinit_key, cached_singinit_val ); return 0;}static int accept_one_connection(int list_sock){ int gotit, new_sock; struct sockaddr_in from; socklen_t len; len = sizeof(from); gotit = 0; while ( ! gotit ) { new_sock = accept(list_sock, (struct sockaddr *)&from, &len); if (new_sock == -1) { if (errno == EINTR) /* interrupted? If so, try again */ continue; else { PMIU_printf(1, "accept failed in accept_one_connection\n"); exit(-1); } } else gotit = 1; } return(new_sock);}#endif/* end USE_PMI_PORT *//* Get the FD to use for PMI operations. If a port is used, rather than a pre-established FD (i.e., via pipe), this routine will handle the initial handshake. */static int getPMIFD( int *notset ){ char *p; /* Set the default */ PMI_fd = -1; p = getenv( "PMI_FD" ); if (p) { PMI_fd = atoi( p ); return 0; }#ifdef USE_PMI_PORT p = getenv( "PMI_PORT" ); if (p) { int portnum; char hostname[MAXHOSTNAME+1]; char *pn, *ph; int id = 0; /* Connect to the indicated port (in format hostname:portnumber) and get the fd for the socket */ /* Split p into host and port */ pn = p; ph = hostname; while (*pn && *pn != ':' && (ph - hostname) < MAXHOSTNAME) { *ph++ = *pn++; } *ph = 0; if (PMI_debug) { DBG_PRINTF( ("Connecting to %s\n", p) ); } if (*pn == ':') { portnum = atoi( pn+1 ); /* FIXME: Check for valid integer after : */ /* This routine only gets the fd to use to talk to the process manager. The handshake below is used to setup the initial values */ PMI_fd = PMII_Connect_to_pm( hostname, portnum ); if (PMI_fd < 0) { PMIU_printf( 1, "Unable to connect to %s on %d\n", hostname, portnum ); return -1; } } else { PMIU_printf( 1, "unable to decode hostport from %s\n", p ); return PMI_FAIL; } /* We should first handshake to get size, rank, debug. */ p = getenv( "PMI_ID" ); if (p) { id = atoi( p ); /* PMII_Set_from_port sets up the values that are delivered by enviroment variables when a separate port is not used */ PMII_Set_from_port( PMI_fd, id ); *notset = 0; } return 0; }#endif /* Singleton init case - its ok to return success with no fd set */ return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -