⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 simple_pmi.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 3 页
字号:
int PMI_Args_to_keyval(int *argcp, char *((*argvp)[]), PMI_keyval_t **keyvalp, int *size){    return ( 0 );}int PMI_Parse_option(int num_args, char *args[], int *num_parsed, PMI_keyval_t **keyvalp, int *size){    if (num_args < 1)        return PMI_ERR_INVALID_NUM_ARGS;    if (args == NULL)        return PMI_ERR_INVALID_ARGS;    if (num_parsed == NULL)        return PMI_ERR_INVALID_NUM_PARSED;    if (keyvalp == NULL)        return PMI_ERR_INVALID_KEYVALP;    if (size == NULL)        return PMI_ERR_INVALID_SIZE;    *num_parsed = 0;    *keyvalp = NULL;    *size = 0;    return PMI_SUCCESS;}int PMI_Free_keyvals(PMI_keyval_t keyvalp[], int size){    int i;    if (size < 0)        return PMI_ERR_INVALID_ARG;    if (keyvalp == NULL && size > 0)        return PMI_ERR_INVALID_ARG;    if (size == 0)        return PMI_SUCCESS;    /* free stuff */    for (i=0; i<size; i++)    {	MPIU_Free(keyvalp[i].key);	MPIU_Free(keyvalp[i].val);    }    MPIU_Free(keyvalp);    return PMI_SUCCESS;}/********************* Internal routines not part of PMI interface *****************//* get a keyval pair by specific index */static int PMII_iter( const char *kvsname, const int idx, int *next_idx, char *key, int key_len, char *val, int val_len){    char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];    int  rc;    /* FIXME: Check for tempbuf too short */    MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=getbyidx kvsname=%s idx=%d\n", kvsname, idx  );    PMIU_writeline( PMI_fd, buf );    PMIU_readline( PMI_fd, buf, PMIU_MAXLINE );    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strncmp( cmd, "getbyidx_results", PMIU_MAXLINE ) != 0 ) {	PMIU_printf( 1, "got unexpected response to getbyidx :%s:\n", buf );	return( PMI_FAIL );    }    else {	PMIU_getval( "rc", buf, PMIU_MAXLINE );	rc = atoi( buf );	if ( rc == 0 ) {	    PMIU_getval( "nextidx", buf, PMIU_MAXLINE );	    *next_idx = atoi( buf );	    PMIU_getval( "key", key, PMI_keylen_max );	    PMIU_getval( "val", val, PMI_vallen_max );	    return( PMI_SUCCESS );	}	else {	    PMIU_getval( "reason", buf, PMIU_MAXLINE );	    if ( strncmp( buf, "no_more_keyvals", PMIU_MAXLINE ) == 0 ) {		key[0] = '\0';		return( PMI_SUCCESS );	    }	    else {		PMIU_printf( 1, "iter failed; reason = %s\n", buf );		return( PMI_FAIL );	    }	}    }}/* to get all maxes in one message */static int PMII_getmaxes( int *kvsname_max, int *keylen_max, int *vallen_max ){    char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE], errmsg[PMIU_MAXLINE];    MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=init pmi_version=%d pmi_subversion=%d\n",		   PMI_VERSION, PMI_SUBVERSION );    PMIU_writeline( PMI_fd, buf );    PMIU_readline( PMI_fd, buf, PMIU_MAXLINE );    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strncmp( cmd, "response_to_init", PMIU_MAXLINE ) != 0 ) {	MPIU_Snprintf(errmsg, PMIU_MAXLINE, "got unexpected response to init :%s:\n", buf );	PMI_Abort( -1, errmsg );    }    else {	char buf1[PMIU_MAXLINE];        PMIU_getval( "rc", buf, PMIU_MAXLINE );        if ( strncmp( buf, "0", PMIU_MAXLINE ) != 0 ) {            PMIU_getval( "pmi_version", buf, PMIU_MAXLINE );            PMIU_getval( "pmi_subversion", buf1, PMIU_MAXLINE );	    MPIU_Snprintf(errmsg, PMIU_MAXLINE, "pmi_version mismatch; client=%d.%d mgr=%s.%s\n",		    PMI_VERSION, PMI_SUBVERSION, buf, buf1 );	    PMI_Abort( -1, errmsg );        }    }    PMIU_writeline( PMI_fd, "cmd=get_maxes\n" );    PMIU_readline( PMI_fd, buf, PMIU_MAXLINE );    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strncmp( cmd, "maxes", PMIU_MAXLINE ) != 0 ) {	PMIU_printf( 1, "got unexpected response to get_maxes :%s:\n", buf );	return( -1 );    }    else {	PMIU_getval( "kvsname_max", buf, PMIU_MAXLINE );	*kvsname_max = atoi( buf );	PMIU_getval( "keylen_max", buf, PMIU_MAXLINE );	*keylen_max = atoi( buf );	PMIU_getval( "vallen_max", buf, PMIU_MAXLINE );	*vallen_max = atoi( buf );	return( 0 );    }}#ifdef USE_PMI_PORT/* * This code allows a program to contact a host/port for the PMI socket. */#include <errno.h>#if defined(HAVE_SYS_TYPES_H)#include <sys/types.h>#endif#include <sys/param.h>#include <sys/socket.h>/* sockaddr_in (Internet) */#include <netinet/in.h>/* TCP_NODELAY */#include <netinet/tcp.h>/* sockaddr_un (Unix) */#include <sys/un.h>/* defs of gethostbyname */#include <netdb.h>/* fcntl, F_GET/SETFL */#include <fcntl.h>/* This is really IP!? */#ifndef TCP#define TCP 0#endif/* stub for connecting to a specified host/port instead of using a    specified fd inherited from a parent process */static int PMII_Connect_to_pm( char *hostname, int portnum ){    struct hostent     *hp;    struct sockaddr_in sa;    int                fd;    int                optval = 1;    int                q_wait = 1;        hp = gethostbyname( hostname );    if (!hp) {	return -1;    }        bzero( (void *)&sa, sizeof(sa) );    /* POSIX might define h_addr_list only and node define h_addr */#ifdef HAVE_H_ADDR_LIST    bcopy( (void *)hp->h_addr_list[0], (void *)&sa.sin_addr, hp->h_length);#else    bcopy( (void *)hp->h_addr, (void *)&sa.sin_addr, hp->h_length);#endif    sa.sin_family = hp->h_addrtype;    sa.sin_port   = htons( (unsigned short) portnum );        fd = socket( AF_INET, SOCK_STREAM, TCP );    if (fd < 0) {	return -1;    }        if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, 		    (char *)&optval, sizeof(optval) )) {	perror( "Error calling setsockopt:" );    }    /* We wait here for the connection to succeed */    if (connect( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) {	switch (errno) {	case ECONNREFUSED:	    /* (close socket, get new socket, try again) */	    if (q_wait)		close(fd);	    return -1;	    	case EINPROGRESS: /*  (nonblocking) - select for writing. */	    break;	    	case EISCONN: /*  (already connected) */	    break;	    	case ETIMEDOUT: /* timed out */	    return -1;	default:	    return -1;	}    }    return fd;}static int PMII_Set_from_port( int fd, int id ){    char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];    int err;    /* We start by sending a startup message to the server */    if (PMI_debug) {	PMIU_printf( 1, "Writing initack to destination fd %d\n", fd );    }    /* Handshake and initialize from a port */    /* FIXME: Check for tempbuf too short */    MPIU_Snprintf( buf, PMIU_MAXLINE, "cmd=initack pmiid=%d\n", id );    PMIU_printf( PMI_debug, "writing on fd %d line :%s:\n", fd, buf );    err = PMIU_writeline( fd, buf );    if (err) {	PMIU_printf( 1, "Error in writeline initack\n" );	return -1;    }    /* cmd=initack */    buf[0] = 0;    PMIU_printf( PMI_debug, "reading initack\n" );    err = PMIU_readline( fd, buf, PMIU_MAXLINE );    if (err < 0) {	PMIU_printf( 1, "Error reading initack on %d\n", fd );	perror( "Error on readline:" );	return -1;    }    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strcmp( cmd, "initack" ) ) {	PMIU_printf( 1, "got unexpected input %s\n", buf );	return -1;    }        /* Read, in order, size, rank, and debug.  Eventually, we'll want        the handshake to include a version number */    /* size */    PMIU_printf( PMI_debug, "reading size\n" );    err = PMIU_readline( fd, buf, PMIU_MAXLINE );    if (err < 0) {	PMIU_printf( 1, "Error reading size on %d\n", fd );	perror( "Error on readline:" );	return -1;    }    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strcmp(cmd,"set")) {	PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf );	return -1;    }    /* cmd=set size=n */    PMIU_getval( "size", cmd, PMIU_MAXLINE );    PMI_size = atoi(cmd);    /* rank */    PMIU_printf( PMI_debug, "reading rank\n" );    err = PMIU_readline( fd, buf, PMIU_MAXLINE );    if (err < 0) {	PMIU_printf( 1, "Error reading rank on %d\n", fd );	perror( "Error on readline:" );	return -1;    }    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strcmp(cmd,"set")) {	PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf );	return -1;    }    /* cmd=set rank=n */    PMIU_getval( "rank", cmd, PMIU_MAXLINE );    PMI_rank = atoi(cmd);    PMIU_Set_rank( PMI_rank );    /* debug flag */    err = PMIU_readline( fd, buf, PMIU_MAXLINE );    if (err < 0) {	PMIU_printf( 1, "Error reading debug on %d\n", fd );	return -1;    }    PMIU_parse_keyvals( buf );    PMIU_getval( "cmd", cmd, PMIU_MAXLINE );    if ( strcmp(cmd,"set")) {	PMIU_printf( 1, "got unexpected command %s in %s\n", cmd, buf );	return -1;    }    /* cmd=set debug=n */    PMIU_getval( "debug", cmd, PMIU_MAXLINE );    PMI_debug = atoi(cmd);    if (PMI_debug) {	DBG_PRINTF( ("end of handshake, rank = %d, size = %d\n", 		    PMI_rank, PMI_size )); 	DBG_PRINTF( ("Completed init\n" ) );    }    return 0;}static int PMII_singinit(){    int pid, rc;    int singinit_listen_sock, pmi_sock, stdin_sock, stdout_sock, stderr_sock;    char *newargv[8], charpid[8], port_c[8];    struct sockaddr_in sin;    socklen_t len;    sin.sin_family = AF_INET;    sin.sin_addr.s_addr = INADDR_ANY;    sin.sin_port = htons(0);    /* anonymous port */    singinit_listen_sock = socket(AF_INET, SOCK_STREAM, 0);    rc = bind(singinit_listen_sock, (struct sockaddr *)&sin ,sizeof(sin));    len = sizeof(struct sockaddr_in);    rc = getsockname( singinit_listen_sock, (struct sockaddr *) &sin, &len );     MPIU_Snprintf(port_c, 8, "%d",ntohs(sin.sin_port));    rc = listen(singinit_listen_sock, 5);    pid = fork();    if (pid < 0)    {	perror("PMII_singinit: fork failed");	exit(-1);    }    else if (pid == 0)    {	newargv[0] = "mpiexec";	newargv[1] = "-pmi_args";	newargv[2] = port_c;	/* FIXME: Use a valid hostname */	newargv[3] = "default_interface";  /* default interface name, for now */	newargv[4] = "default_key";   /* default authentication key, for now */	MPIU_Snprintf(charpid, 8, "%d",getpid());	newargv[5] = charpid;	newargv[6] = NULL;	rc = execvp(newargv[0],newargv);	perror("PMII_singinit: execv failed");	PMIU_printf(1, "  This singleton init program attempted to access some feature\n");	PMIU_printf(1, "  for which process manager support was required, e.g. spawn or universe_size.\n");	PMIU_printf(1, "  But, the necessary mpiexec is not in your path.\n");	return(-1);    }    else    {	pmi_sock = accept_one_connection(singinit_listen_sock);	PMI_fd = pmi_sock;	stdin_sock  = accept_one_connection(singinit_listen_sock);	dup2(stdin_sock, 0);	stdout_sock = accept_one_connection(singinit_listen_sock);	dup2(stdout_sock,1);	stderr_sock = accept_one_connection(singinit_listen_sock);	dup2(stderr_sock,2);    }    return(0);}static int accept_one_connection(int list_sock){    int gotit, new_sock;    struct sockaddr_in from;    socklen_t len;    len = sizeof(from);    gotit = 0;    while ( ! gotit )    {	new_sock = accept(list_sock, (struct sockaddr *)&from, &len);	if (new_sock == -1)	{	    if (errno == EINTR)    /* interrupted? If so, try again */		continue;	    else	    {		PMIU_printf(1, "accept failed in accept_one_connection\n");		exit(-1);	    }	}	else	    gotit = 1;    }    return(new_sock);}#endif/* end USE_PMI_PORT */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -