📄 simple_pmi.c
字号:
int PMI_Args_to_keyval(int *argcp, char *((*argvp)[]), PMI_keyval_t **keyvalp, int *size){ return ( 0 );}int PMI_Parse_option(int num_args, char *args[], int *num_parsed, PMI_keyval_t **keyvalp, int *size){ if (num_args < 1) return PMI_ERR_INVALID_NUM_ARGS; if (args == NULL) return PMI_ERR_INVALID_ARGS; if (num_parsed == NULL) return PMI_ERR_INVALID_NUM_PARSED; if (keyvalp == NULL) return PMI_ERR_INVALID_KEYVALP; if (size == NULL) return PMI_ERR_INVALID_SIZE; *num_parsed = 0; *keyvalp = NULL; *size = 0; return PMI_SUCCESS;}int PMI_Free_keyvals(PMI_keyval_t keyvalp[], int size){ int i; if (size < 0) return PMI_ERR_INVALID_ARG; if (keyvalp == NULL && size > 0) return PMI_ERR_INVALID_ARG; if (size == 0) return PMI_SUCCESS; /* free stuff */ for (i=0; i<size; i++) { MPIU_Free(keyvalp[i].key); MPIU_Free(keyvalp[i].val); } MPIU_Free(keyvalp); return PMI_SUCCESS;}/********************* Internal routines not part of PMI interface *****************//* get a keyval pair by specific index */static int PMII_iter( const char *kvsname, const int idx, int *next_idx, char *key, int key_len, char *val, int val_len){ char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE]; int rc; /* FIXME: Check for tempbuf too short */ MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=getbyidx kvsname=%s idx=%d\n", kvsname, idx ); PMIU_writeline( PMI_fd, buf ); PMIU_readline( PMI_fd, buf, PMIU_MAXLINE ); PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strncmp( cmd, "getbyidx_results", PMIU_MAXLINE ) != 0 ) { PMIU_printf( 1, "got unexpected response to getbyidx :%s:\n", buf ); return( PMI_FAIL ); } else { PMIU_getval( "rc", buf, PMIU_MAXLINE ); rc = atoi( buf ); if ( rc == 0 ) { PMIU_getval( "nextidx", buf, PMIU_MAXLINE ); *next_idx = atoi( buf ); PMIU_getval( "key", key, PMI_keylen_max ); PMIU_getval( "val", val, PMI_vallen_max ); return( PMI_SUCCESS ); } else { PMIU_getval( "reason", buf, PMIU_MAXLINE ); if ( strncmp( buf, "no_more_keyvals", PMIU_MAXLINE ) == 0 ) { key[0] = '\0'; return( PMI_SUCCESS ); } else { PMIU_printf( 1, "iter failed; reason = %s\n", buf ); return( PMI_FAIL ); } } }}/* to get all maxes in one message */static int PMII_getmaxes( int *kvsname_max, int *keylen_max, int *vallen_max ){ char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE], errmsg[PMIU_MAXLINE]; MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=init pmi_version=%d pmi_subversion=%d\n", PMI_VERSION, PMI_SUBVERSION ); PMIU_writeline( PMI_fd, buf ); PMIU_readline( PMI_fd, buf, PMIU_MAXLINE ); PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strncmp( cmd, "response_to_init", PMIU_MAXLINE ) != 0 ) { MPIU_Snprintf(errmsg, PMIU_MAXLINE, "got unexpected response to init :%s:\n", buf ); PMI_Abort( -1, errmsg ); } else { char buf1[PMIU_MAXLINE]; PMIU_getval( "rc", buf, PMIU_MAXLINE ); if ( strncmp( buf, "0", PMIU_MAXLINE ) != 0 ) { PMIU_getval( "pmi_version", buf, PMIU_MAXLINE ); PMIU_getval( "pmi_subversion", buf1, PMIU_MAXLINE ); MPIU_Snprintf(errmsg, PMIU_MAXLINE, "pmi_version mismatch; client=%d.%d mgr=%s.%s\n", PMI_VERSION, PMI_SUBVERSION, buf, buf1 ); PMI_Abort( -1, errmsg ); } } PMIU_writeline( PMI_fd, "cmd=get_maxes\n" ); PMIU_readline( PMI_fd, buf, PMIU_MAXLINE ); PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strncmp( cmd, "maxes", PMIU_MAXLINE ) != 0 ) { PMIU_printf( 1, "got unexpected response to get_maxes :%s:\n", buf ); return( -1 ); } else { PMIU_getval( "kvsname_max", buf, PMIU_MAXLINE ); *kvsname_max = atoi( buf ); PMIU_getval( "keylen_max", buf, PMIU_MAXLINE ); *keylen_max = atoi( buf ); PMIU_getval( "vallen_max", buf, PMIU_MAXLINE ); *vallen_max = atoi( buf ); return( 0 ); }}#ifdef USE_PMI_PORT/* * This code allows a program to contact a host/port for the PMI socket. */#include <errno.h>#if defined(HAVE_SYS_TYPES_H)#include <sys/types.h>#endif#include <sys/param.h>#include <sys/socket.h>/* sockaddr_in (Internet) */#include <netinet/in.h>/* TCP_NODELAY */#include <netinet/tcp.h>/* sockaddr_un (Unix) */#include <sys/un.h>/* defs of gethostbyname */#include <netdb.h>/* fcntl, F_GET/SETFL */#include <fcntl.h>/* This is really IP!? */#ifndef TCP#define TCP 0#endif/* stub for connecting to a specified host/port instead of using a specified fd inherited from a parent process */static int PMII_Connect_to_pm( char *hostname, int portnum ){ struct hostent *hp; struct sockaddr_in sa; int fd; int optval = 1; int q_wait = 1; hp = gethostbyname( hostname ); if (!hp) { return -1; } bzero( (void *)&sa, sizeof(sa) ); /* POSIX might define h_addr_list only and node define h_addr */#ifdef HAVE_H_ADDR_LIST bcopy( (void *)hp->h_addr_list[0], (void *)&sa.sin_addr, hp->h_length);#else bcopy( (void *)hp->h_addr, (void *)&sa.sin_addr, hp->h_length);#endif sa.sin_family = hp->h_addrtype; sa.sin_port = htons( (unsigned short) portnum ); fd = socket( AF_INET, SOCK_STREAM, TCP ); if (fd < 0) { return -1; } if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval) )) { perror( "Error calling setsockopt:" ); } /* We wait here for the connection to succeed */ if (connect( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) { switch (errno) { case ECONNREFUSED: /* (close socket, get new socket, try again) */ if (q_wait) close(fd); return -1; case EINPROGRESS: /* (nonblocking) - select for writing. */ break; case EISCONN: /* (already connected) */ break; case ETIMEDOUT: /* timed out */ return -1; default: return -1; } } return fd;}static int PMII_Set_from_port( int fd, int id ){ char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE]; int err; /* We start by sending a startup message to the server */ if (PMI_debug) { PMIU_printf( 1, "Writing initack to destination fd %d\n", fd ); } /* Handshake and initialize from a port */ /* FIXME: Check for tempbuf too short */ MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=initack pmiid=%d\n", id ); PMIU_printf( PMI_debug, "writing on fd %d line :%s:\n", fd, buf ); err = PMIU_writeline( fd, buf ); if (err) { PMIU_printf( 1, "Error in writeline initack\n" ); return -1; } /* cmd=initack */ buf[0] = 0; PMIU_printf( PMI_debug, "reading initack\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading initack on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp( cmd, "initack" ) ) { PMIU_printf( 1, "got unexpected input %s\n", buf ); return -1; } /* Read, in order, size, rank, and debug. Eventually, we'll want the handshake to include a version number */ /* size */ PMIU_printf( PMI_debug, "reading size\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading size on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set size=n */ PMIU_getval( "size", cmd, PMIU_MAXLINE ); PMI_size = atoi(cmd); /* rank */ PMIU_printf( PMI_debug, "reading rank\n" ); err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading rank on %d\n", fd ); perror( "Error on readline:" ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set rank=n */ PMIU_getval( "rank", cmd, PMIU_MAXLINE ); PMI_rank = atoi(cmd); PMIU_Set_rank( PMI_rank ); /* debug flag */ err = PMIU_readline( fd, buf, PMIU_MAXLINE ); if (err < 0) { PMIU_printf( 1, "Error reading debug on %d\n", fd ); return -1; } PMIU_parse_keyvals( buf ); PMIU_getval( "cmd", cmd, PMIU_MAXLINE ); if ( strcmp(cmd,"set")) { PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf ); return -1; } /* cmd=set debug=n */ PMIU_getval( "debug", cmd, PMIU_MAXLINE ); PMI_debug = atoi(cmd); if (PMI_debug) { DBG_PRINTF( ("end of handshake, rank = %d, size = %d\n", PMI_rank, PMI_size )); DBG_PRINTF( ("Completed init\n" ) ); } return 0;}static int PMII_singinit(){ int pid, rc; int singinit_listen_sock, pmi_sock, stdin_sock, stdout_sock, stderr_sock; char *newargv[8], charpid[8], port_c[8]; struct sockaddr_in sin; socklen_t len; sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(0); /* anonymous port */ singinit_listen_sock = socket(AF_INET, SOCK_STREAM, 0); rc = bind(singinit_listen_sock, (struct sockaddr *)&sin ,sizeof(sin)); len = sizeof(struct sockaddr_in); rc = getsockname( singinit_listen_sock, (struct sockaddr *) &sin, &len ); MPIU_Snprintf(port_c, 8, "%d",ntohs(sin.sin_port)); rc = listen(singinit_listen_sock, 5); pid = fork(); if (pid < 0) { perror("PMII_singinit: fork failed"); exit(-1); } else if (pid == 0) { newargv[0] = "mpiexec"; newargv[1] = "-pmi_args"; newargv[2] = port_c; /* FIXME: Use a valid hostname */ newargv[3] = "default_interface"; /* default interface name, for now */ newargv[4] = "default_key"; /* default authentication key, for now */ MPIU_Snprintf(charpid, 8, "%d",getpid()); newargv[5] = charpid; newargv[6] = NULL; rc = execvp(newargv[0],newargv); perror("PMII_singinit: execv failed"); PMIU_printf(1, " This singleton init program attempted to access some feature\n"); PMIU_printf(1, " for which process manager support was required, e.g. spawn or universe_size.\n"); PMIU_printf(1, " But, the necessary mpiexec is not in your path.\n"); return(-1); } else { pmi_sock = accept_one_connection(singinit_listen_sock); PMI_fd = pmi_sock; stdin_sock = accept_one_connection(singinit_listen_sock); dup2(stdin_sock, 0); stdout_sock = accept_one_connection(singinit_listen_sock); dup2(stdout_sock,1); stderr_sock = accept_one_connection(singinit_listen_sock); dup2(stderr_sock,2); } return(0);}static int accept_one_connection(int list_sock){ int gotit, new_sock; struct sockaddr_in from; socklen_t len; len = sizeof(from); gotit = 0; while ( ! gotit ) { new_sock = accept(list_sock, (struct sockaddr *)&from, &len); if (new_sock == -1) { if (errno == EINTR) /* interrupted? If so, try again */ continue; else { PMIU_printf(1, "accept failed in accept_one_connection\n"); exit(-1); } } else gotit = 1; } return(new_sock);}#endif/* end USE_PMI_PORT */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -