⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_module_init.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2006 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "tcp_module_impl.h"static int getSockInterfaceAddr( int myRank, char *ifname, int maxIfname);/* We set dbg_ifname to 1 to help debug the choice of interface name   used when determining which interface to advertise to other   processes in getSockInterfaceAddr. */static int dbg_ifname = 0;static MPID_nem_queue_t _free_queue;MPID_nem_queue_ptr_t MPID_nem_module_tcp_free_queue = 0;MPID_nem_queue_ptr_t MPID_nem_process_recv_queue = 0;MPID_nem_queue_ptr_t MPID_nem_process_free_queue = 0;mpid_nem_tcp_internal_t MPID_nem_tcp_internal_vars = {0};#undef FUNCNAME#define FUNCNAME init_tcp#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int init_tcp (MPIDI_PG_t *pg_p){    int           mpi_errno = MPI_SUCCESS;    int           ret = 0;    int           pmi_errno;    int           numprocs  = MPID_nem_mem_region.ext_procs;    unsigned int  len       = sizeof(struct sockaddr_in);    int           port      = 0 ;    int           grank;    int           index;    node_t       *nodes;    char key[MPID_NEM_MAX_KEY_VAL_LEN];    char val[MPID_NEM_MAX_KEY_VAL_LEN];    char *kvs_name;    MPIU_CHKPMEM_DECL(1);    mpi_errno = MPIDI_PG_GetConnKVSname (&kvs_name);    if (mpi_errno) MPIU_ERR_POP (mpi_errno);    /* Allocate more than used, but fill only the external ones */    MPIU_CHKPMEM_MALLOC (nodes, node_t *, sizeof (node_t) * MPID_nem_mem_region.num_procs, mpi_errno, "node struct");    MPID_nem_tcp_internal_vars.nodes = nodes;    MPID_nem_tcp_internal_vars.nb_procs = numprocs;    /* All Masters create their sockets and put their keys w/PMI */    for (index = 0 ; index < numprocs ; index++)    {	grank = MPID_nem_mem_region.ext_ranks[index];	if (grank > MPID_nem_mem_region.rank)	{	    struct sockaddr_in temp;	    char               s[255];	    const int          len2 = 255;            int                low_port, high_port;	    nodes[grank].desc = socket(AF_INET, SOCK_STREAM, 0);            /* find a port in the specified range */            /*     if the environment var is not set, low_port and high_port are unchanged */            low_port = high_port = 0;            MPIU_GetEnvRange( "MPICH_PORT_RANGE", &low_port, &high_port );            MPIU_ERR_CHKANDJUMP (low_port < 0 || low_port > high_port, mpi_errno, MPI_ERR_OTHER, "**badportrange");            /* if MPICH_PORT_RANGE is not set, low_port and high_port are 0 so bind will use any available address */            for (port = low_port; port <= high_port; ++port)            {                memset ((void *)&temp, 0, sizeof(temp));                temp.sin_family      = AF_INET;                temp.sin_addr.s_addr = htonl(INADDR_ANY);                temp.sin_port        = htons(port);	                ret = bind(nodes[grank].desc, (struct sockaddr *)&temp, len);                if (ret == -1)                {                    /* check for real error */                    MPIU_ERR_CHKANDJUMP3 (errno != EADDRINUSE && errno != EADDRNOTAVAIL, mpi_errno, MPI_ERR_OTHER, "**sock|poll|bind", "**sock|poll|bind %d %d %s", port, errno, strerror (errno));                }                else                {                    break;                }            }            /* check if an available port was found */            MPIU_ERR_CHKANDJUMP3 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**sock|poll|bind", "**sock|poll|bind %d %d %s", port, errno, strerror (errno));	    ret = getsockname(nodes[grank].desc,			      (struct sockaddr *)&(nodes[grank].sock_id),			      &len);            MPIU_ERR_CHKANDJUMP1 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**getsockname", "**getsockname %s", strerror (errno));	    ret = listen(nodes[grank].desc, SOMAXCONN);            MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**listen", "**listen %s %d", strerror (errno), errno);	    /* Put the key (machine name, port #, src , dest) with PMI */            mpi_errno = getSockInterfaceAddr(MPID_nem_mem_region.rank, s, len2);            if (mpi_errno) MPIU_ERR_POP(mpi_errno);#ifdef TRACE	    fprintf(stderr,"[%i] ID :  %s_%d_%d_%d \n",MPID_nem_mem_region.rank,s,		    ntohs(nodes[grank].sock_id.sin_port),grank,MPID_nem_mem_region.rank);#endif	    MPIU_Snprintf (val, MPID_NEM_MAX_KEY_VAL_LEN, "%d:%s", ntohs(nodes[grank].sock_id.sin_port), s);	    MPIU_Snprintf (key, MPID_NEM_MAX_KEY_VAL_LEN, "TCPkey[%d:%d]", MPID_nem_mem_region.rank, grank);	    /* Put my unique id */	    pmi_errno = PMI_KVS_Put (kvs_name, key, val);            MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_put", "**pmi_kvs_put %d", pmi_errno);	    pmi_errno = PMI_KVS_Commit (kvs_name);            MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_commit", "**pmi_kvs_commit %d", pmi_errno);	}	else if (grank < MPID_nem_mem_region.rank)	{	    struct sockaddr_in temp;            int                low_port, high_port;	    nodes[grank].desc   = socket(AF_INET, SOCK_STREAM, 0);            /* find a port in the specified range */            /*     if the environment var is not set, low_port and high_port are unchanged */            low_port = high_port = 0;            MPIU_GetEnvRange( "MPICH_PORT_RANGE", &low_port, &high_port );            MPIU_ERR_CHKANDJUMP (low_port < 0 || low_port > high_port, mpi_errno, MPI_ERR_OTHER, "**badportrange");            /* if MPICH_PORT_RANGE is not set, low_port and high_port are 0 so bind will use any available address */            for (port = low_port; port <= high_port; ++port)            {                memset ((void *)&temp, 0, sizeof(temp));                temp.sin_family      = AF_INET;                temp.sin_addr.s_addr = htonl(INADDR_ANY);                temp.sin_port        = htons(port);                ret = bind (nodes[grank].desc, (struct sockaddr *)&temp, len);                if (ret == -1)                {                    /* check for real error */                    MPIU_ERR_CHKANDJUMP3 (errno != EADDRINUSE && errno != EADDRNOTAVAIL, mpi_errno, MPI_ERR_OTHER, "**sock|poll|bind", "**sock|poll|bind %d %d %s", port, errno, strerror (errno));                }                else                {                    break;                }            }            /* check if an available port was found */            MPIU_ERR_CHKANDJUMP3 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**sock|poll|bind", "**sock|poll|bind %d %d %s", port, errno, strerror (errno));	    ret = getsockname(nodes[grank].desc,			      (struct sockaddr *)&(nodes[grank].sock_id),			      &len);            MPIU_ERR_CHKANDJUMP1 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**getsockname", "**getsockname %s", strerror (errno));	}    }    pmi_errno = PMI_Barrier();    MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_barrier", "**pmi_barrier %d", pmi_errno);#ifdef TRACE    fprintf(stderr,"[%i] ---- Creating sockets done \n",MPID_nem_mem_region.rank);#endif    /* Connect/accept sequence */    for (index = 0 ; index < numprocs ; index++)    {	grank = MPID_nem_mem_region.ext_ranks[index];	if (grank > MPID_nem_mem_region.rank)	{	    /* I am a master */#ifdef TRACE	    fprintf(stderr,"MASTER accepting sockets \n");#endif	    nodes[grank].desc = accept(nodes[grank].desc,                                       (struct sockaddr *)&(nodes[grank].sock_id),                                       &len);            MPIU_ERR_CHKANDJUMP2 (nodes[grank].desc == -1, mpi_errno, MPI_ERR_OTHER, "**sock|poll|accept", "**sock|poll|accept %d %s", errno, strerror (errno));            {                struct sockaddr_in sid;                socklen_t sidlen = sizeof(sid);                getsockname(nodes[grank].desc, (struct sockaddr *)&sid, &sidlen);            }#ifdef TRACE	    fprintf(stderr,"[%i] ====> ACCEPT DONE for GRANK %i\n",MPID_nem_mem_region.rank,grank);#endif	}	else if (grank < MPID_nem_mem_region.rank)	{	    /* I am the slave */	    struct sockaddr_in  master;	    struct hostent     *hp = NULL;	    char                s[255];	    int                 port_num;	    memset(val, 0, MPID_NEM_MAX_KEY_VAL_LEN);	    MPIU_Snprintf (key, MPID_NEM_MAX_KEY_VAL_LEN,"TCPkey[%d:%d]", grank, MPID_nem_mem_region.rank);	    pmi_errno = PMI_KVS_Get (kvs_name, key, val, MPID_NEM_MAX_KEY_VAL_LEN);            MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_get", "**pmi_kvs_get %d", pmi_errno);	    ret = sscanf (val, "%d:%s", &port_num, s);            MPIU_ERR_CHKANDJUMP1 (ret != 2, mpi_errno, MPI_ERR_OTHER, "**business_card", "**business_card %s", val);	    hp = gethostbyname(s);            MPIU_ERR_CHKANDJUMP1 (hp == NULL, mpi_errno, MPI_ERR_OTHER, "**gethostbyname", "**gethostbyname %d", h_errno);	    master.sin_family      = AF_INET;	    master.sin_port        = htons(port_num);	    /* POSIX might define h_addr_list only and node define h_addr */#ifdef HAVE_H_ADDR_LIST	    MPID_NEM_MEMCPY(&(master.sin_addr.s_addr), hp->h_addr_list[0], hp->h_length);#else	    MPID_NEM_MEMCPY(&(master.sin_addr.s_addr), hp->h_addr, hp->h_length);#endif	    ret = connect(nodes[grank].desc,(struct sockaddr *)&master, sizeof(master));            MPIU_ERR_CHKANDJUMP4 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**sock_connect", "**sock_connect %s %d %s %d", s, port_num, strerror (errno), errno);            {                struct sockaddr_in sid;                socklen_t sidlen = sizeof(sid);                getsockname(nodes[grank].desc, (struct sockaddr *)&sid, &sidlen);            }#ifdef TRACE	    fprintf(stderr,"====> CONNECT DONE : %i\n", ret);#endif	}    }    for(index = 0 ; index < numprocs ; index++)    {	int       option = 1;        int       option2;        socklen_t size;	grank     = MPID_nem_mem_region.ext_ranks[index];	if(grank != MPID_nem_mem_region.rank)	{	    nodes[grank].internal_recv_queue.head = NULL;	    nodes[grank].internal_recv_queue.tail = NULL;	    nodes[grank].internal_free_queue.head = NULL;	    nodes[grank].internal_free_queue.tail = NULL;	    nodes[grank].left2write     = 0;	    nodes[grank].left2read_head = 0;	    nodes[grank].left2read      = 0;	    nodes[grank].toread         = 0;#ifdef TRACE	    fprintf(stderr,"[%i] ----- DESC %i is %i ------ \n",		    MPID_nem_mem_region.rank,grank,		    nodes[grank].desc);#endif	    FD_SET(nodes[grank].desc, &MPID_nem_tcp_internal_vars.set);	    if(nodes[grank].desc > MPID_nem_tcp_internal_vars.max_fd)		MPID_nem_tcp_internal_vars.max_fd = nodes[grank].desc ;	    ret = fcntl(nodes[grank].desc, F_SETFL, O_NONBLOCK);            MPIU_ERR_CHKANDJUMP1 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**fcntl", "**fcntl %s", strerror (errno));	    ret = setsockopt( nodes[grank].desc,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -