⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_init.c

📁 fortran并行计算包
💻 C
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"/* extern'd in mpidi_ch3_impl.h */  /* assocID -> VC hash */HASH* MPIDI_CH3I_assocID_table;  /* lone socket for standard communications */int MPIDI_CH3I_onetomany_fd;  /* number of items waiting to be sent (all VCs) */int sendq_total;  /* event queue */struct MPIDU_Sctp_eventq_elem* eventq_head;struct MPIDU_Sctp_eventq_elem* eventq_tail;  /* for dynamic processes */MPIDI_VC_t * MPIDI_CH3I_dynamic_tmp_vc;int MPIDI_CH3I_dynamic_tmp_fd;/* need the NUM_STREAM def */#include "mpidi_ch3_pre.h"static int MPIDI_CH3U_Init_sctp(int has_parent, MPIDI_PG_t *pg_p, int pg_rank, 				char **bc_val_p, int *val_max_sz_p);static int MPIDI_CH3I_Connect_to_root_sctp(const char * port_name,                                           MPIDI_VC_t ** new_vc);/* *  MPIDI_CH3_Init  - makes sctp specific initializations.  Most of this  *                    functionality is in the MPIDI_CH3U_Init_sctp upcall  *                    because the same tasks may need to be done for   *                    future (sctp + ?) channels.   */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Init(int has_parent, MPIDI_PG_t * pg_p, int pg_rank ){    int mpi_errno = MPI_SUCCESS;    char *publish_bc_orig = NULL;    char *bc_val = NULL;    int val_max_remaining;    MPIDI_STATE_DECL(MPID_STATE_MPID_CH3_INIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_CH3_INIT);    mpi_errno = MPIDI_CH3I_Progress_init(MPIDI_PG_Get_size(pg_p));    if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);    /* Initialize the business card */    mpi_errno = MPIDI_CH3I_BCInit( &bc_val, &val_max_remaining );    if (mpi_errno) MPIU_ERR_POP(mpi_errno);    publish_bc_orig = bc_val;    if (mpi_errno) MPIU_ERR_POP(mpi_errno);    /* Currently, this "upcall" is a static within this file but later this could     *  go with all the others in the ch3/util directory.     */    /* initialize aspects specific to sctp  */    mpi_errno = MPIDI_CH3U_Init_sctp(has_parent, pg_p, pg_rank, &bc_val, 				     &val_max_remaining);    if (mpi_errno) MPIU_ERR_POP(mpi_errno);    /* Set the connection information in our process group        (publish the business card ) */    MPIDI_PG_SetConnInfo( pg_rank, (const char *)publish_bc_orig );    MPIDI_CH3I_BCFree( publish_bc_orig ); fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_CH3_INIT);    return mpi_errno; fn_fail:       if (publish_bc_orig != NULL) {        MPIU_Free(publish_bc_orig);    }               goto fn_exit;}/* This function simply tells the CH3 device to use the defaults for the    MPI Port functions */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PortFnsInit#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PortFnsInit( MPIDI_PortFns *portFns ) {    MPIU_UNREFERENCED_ARG(portFns);    return 0;}/* This function simply tells the CH3 device to use the defaults for the    MPI-2 RMA functions */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_RMAFnsInit#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_RMAFnsInit( MPIDI_RMAFns *a ) {     return 0;}/* Perform the channel-specific vc initialization */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_VC_Init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_VC_Init( MPIDI_VC_t *vc ) {    int i = 0;    MPIDI_CH3I_SCTP_Stream_t* str_ptr = NULL;    for(i = 0; i < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; i++) {	str_ptr = &(vc->ch.stream_table[i]);	STREAM_INIT(str_ptr);    }    vc->ch.fd = MPIDI_CH3I_onetomany_fd;    vc->ch.pkt = NULL;    vc->ch.pg_id = NULL;    vc->ch.state = MPIDI_CH3I_VC_STATE_UNCONNECTED;    vc->ch.send_init_count = 0;       return MPI_SUCCESS;}/* Select the routine that uses sctp to connect two communicators   using a temporary socket */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Connect_to_root#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Connect_to_root(const char * port_name, 			      MPIDI_VC_t ** new_vc){    return MPIDI_CH3I_Connect_to_root_sctp( port_name, new_vc );}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Connect_to_root_sctp#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Connect_to_root_sctp(const char * port_name, 			      MPIDI_VC_t ** new_vc){    int tmp_fd, no_nagle, port, real_port, mpi_errno = MPI_SUCCESS;    struct sctp_event_subscribe evnts;    MPIU_Size_t nb;    char host_description[MAX_HOST_DESCRIPTION_LEN];    int port_name_tag;    int bufsz = 233016;    MPIDU_Sock_ifaddr_t ifaddr;    struct sockaddr_in to_address;    int hasIfaddr = 0;    MPIDU_Sctp_event_t event2;        union MPIDI_CH3_Pkt conn_acc_pkt, *pkt;    int iov_cnt = 2;    MPID_IOV conn_acc_iov[iov_cnt];    char bizcard[MPI_MAX_PORT_NAME];                MPID_IOV* iovp = conn_acc_iov;    /* prepare a new socket for connect/accept */    no_nagle = 1;    port = 0;    bzero(&evnts, sizeof(evnts));    evnts.sctp_data_io_event=1;    if(sctp_open_dgm_socket2(MPICH_SCTP_NUM_STREAMS,			     0, 5, port, no_nagle,			     &bufsz, &evnts, &tmp_fd,                             &real_port) == -1) {        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME,                                         __LINE__, MPI_ERR_OTHER, "**fail", 0);                /* FIXME define error code */        goto fn_fail;    }            MPIU_DBG_MSG_S(CH3_CONNECT,VERBOSE,"Connect to root with portstring %s",		   port_name );    /* obtain the sockaddr_in from the business card */    mpi_errno = MPIDU_Sctp_get_conninfo_from_bc( port_name, host_description,						 sizeof(host_description),						 &port, &ifaddr, &hasIfaddr );    if (mpi_errno) {	MPIU_ERR_POP(mpi_errno);    }    giveMeSockAddr(ifaddr.ifaddr, port, &to_address);    /* handle the port_name_tag */    mpi_errno = MPIDI_GetTagFromPort(port_name, &port_name_tag);    if (mpi_errno != MPIU_STR_SUCCESS) {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**argstr_port_name_tag");    }    MPIU_DBG_MSG_D(CH3_CONNECT,VERBOSE,"port tag %d",port_name_tag);    /* store port temporarily so bizcard func works. put new tmp port in to pass to     *  the accept side.     */    port = MPIDI_CH3I_listener_port;    MPIDI_CH3I_listener_port = real_port;    mpi_errno = MPIDI_CH3_Get_business_card(-1, bizcard, MPI_MAX_PORT_NAME);    /* --BEGIN ERROR HANDLING-- */    if (mpi_errno != MPI_SUCCESS) {        /* FIXME define error code */        goto fn_fail;    }    /* --END ERROR HANDLING-- */    MPIDI_CH3I_listener_port = port; /* restore */        /* get the conn_acc_pkt ready */    MPIDI_Pkt_init(&conn_acc_pkt, MPIDI_CH3I_PKT_SC_CONN_ACCEPT);     conn_acc_pkt.sc_conn_accept.bizcard_len = (int) strlen(bizcard) + 1;     conn_acc_pkt.sc_conn_accept.port_name_tag = port_name_tag;    conn_acc_pkt.sc_conn_accept.ack = 0; /* this is NOT an ACK */    /* get the iov ready */    conn_acc_iov[0].MPID_IOV_BUF = (void *) &conn_acc_pkt;    conn_acc_iov[0].MPID_IOV_LEN = sizeof(conn_acc_pkt);    conn_acc_iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) bizcard;    conn_acc_iov[1].MPID_IOV_LEN = conn_acc_pkt.sc_conn_accept.bizcard_len;    /* write on control stream now. send on the existing onetomany socket. the      *  other side won't add this to the hash because of the pkt's type.  we     *  don't want the onetomany sockets to get caught up in the VC close     *  protocol when the tmp VC is killed (so we don't want to mix the new     *  tmp fd with the "standard" ones).     */     for(;;) {                mpi_errno = MPIDU_Sctp_writev_fd(MPIDI_CH3I_onetomany_fd, &to_address, iovp,                             iov_cnt, MPICH_SCTP_CTL_STREAM, 0, &nb );        /* --BEGIN ERROR HANDLING-- */        if (mpi_errno != MPI_SUCCESS) {            goto fn_fail;        }        /* --END ERROR HANDLING-- */        /* deliberately avoid nb < 0 */        if(nb > 0 && adjust_iov(&iovp, &iov_cnt, nb)) {            /* done sending */            break;        }    }    /* for dynamic procs, we only progress one tmp connection at a time... */    MPIU_Assert(MPIDI_CH3I_dynamic_tmp_vc == NULL);    MPIDI_CH3I_dynamic_tmp_fd = tmp_fd;    /* block on tmp_fd until conn_acc_pkt is ACK'd */    while(MPIDI_CH3I_dynamic_tmp_vc == NULL) {                mpi_errno = MPIDU_Sctp_wait(tmp_fd, MPIDU_SCTP_INFINITE_TIME,                                    &event2);        if (mpi_errno != MPI_SUCCESS)        {            MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);            MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");            goto fn_fail;        }        /* inside handle_sctp_event, it changes         *  the read to an accept and calls itself recursively which ultimately sets         *  the value of MPIDI_CH3I_dynamic_tmp_vc.         */        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event2);        if (mpi_errno != MPI_SUCCESS) {            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,                                "**ch3|sock|handle_sock_event");        }    }        *new_vc = MPIDI_CH3I_dynamic_tmp_vc; fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;    }/* MPIDI_CH3_CHANNEL_AVOIDS_SELECT is defined so we need this for dynamic processes */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Complete_Acceptq_dequeue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Complete_Acceptq_dequeue(MPIDI_VC_t *vc) {    if(vc != NULL) {        MPIU_Assert(MPIDI_CH3I_dynamic_tmp_vc == NULL);                MPIDI_CH3I_dynamic_tmp_vc = vc;        MPIDI_CH3I_dynamic_tmp_fd = vc->ch.fd;    }        return MPI_SUCCESS;}/* This "upcall" is (temporarily?) a static here, and may be in ch3/util later. */#undef FUNCNAME#define FUNCNAME MPIDI_CH3U_Init_sctp#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3U_Init_sctp(int has_parent, MPIDI_PG_t *pg_p, int pg_rank, 			 char **bc_val_p, int *val_max_sz_p) {    int mpi_errno = MPI_SUCCESS;    int pmi_errno;    int pg_size;    int p, i;    MPIDI_CH3I_SCTP_Stream_t* str_ptr;    /*     * Initialize the VCs associated with this process group (and thus MPI_COMM_WORLD)     */    pmi_errno = PMI_Get_size(&pg_size);    if (pmi_errno != 0) {	MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pmi_get_size",			     "**pmi_get_size %d", pmi_errno);    }    sendq_total = 0;        for (p = 0; p < pg_size; p++) {	MPIDI_CH3_VC_Init(&(pg_p->vct[p]));    }        mpi_errno = MPIDI_CH3U_Get_business_card_sctp(bc_val_p, val_max_sz_p);    if (mpi_errno != MPI_SUCCESS) {	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_buscard");    } fn_exit:        return mpi_errno;     fn_fail:    /* --BEGIN ERROR HANDLING-- */    if (pg_p != NULL)    {	/* MPIDI_CH3I_PG_Destroy(), which is called by MPIDI_PG_Destroy(), frees pg->ch.kvs_name */	MPIDI_PG_Destroy(pg_p);    }    goto fn_exit;    /* --END ERROR HANDLING-- */}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_PG_Init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_PG_Init( MPIDI_PG_t *pg ){    return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_VC_GetStateString#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)const char * MPIDI_CH3_VC_GetStateString(struct MPIDI_VC* state ){    return NULL;}/* This routine is a hook for any operations that need to be performed before   freeing a process group */int MPIDI_CH3_PG_Destroy( struct MPIDI_PG *pg ){    return MPI_SUCCESS;}/* This routine is a hook for any operations that need to be performed before   freeing a virtual connection */int MPIDI_CH3_VC_Destroy( struct MPIDI_VC *vc ){    return MPI_SUCCESS;}/* A dummy function so that all channels provide the same set of functions,    enabling dll channels */int MPIDI_CH3_InitCompleted( void ){    return MPI_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -