📄 ch3_progress.c
字号:
vc->ch.fd = MPIDI_CH3I_dynamic_tmp_fd; /* set so connect_to_root returns the correct VC */ MPIDI_CH3I_dynamic_tmp_vc = vc; } else { /* this is the accept side */ /* the accept side may not have called MPI_Accept at this point and may * have just been in the progress engine and received the accept request. * Currently, we go ahead and create the new socket and ack to the * connector with this new sockets info. However, MPIDI_CH3I_dynamic_tmp_fd * should not be set until this is dequeued from the acceptQ... */ int tmp_fd, no_nagle, suggested_port, real_port; struct sctp_event_subscribe evnts; MPIU_Size_t nb; union MPIDI_CH3_Pkt conn_acc_pkt; int iov_cnt = 2; MPID_IOV conn_acc_iov[iov_cnt]; char bizcard[MPI_MAX_PORT_NAME]; MPID_IOV* iovp = conn_acc_iov; /* open new socket */ no_nagle = 1; suggested_port = 0; bzero(&evnts, sizeof(evnts)); evnts.sctp_data_io_event=1; if(sctp_open_dgm_socket2(MPICH_SCTP_NUM_STREAMS, 0, 5, suggested_port, no_nagle, &MPIDU_Sctpi_socket_bufsz, &evnts, &tmp_fd, &real_port) == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); goto fn_fail; } vc->ch.fd = tmp_fd; /* the global tmp_fd shouldn't be set until dequeued from the accept * queue since we only allow one to occur at a time (and like the * spawn/multiple_ports test, they may happen out of order). *//* MPIDI_CH3I_dynamic_tmp_fd = tmp_fd; */ /* store port temporarily so bizcard func works. put new tmp port in to * pass to the connect side. */ suggested_port = MPIDI_CH3I_listener_port; MPIDI_CH3I_listener_port = real_port; mpi_errno = MPIDI_CH3_Get_business_card(-1, bizcard, MPI_MAX_PORT_NAME); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { /* FIXME define error code */ goto fn_fail; } /* --END ERROR HANDLING-- */ MPIDI_CH3I_listener_port = suggested_port; /* restore */ /* get the conn_acc_pkt ready */ MPIDI_Pkt_init(&conn_acc_pkt, MPIDI_CH3I_PKT_SC_CONN_ACCEPT); conn_acc_pkt.sc_conn_accept.bizcard_len = (int) strlen(bizcard) + 1; conn_acc_pkt.sc_conn_accept.port_name_tag = pkt->sc_conn_accept.port_name_tag; conn_acc_pkt.sc_conn_accept.ack = 1; /* this IS an ACK */ /* get the iov ready */ conn_acc_iov[0].MPID_IOV_BUF = (void *) &conn_acc_pkt; conn_acc_iov[0].MPID_IOV_LEN = sizeof(conn_acc_pkt); conn_acc_iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) bizcard; conn_acc_iov[1].MPID_IOV_LEN = conn_acc_pkt.sc_conn_accept.bizcard_len; /* send conn_acc_pkt (ack=1) from my new fd to other side's new fd */ for(;;) { mpi_errno = MPIDU_Sctp_writev_fd(tmp_fd, &vc->ch.to_address, iovp, iov_cnt, MPICH_SCTP_CTL_STREAM, 0, &nb ); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { goto fn_fail; } /* --END ERROR HANDLING-- */ /* deliberately avoid nb < 0 */ if(nb > 0 && adjust_iov(&iovp, &iov_cnt, nb)) { /* done sending */ break; } } /* put into acceptq and signal completion so upcalls in ch3u_port.c work */ MPIDI_CH3I_Acceptq_enqueue(vc, pkt->sc_conn_accept.port_name_tag); MPIDI_CH3_Progress_signal_completion(); } break; } default: /* event has NO info at all, this is not an error (used by MPI_Test) */ break; }fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3I_Progress_handle_sctp_event() */#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_delay#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_delay(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS; # if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { while (completion_count == MPIDI_CH3I_progress_completion_count) { MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_Process.global_mutex); } }# endif return mpi_errno;}/* end MPIDI_CH3I_Progress_delay() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_continue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_continue(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS;# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond); }# endif return mpi_errno;}/* end MPIDI_CH3I_Progress_continue() */#endif /* (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL) */#define MPIDI_MAX_KVS_KEY_LEN 256/* post_connect only retrieves the IP/PORT info from KVS now. * it can also send the connection packet, but not required to do so now */ #undef FUNCNAME#define FUNCNAME MPIDI_CH3I_VC_post_connect#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t * vc){ int mpi_errno = MPI_SUCCESS; char key[MPIDI_MAX_KVS_KEY_LEN]; char val[MPIDI_MAX_KVS_VALUE_LEN]; char host_description[MAX_HOST_DESCRIPTION_LEN]; int port; MPIDU_Sock_ifaddr_t ifaddr; int hasIfaddr = 0; int rc; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); MPIDI_DBG_PRINTF((60, FCNAME, "entering")); /* this should not be called more than once per VC */ MPIU_Assert(vc->ch.pkt == NULL); MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; /* vc->pg can be NULL for temp VCs (used in dynamic processes). if this is the case, * then we know here that the to_address is legit because in order to establish * the temp VC, a conn_acc_pkt had to have been sent during connect/accept. */ if(vc->pg != NULL) { /* "standard" VC */ rc = MPIU_Snprintf(key, MPIDI_MAX_KVS_KEY_LEN, "P%d-businesscard", vc->pg_rank); /* --BEGIN ERROR HANDLING-- */ if (rc < 0 || rc > MPIDI_MAX_KVS_KEY_LEN) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_PG_GetConnString( vc->pg, vc->pg_rank, val, sizeof(val)); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = MPIDU_Sctp_get_conninfo_from_bc( val, host_description, sizeof(host_description), &port, &ifaddr, &hasIfaddr ); if(mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* save the sockaddr_in */ giveMeSockAddr(ifaddr.ifaddr, port, &vc->ch.to_address); } /* setup the connection packet, and initialize iov arrays */ vc->ch.pkt = (void*) MPIU_Malloc(sizeof(MPIDI_CH3_Pkt_t)); if (vc->ch.pkt == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); goto fn_exit; } MPIDI_CH3_Pkt_t* temp = vc->ch.pkt; MPIDI_Pkt_init(temp, MPIDI_CH3I_PKT_SC_OPEN_REQ); temp-> sc_open_req.pg_id_len = (int) strlen(MPIDI_Process.my_pg->id) + 1; temp-> sc_open_req.pg_rank = MPIR_Process.comm_world->rank; int loop; MPID_IOV* iov_ptr = NULL; /* this loop may not be needed, because connection_iov is used for something more general now... */ for(loop=0; loop < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; loop++) { iov_ptr = VC_IOV(vc, loop); iov_ptr[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) temp; iov_ptr[0].MPID_IOV_LEN = (int) sizeof(*temp); iov_ptr[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id; iov_ptr[1].MPID_IOV_LEN = temp-> sc_open_req.pg_id_len; } /* init control stream early purposely */ MPIDU_Sctp_stream_init(vc, NULL, MPICH_SCTP_CTL_STREAM); fn_exit: MPIDI_DBG_PRINTF((60, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); return mpi_errno; fn_fail: /* --BEGIN ERROR HANDLING-- */ goto fn_exit; /* --END ERROR HANDLING-- */}/* end MPIDI_CH3I_VC_post_connect() */#undef FUNCNAME#define FUNCNAME stream_post_sendq_req#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int stream_post_sendq_req(MPIDI_VC_t * vc, int stream){ int mpi_errno = MPI_SUCCESS; MPID_Request* nextReq = NULL; MPIDI_STATE_DECL(MPID_STATE_STREAM_POST_SENDQ_REQ); MPIDI_FUNC_ENTER(MPID_STATE_STREAM_POST_SENDQ_REQ); nextReq = MPIDI_CH3I_SendQ_head_x(vc, stream); if(nextReq) { MPIDI_CH3I_SendQ_dequeue_x(vc, stream); MPIDU_Sctp_post_writev(vc, nextReq, 0, NULL, stream); } else { SEND_ACTIVE(vc, stream) = NULL; } MPIDI_FUNC_EXIT(MPID_STATE_STREAM_POST_SENDQ_REQ); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_init#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Sctp_init(void) { char * env; long flags; struct sctp_initmsg initm; struct sctp_event_subscribe evnts; struct sockaddr_in addr; int port = 0; int rc; int no_nagle; int len; int mpi_errno = MPI_SUCCESS; /* FIXME TODO : change error codes to have SCTP errors */ if (MPIDI_CH3I_onetomany_fd == -1) { /* see if a socket buffer size is specified */ env = getenv("MPICH_SCTP_BUFFER_SIZE"); if (env) { int tmp; /* FIXME: atoi doesn't detect errors (e.g., non-digits) */ tmp = atoi(env); if (tmp > 0) { MPIDU_Sctpi_socket_bufsz = tmp; } } /* see if Nagle value is specified */ no_nagle = 1; env = getenv("MPICH_SCTP_NAGLE_ON"); if (env) { int tmp; /* FIXME: atoi doesn't detect errors (e.g., non-digits) */ tmp = atoi(env); if (tmp > 0) { no_nagle = 0; } } /* Create a socket */ /* set up parameters for the SCTP socket */ port = 0; bzero(&evnts, sizeof(evnts)); evnts.sctp_data_io_event=1; MPIDU_Sctpi_socket_bufsz = 233016; if(sctp_open_dgm_socket2(MPICH_SCTP_NUM_STREAMS, 0, 5, port, no_nagle, &MPIDU_Sctpi_socket_bufsz, &evnts, &MPIDI_CH3I_onetomany_fd, &MPIDI_CH3I_listener_port) == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); goto fn_fail; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -