⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pmiport.c

📁 fortran并行计算包
💻 C
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//*   *  (C) 2003 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "pmutilconf.h"#include <stdio.h>/* Note that we need _XOPEN_SOURCE or some BSD source to get a prototype   for gethostname from unistd.h */#ifdef HAVE_UNISTD_H#include <unistd.h>#endif/* Note that we need _XOPEN_SOURCE or _SVID_SOURCE to get a prototype   for putenv from stdlib.h */#ifdef HAVE_STDLIB_H#include <stdlib.h>#endif#ifdef HAVE_STRING_H#include <string.h>#endif#ifdef HAVE_SYS_TYPES_H#include <sys/types.h>#endif/* ctype is needed for isspace and isascii (isspace is only defined for    values on which isascii returns true). */#include <ctype.h>#ifdef HAVE_SYS_SOCKET_H#include <sys/socket.h>#endif#include "pmutil.h"#include "process.h"#include "pmiserv.h"#include "ioloop.h"#ifndef MAX_PENDING_CONN#define MAX_PENDING_CONN 10#endif#ifndef MAX_HOST_NAME#define MAX_HOST_NAME 1024#endif#ifndef MAX_PORT_STRING#define MAX_PORT_STRING (MAX_HOST_NAME+15)#endifstatic int listenfd = -1;/* ----------------------------------------------------------------------- *//* Get a port for the PMI interface                                        *//* Ports can be allocated within a requested range using the runtime       *//* parameter value MPIEXEC_PORTRANGE, which has the format low:high,       *//* where both low and high are positive integers.  Unless this program is  *//* privaledged, the numbers must be greater than 1023.                     *//* Return -1 on error                                                      *//* ----------------------------------------------------------------------- */#include <errno.h>/* sockaddr_in (Internet) */#include <netinet/in.h>/* TCP_NODELAY */#include <netinet/tcp.h>#include <fcntl.h>/* This is really IP!? */#ifndef TCP#define TCP 0#endif/* Definitions for types that may be ints or may be something else *//* socklen_t - getsockname and accept */#ifndef HAVE_SOCKLEN_Ttypedef unsigned int socklen_t;#endif/* * Input Parameters: *   portLen - Number of characters in portString * Output Parameters: *   fdout - An fd that is listening for connection attempts. *           Use PMIServAcceptFromPort to process reads from this fd *   portString - The name of a port that can be used to connect to  *           this process (using connect). */int PMIServGetPort( int *fdout, char *portString, int portLen ){    int                fd = -1;    struct sockaddr_in sa;    int                optval = 1;    int                portnum;    char               *range_ptr;    int                low_port=0, high_port=0;    /* Under cygwin we may want to use 1024 as a low port number */    /* a low and high port of zero allows the system to choose        the port value */        /* Get the low and high portnumber range.  zero may be used to allow       the system to choose.  There is a priority to these values,        we keep going until we get one (and skip if none is found) */        range_ptr = getenv( "MPIEXEC_PORTRANGE" );    if (!range_ptr) {	range_ptr = getenv( "MPIEXEC_PORT_RANGE" );    }    if (!range_ptr) {	range_ptr = getenv( "MPICH_PORT_RANGE" );    }    if (range_ptr) {	char *p;	/* Look for n:m format */	p = range_ptr;	while (*p && isspace(*p)) p++;	while (*p && isdigit(*p)) low_port = 10 * low_port + (*p++ - '0');	if (*p == ':') {	    p++;	    while (*p && isdigit(*p)) high_port = 10 * high_port + (*p++ - '0');	}	if (*p) {	    MPIU_Error_printf( "Invalid character %c in MPIEXEC_PORTRANGE\n", 			       *p );	    return -1;	}    }    for (portnum=low_port; portnum<=high_port; portnum++) {	memset( (void *)&sa, 0, sizeof(sa) );	sa.sin_family	   = AF_INET;	sa.sin_port	   = htons( portnum );	sa.sin_addr.s_addr = INADDR_ANY;    	fd = socket( AF_INET, SOCK_STREAM, TCP );	if (fd < 0) {	    /* Failure; return immediately */	    return fd;	}    	if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, 		    (char *)&optval, sizeof(optval) )) {	    MPIU_Internal_sys_error_printf( "setsockopt", errno, 0 );	}		if (bind( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) {	    close( fd );	    fd = -1;	    if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {		return -1;	    }	}	else {	    /* Success! We have a port.  */	    break;	}    }        if (fd < 0) {	/* We were unable to find a usable port */	return -1;    }    DBG_PRINTF( ("Listening on fd %d\n", fd) );    /* Listen is a non-blocking call that enables connections */    listen( fd, MAX_PENDING_CONN );    /* Make sure that this fd doesn't get sent to the children */    fcntl( fd, F_SETFD, FD_CLOEXEC );        *fdout = fd;    if (portnum == 0) {	socklen_t sinlen = sizeof(sa);	/* We asked for *any* port, so we need to find which	   port we actually got */	getsockname( fd, (struct sockaddr *)&sa, &sinlen );	portnum = ntohs(sa.sin_port);    }    /* Create the port string */    {	char hostname[MAX_HOST_NAME+1];	hostname[0] = 0;	MPIE_GetMyHostName( hostname, sizeof(hostname) );	MPIU_Snprintf( portString, portLen, "%s:%d", hostname, portnum );    }        return 0;}/* defs of gethostbyname */#include <netdb.h>int MPIE_GetMyHostName( char myname[MAX_HOST_NAME+1], int namelen ){    struct hostent     *hp;    char *hostname = 0;    /*     * First, get network name if necessary     */    hostname = myname;    if (!myname[0]) {	gethostname( myname, namelen );    }    /* ??? */    hp = gethostbyname( hostname );    if (!hp) {	return -1;    }    return 0;}/* IO Handler for the listen socket   Respond to a connection request by creating a new socket, which is   then registered.   Initialize the startup handshake. */int PMIServAcceptFromPort( int fd, int rdwr, void *data ){    int             newfd;    struct sockaddr sock;    socklen_t       addrlen = sizeof(sock);    int             id;    ProcessUniverse *univ = (ProcessUniverse *)data;    ProcessWorld    *pWorld = univ->worlds;    ProcessApp      *app;    /* Get the new socket */    MPIE_SYSCALL(newfd,accept,( fd, &sock, &addrlen ));    DBG_PRINTF(("Acquired new socket in accept (fd = %d)\n", newfd ));    if (newfd < 0) {	DBG(perror("Error on accept: " ));	return newfd;    }#ifdef FOO    /* Mark this fd as non-blocking */    flags = fcntl( newfd, F_GETFL, 0 );    if (flags >= 0) {	flags |= O_NDELAY;	fcntl( newfd, F_SETFL, flags );    }#endif    /* Make sure that exec'd processes don't get this fd */    fcntl( newfd, F_SETFD, FD_CLOEXEC );    /* Find the matching process.  Do this by reading from the socket and        getting the id value with which process was created. */    id = PMI_Init_port_connection( newfd );    if (id >= 0) {	/* find the matching entry */	ProcessState *pState = 0;	int           nSoFar = 0;	PMIProcess   *pmiprocess;	/* This code assigns processes to the states in a pWorld	   by using the id as the rank, and finding the corresponding	   process among the ranks */	while (pWorld) {	    app = pWorld->apps;	    while (app) {		if (app->nProcess > id - nSoFar) {		    /* Found the matching app */		    pState = app->pState + (id - nSoFar);		    break;		}		else {		    nSoFar += app->nProcess;		}		app = app->nextApp;	    }	    pWorld = pWorld->nextWorld;	}	if (!pState) {	    /* We have a problem */	    MPIU_Error_printf( "Unable to find process with PMI_ID = %d in the universe", id );	    return -1;	}	/* Now, initialize the connection */	/* Create the new process structure (see PMISetupFinishInServer	   for this step when a pre-existing FD is used */	DBG_PRINTF( ("Server connection to id = %d on fd %d\n", id, newfd ));	pmiprocess = PMISetupNewProcess( newfd, pState );	PMI_Init_remote_proc( newfd, pmiprocess );	MPIE_IORegister( newfd, IO_READ, PMIServHandleInput, 			 pmiprocess );    }    else {	/* Error, the id should never be less than zero or unset */	/* An alternative would be to dynamically assign the ranks	   as processes come in (but we'd still need to use the 	   PMI_ID to identify the ProcessApp) */	DBG_PRINTF(("Found an invalid id\n" ));	return -1;    }    /* Return success. */    return 0;}/* * Setup a port and register the handler on which to listen. * Input Parameter: *   pUniv - Pointer to a process universe.  This is passed to the *           routine that is called to handle connection requests to the port * * Output Parameter: *   portString - Name of the port (of maximum size portLen) * * Return Value: *   0 on success. * * Notes: * The listenfd is global to simplify closing it once all processes have  * exited. */int PMIServSetupPort( ProcessUniverse *pUniv, char *portString, int portLen ){    int rc = 0;        rc = PMIServGetPort( &listenfd, portString, portLen );    if (rc) return rc;    rc = MPIE_IORegister( listenfd, IO_READ, PMIServAcceptFromPort, pUniv );    if (pUniv->OnNone) {	MPIU_Internal_error_printf( "pUniv.OnNone already set; cannot set to PMIServEndPort\n" );	return -1;    }    else {	pUniv->OnNone = PMIServEndPort;    }    return rc;}/* This is a signal-safe routine, used to terminate the use of the port   within the ioloop */int PMIServEndPort( void ){    DBG_PRINTF(("deregistering listenerfd %d\n", listenfd ));    MPIE_IODeregister( listenfd );    return 0;}/* ------------------------------------------------------------------------- *//* * This code allows mpiexec to connect back to a program in the * singleton init case.  This routine essentially identical to  * PMII_Connect_to_pm( char *hostname, int portnum ) in simple_pmi.c  */#include <sys/param.h>/* sockaddr_un (Unix) */#include <sys/un.h>/* This routine blocks until connected to the indicated host (by   interface name) and port.  It returns the fd, or -1 on failure */int MPIE_ConnectToPort( char *hostname, int portnum ){    struct hostent     *hp;    struct sockaddr_in sa;    int                fd;    int                optval = 1;    int                q_wait = 1;    char defaultHostname[MAX_HOST_NAME+1];            DBG_PRINTF( ("Connecting to %s:%d\n", hostname, portnum ) );    /* FIXME: simple_pmi should *never* start mpiexec with a bogus       interface name */    if (strcmp(hostname,"default_interface") == 0) {	defaultHostname[0] = 0;	MPIE_GetMyHostName( defaultHostname, sizeof(defaultHostname) );	hostname = defaultHostname;	DBG_PRINTF( ( "Connecting to %s:%d\n", hostname, portnum ) );    }    hp = gethostbyname( hostname );    if (!hp) {	return -1;    }        memset( (void *)&sa, 0, sizeof(sa) );    /* POSIX might define h_addr_list only and not define h_addr */#ifdef HAVE_H_ADDR_LIST    memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr_list[0], hp->h_length);#else    memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr, hp->h_length);#endif    sa.sin_family = hp->h_addrtype;    sa.sin_port   = htons( (unsigned short) portnum );        fd = socket( AF_INET, SOCK_STREAM, TCP );    if (fd < 0) {	return -1;    }        if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY, 		    (char *)&optval, sizeof(optval) )) {	perror( "Error calling setsockopt:" );    }    /* We wait here for the connection to succeed */    if (connect( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) {	switch (errno) {	case ECONNREFUSED:	    /* (close socket, get new socket, try again) */	    if (q_wait)		close(fd);	    return -1;	    	case EINPROGRESS: /*  (nonblocking) - select for writing. */	    break;	    	case EISCONN: /*  (already connected) */	    break;	    	case ETIMEDOUT: /* timed out */	    return -1;	default:	    return -1;	}    }    /* The first message must also be received: cmd=initack */        return fd;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -