⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 4 页
字号:
            vc->ch.fd = MPIDI_CH3I_dynamic_tmp_fd;            /* set so connect_to_root returns the correct VC */            MPIDI_CH3I_dynamic_tmp_vc = vc;                    } else {            /* this is the accept side */            /* the accept side may not have called MPI_Accept at this point and may             *  have just been in the progress engine and received the accept request.             *  Currently, we go ahead and create the new socket and ack to the             *  connector with this new sockets info.  However, MPIDI_CH3I_dynamic_tmp_fd             *  should not be set until this is dequeued from the acceptQ...             */                        int tmp_fd, no_nagle, suggested_port, real_port;            struct sctp_event_subscribe evnts;            MPIU_Size_t nb;            union MPIDI_CH3_Pkt conn_acc_pkt;            int iov_cnt = 2;            MPID_IOV conn_acc_iov[iov_cnt];            char bizcard[MPI_MAX_PORT_NAME];                        MPID_IOV* iovp = conn_acc_iov;                        /* open new socket */                        no_nagle = 1;            suggested_port = 0;            bzero(&evnts, sizeof(evnts));            evnts.sctp_data_io_event=1;            if(sctp_open_dgm_socket2(MPICH_SCTP_NUM_STREAMS,                                     0, 5, suggested_port, no_nagle,                                     &MPIDU_Sctpi_socket_bufsz, &evnts, &tmp_fd,                                     &real_port) == -1) {                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME,                                                 __LINE__, MPI_ERR_OTHER, "**fail", 0);                                goto fn_fail;            }            vc->ch.fd = tmp_fd;            /*  the global tmp_fd shouldn't be set until dequeued from the accept             *   queue since we only allow one to occur at a time (and like the             *   spawn/multiple_ports test, they may happen out of order).             *//*             MPIDI_CH3I_dynamic_tmp_fd = tmp_fd; */            /* store port temporarily so bizcard func works. put new tmp port in to             *  pass to the connect side.             */            suggested_port = MPIDI_CH3I_listener_port;            MPIDI_CH3I_listener_port = real_port;            mpi_errno = MPIDI_CH3_Get_business_card(-1, bizcard, MPI_MAX_PORT_NAME);            /* --BEGIN ERROR HANDLING-- */            if (mpi_errno != MPI_SUCCESS) {                /* FIXME define error code */                goto fn_fail;            }            /* --END ERROR HANDLING-- */            MPIDI_CH3I_listener_port = suggested_port; /* restore */                /* get the conn_acc_pkt ready */            MPIDI_Pkt_init(&conn_acc_pkt, MPIDI_CH3I_PKT_SC_CONN_ACCEPT);             conn_acc_pkt.sc_conn_accept.bizcard_len = (int) strlen(bizcard) + 1;             conn_acc_pkt.sc_conn_accept.port_name_tag = pkt->sc_conn_accept.port_name_tag;            conn_acc_pkt.sc_conn_accept.ack = 1; /* this IS an ACK */            /* get the iov ready */            conn_acc_iov[0].MPID_IOV_BUF = (void *) &conn_acc_pkt;            conn_acc_iov[0].MPID_IOV_LEN = sizeof(conn_acc_pkt);            conn_acc_iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) bizcard;            conn_acc_iov[1].MPID_IOV_LEN = conn_acc_pkt.sc_conn_accept.bizcard_len;            /* send conn_acc_pkt (ack=1) from my new fd to other side's new fd */            for(;;) {                        mpi_errno = MPIDU_Sctp_writev_fd(tmp_fd,                                                 &vc->ch.to_address, iovp,                                                 iov_cnt, MPICH_SCTP_CTL_STREAM, 0, &nb );                /* --BEGIN ERROR HANDLING-- */                if (mpi_errno != MPI_SUCCESS) {                    goto fn_fail;                }                /* --END ERROR HANDLING-- */                                /* deliberately avoid nb < 0 */                if(nb > 0 && adjust_iov(&iovp, &iov_cnt, nb)) {                    /* done sending */                    break;                }            }                                        /* put into acceptq and signal completion so upcalls in ch3u_port.c work */            MPIDI_CH3I_Acceptq_enqueue(vc, pkt->sc_conn_accept.port_name_tag);	    MPIDI_CH3_Progress_signal_completion();        }                                      break;            }        default:		/* event has NO info at all, this is not an error (used by MPI_Test) */	break;    }fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT);    return mpi_errno; fn_fail:    goto fn_exit;}/* end MPIDI_CH3I_Progress_handle_sctp_event() */#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_delay#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_delay(unsigned int completion_count){    int mpi_errno = MPI_SUCCESS;    #   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)    {	while (completion_count == MPIDI_CH3I_progress_completion_count)	{	    MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_Process.global_mutex);	}    }#   endif        return mpi_errno;}/* end MPIDI_CH3I_Progress_delay() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_continue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_continue(unsigned int completion_count){    int mpi_errno = MPI_SUCCESS;#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)    {	MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond);    }#   endif        return mpi_errno;}/* end MPIDI_CH3I_Progress_continue() */#endif /* (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL) */#define MPIDI_MAX_KVS_KEY_LEN      256/*  post_connect only retrieves the IP/PORT info from KVS now. *   it can also send the connection packet, but not required to do so now */ #undef FUNCNAME#define FUNCNAME MPIDI_CH3I_VC_post_connect#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t * vc){    int mpi_errno = MPI_SUCCESS;    char key[MPIDI_MAX_KVS_KEY_LEN];    char val[MPIDI_MAX_KVS_VALUE_LEN];    char host_description[MAX_HOST_DESCRIPTION_LEN];    int port;    MPIDU_Sock_ifaddr_t ifaddr;    int hasIfaddr = 0;    int rc;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT);        MPIDI_DBG_PRINTF((60, FCNAME, "entering"));    /* this should not be called more than once per VC */    MPIU_Assert(vc->ch.pkt == NULL);        MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING");    vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING;    /* vc->pg can be NULL for temp VCs (used in dynamic processes). if this is the case,     *  then we know here that the to_address is legit because in order to establish     *  the temp VC, a conn_acc_pkt had to have been sent during connect/accept.     */    if(vc->pg != NULL) {        /* "standard" VC */        rc = MPIU_Snprintf(key, MPIDI_MAX_KVS_KEY_LEN, "P%d-businesscard", vc->pg_rank);        /* --BEGIN ERROR HANDLING-- */        if (rc < 0 || rc > MPIDI_MAX_KVS_KEY_LEN)        {            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,                                              FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL);            goto fn_exit;        }        /* --END ERROR HANDLING-- */                mpi_errno = MPIDI_PG_GetConnString( vc->pg, vc->pg_rank, val, sizeof(val));        if (mpi_errno != MPI_SUCCESS) {            MPIU_ERR_POP(mpi_errno);        }        mpi_errno = MPIDU_Sctp_get_conninfo_from_bc( val, host_description,						 sizeof(host_description),						 &port, &ifaddr, &hasIfaddr );        if(mpi_errno) {            MPIU_ERR_POP(mpi_errno);        }        /* save the sockaddr_in */        giveMeSockAddr(ifaddr.ifaddr, port, &vc->ch.to_address);    }    /* setup the connection packet, and initialize iov arrays */        vc->ch.pkt = (void*) MPIU_Malloc(sizeof(MPIDI_CH3_Pkt_t));    if (vc->ch.pkt == NULL)    {        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);        goto fn_exit;    }    MPIDI_CH3_Pkt_t* temp = vc->ch.pkt;        MPIDI_Pkt_init(temp, MPIDI_CH3I_PKT_SC_OPEN_REQ);     temp-> sc_open_req.pg_id_len = (int) strlen(MPIDI_Process.my_pg->id) + 1;     temp-> sc_open_req.pg_rank = MPIR_Process.comm_world->rank;    int loop;    MPID_IOV* iov_ptr = NULL;    /* this loop may not be needed, because connection_iov is used for something more general now...  */    for(loop=0; loop < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; loop++) {	iov_ptr = VC_IOV(vc, loop);	iov_ptr[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) temp;	iov_ptr[0].MPID_IOV_LEN = (int) sizeof(*temp);	iov_ptr[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id;	iov_ptr[1].MPID_IOV_LEN = temp-> sc_open_req.pg_id_len;		       	    }    /* init control stream early purposely */    MPIDU_Sctp_stream_init(vc, NULL, MPICH_SCTP_CTL_STREAM);      fn_exit:    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT);    return mpi_errno; fn_fail:    /* --BEGIN ERROR HANDLING-- */        goto fn_exit;    /* --END ERROR HANDLING-- */}/* end MPIDI_CH3I_VC_post_connect() */#undef FUNCNAME#define FUNCNAME stream_post_sendq_req#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int stream_post_sendq_req(MPIDI_VC_t * vc, int stream){    int mpi_errno = MPI_SUCCESS;    MPID_Request* nextReq = NULL;    MPIDI_STATE_DECL(MPID_STATE_STREAM_POST_SENDQ_REQ);    MPIDI_FUNC_ENTER(MPID_STATE_STREAM_POST_SENDQ_REQ);    nextReq = MPIDI_CH3I_SendQ_head_x(vc, stream);			    if(nextReq) {	MPIDI_CH3I_SendQ_dequeue_x(vc, stream);	MPIDU_Sctp_post_writev(vc, nextReq, 0, NULL, stream);	    } else {	SEND_ACTIVE(vc, stream) = NULL;    }            MPIDI_FUNC_EXIT(MPID_STATE_STREAM_POST_SENDQ_REQ);    return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_init#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Sctp_init(void) {    char * env;    long flags;    struct sctp_initmsg initm;    struct sctp_event_subscribe evnts;    struct sockaddr_in addr;    int port = 0;    int rc;    int no_nagle;    int len;    int mpi_errno = MPI_SUCCESS;        /* FIXME TODO : change error codes to have SCTP errors */        if (MPIDI_CH3I_onetomany_fd == -1)    {        /* see if a socket buffer size is specified */        	env = getenv("MPICH_SCTP_BUFFER_SIZE");	if (env)	{	    int tmp;	    	    /* FIXME: atoi doesn't detect errors (e.g., non-digits) */	    tmp = atoi(env);	    if (tmp > 0)	    {		MPIDU_Sctpi_socket_bufsz = tmp;	    }	}                /* see if Nagle value is specified */        	no_nagle = 1;	env = getenv("MPICH_SCTP_NAGLE_ON");	if (env)	{	    int tmp;	    	    /* FIXME: atoi doesn't detect errors (e.g., non-digits) */	    tmp = atoi(env);	    if (tmp > 0)	    {                no_nagle = 0;	    }	}        /* Create a socket */                /* set up parameters for the SCTP socket */	port = 0;	bzero(&evnts, sizeof(evnts));	evnts.sctp_data_io_event=1;		MPIDU_Sctpi_socket_bufsz = 233016;	if(sctp_open_dgm_socket2(MPICH_SCTP_NUM_STREAMS,			     0, 5, port, no_nagle,			     &MPIDU_Sctpi_socket_bufsz, &evnts, &MPIDI_CH3I_onetomany_fd,				&MPIDI_CH3I_listener_port) == -1) {            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME,                                             __LINE__, MPI_ERR_OTHER, "**fail", 0);	    goto fn_fail;	}	    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -