📄 ch3u_connect_sock.c
字号:
MPIU_Assert(conn->state == CONN_STATE_LISTENING); MPIDI_CH3I_listener_conn = NULL; MPIDI_CH3I_listener_port = 0; MPIDI_CH3_Progress_signal_completion(); /* FIXME: Why is this commented out? */ /* MPIDI_CH3I_progress_completion_count++; */ } conn->sock = MPIDU_SOCK_INVALID_SOCK; MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_CLOSED); conn->state = CONN_STATE_CLOSED; if (conn->vc) { MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private; /* This step is important; without this, test disconnect_reconnect fails because the vc->ch.conn connection will continue to be used, even though the memory has been freed */ /* FIXME: There have been reports of SEGVs in the disconnect calls, so we've added checks to the fields before assuming that they're non-null. */ if (vcch && vcch->conn == conn) vcch->conn = 0; /* FIXME: If this isn't the associated connection, there may be a problem */ } connection_destroy(conn); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CLOSE_EVENT); return mpi_errno; fn_fail: goto fn_exit;}/* Cycle through the connection setup states *//* FIXME: separate out the accept and connect sides to make it easier to follow the logic */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Sockconn_handle_conn_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Sockconn_handle_conn_event( MPIDI_CH3I_Connection_t * conn ){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONN_EVENT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONN_EVENT); /* FIXME: Is there an assumption about conn->state? */ if (conn->pkt.type == MPIDI_CH3I_PKT_SC_OPEN_REQ) { MPIDI_CH3I_Pkt_sc_open_req_t *openpkt = (MPIDI_CH3I_Pkt_sc_open_req_t *)&conn->pkt.type; /* Answer to fixme: it appears from the control flow that this is the required state) */ MPIU_Assert( conn->state == CONN_STATE_OPEN_LRECV_PKT); MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_OPEN_LRECV_DATA); conn->state = CONN_STATE_OPEN_LRECV_DATA; mpi_errno = MPIDU_Sock_post_read(conn->sock, conn->pg_id, openpkt->pg_id_len, openpkt->pg_id_len, NULL); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else if (conn->pkt.type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT) { MPIDI_VC_t *vc; MPIDI_CH3I_VC *vcch; int port_name_tag; MPIDI_CH3I_Pkt_sc_conn_accept_t *acceptpkt = (MPIDI_CH3I_Pkt_sc_conn_accept_t *)&conn->pkt.type; MPIDI_CH3I_Pkt_sc_open_resp_t *openresp = (MPIDI_CH3I_Pkt_sc_open_resp_t *)&conn->pkt.type; vc = (MPIDI_VC_t *) MPIU_Malloc(sizeof(MPIDI_VC_t)); /* --BEGIN ERROR HANDLING-- */ if (vc == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL); goto fn_fail; } /* --END ERROR HANDLING-- */ /* FIXME - where does this vc get freed? */ MPIDI_VC_Init(vc, NULL, 0); vcch = (MPIDI_CH3I_VC *)vc->channel_private; MPIU_DBG_VCCHSTATECHANGE(vc,VC_STATE_CONNECTING); vcch->state = MPIDI_CH3I_VC_STATE_CONNECTING; vcch->sock = conn->sock; vcch->conn = conn; conn->vc = vc; port_name_tag = acceptpkt->port_name_tag; MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = TRUE; /* FIXME: Possible ambiguous state (two ways to get to OPEN_LSEND) */ MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_OPEN_LSEND); conn->state = CONN_STATE_OPEN_LSEND; mpi_errno = connection_post_send_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|scconnaccept"); } /* ENQUEUE vc */ MPIDI_CH3I_Acceptq_enqueue(vc, port_name_tag); } else if (conn->pkt.type == MPIDI_CH3I_PKT_SC_OPEN_RESP) { MPIDI_CH3I_Pkt_sc_open_resp_t *openpkt = (MPIDI_CH3I_Pkt_sc_open_resp_t *)&conn->pkt.type; /* FIXME: is this the correct assert? */ MPIU_Assert( conn->state == CONN_STATE_OPEN_CRECV ); if (openpkt->ack) { MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private; MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_CONNECTED); conn->state = CONN_STATE_CONNECTED; vcch->state = MPIDI_CH3I_VC_STATE_CONNECTED; MPIU_Assert(vcch->conn == conn); MPIU_Assert(vcch->sock == conn->sock); mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|scopenresp"); } } else { MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private; /* FIXME: Should conn->vc be freed? Who allocated? Why not? */ /* FIXME: Should probably reduce ref count on conn->vc */ /* FIXME: What happens to the state of the associated VC? Why isn't it changed? Is there an assert here, such as conn->vc->conn != conn (there is another connection chosen for the vc)? */ /* MPIU_Assert( conn->vc->ch.conn != conn ); */ /* Set the candidate vc for this connection to NULL (we are discarding this connection because (I think) we are performing a head-to-head connection, and this connection is being rejected in favor of the connection from the other side. */ if (vcch->conn == conn) vcch->conn = NULL; MPIU_DBG_CONNSTATECHANGE_MSG(conn->vc,conn,CONN_STATE_CLOSING, "because ack on OPEN_CRECV was false"); conn->vc = NULL; conn->state = CONN_STATE_CLOSING; /* FIXME: What does post close do here? */ MPIU_DBG_MSG(CH3_DISCONNECT,TYPICAL,"CLosing sock (Post_close)"); mpi_errno = MPIDU_Sock_post_close(conn->sock); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } } /* --BEGIN ERROR HANDLING-- */ else { MPIU_DBG_STMT(CH3_CONNECT,VERBOSE,MPIDI_DBG_Print_packet(&conn->pkt)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**ch3|sock|badpacket", "**ch3|sock|badpacket %d", conn->pkt.type); goto fn_fail; } /* --END ERROR HANDLING-- */ fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONN_EVENT); return mpi_errno; fn_fail: goto fn_exit;}/* FIXME: This should really be combined with handle_conn_event */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Sockconn_handle_connopen_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Sockconn_handle_connopen_event( MPIDI_CH3I_Connection_t * conn ){ int mpi_errno = MPI_SUCCESS; MPIDI_PG_t * pg; int pg_rank; MPIDI_VC_t * vc; MPIDI_CH3I_VC *vcch; MPIDI_CH3I_Pkt_sc_open_req_t *openpkt = (MPIDI_CH3I_Pkt_sc_open_req_t *)&conn->pkt.type; MPIDI_CH3I_Pkt_sc_open_resp_t *openresp = (MPIDI_CH3I_Pkt_sc_open_resp_t *)&conn->pkt.type; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONNOPEN_EVENT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONNOPEN_EVENT); /* Look up pg based on conn->pg_id */ mpi_errno = MPIDI_PG_Find(conn->pg_id, &pg); if (pg == NULL) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pglookup", "**pglookup %s", conn->pg_id); } /* We require that the packet be the open_req type */ pg_rank = openpkt->pg_rank; MPIDI_PG_Get_vc(pg, pg_rank, &vc); MPIU_Assert(vc->pg_rank == pg_rank); vcch = (MPIDI_CH3I_VC *)vc->channel_private; if (vcch->conn == NULL) { /* no head-to-head connects, accept the connection */ MPIU_DBG_VCCHSTATECHANGE(vc,VC_STATE_CONNECTING); vcch->state = MPIDI_CH3I_VC_STATE_CONNECTING; vcch->sock = conn->sock; vcch->conn = conn; conn->vc = vc; MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = TRUE; } else { /* head to head situation */ if (pg == MPIDI_Process.my_pg) { /* the other process is in the same comm_world; just compare the ranks */ if (MPIR_Process.comm_world->rank < pg_rank) { /* accept connection */ MPIU_DBG_VCCHSTATECHANGE(vc,VC_STATE_CONNECTING); vcch->state = MPIDI_CH3I_VC_STATE_CONNECTING; vcch->sock = conn->sock; vcch->conn = conn; conn->vc = vc; MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = TRUE; } else { /* refuse connection */ MPIU_DBG_MSG_FMT(CH3_CONNECT,TYPICAL,(MPIU_DBG_FDEST, "vc=%p,conn=%p:Refuse head-to-head connection (my process group)",vc,conn)); MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = FALSE; } } else { /* the two processes are in different comm_worlds; compare their unique pg_ids. */ if (strcmp(MPIDI_Process.my_pg->id, pg->id) < 0) { /* accept connection */ MPIU_DBG_VCCHSTATECHANGE(vc,VC_STATE_CONNECTING); vcch->state = MPIDI_CH3I_VC_STATE_CONNECTING; vcch->sock = conn->sock; vcch->conn = conn; conn->vc = vc; MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = TRUE; } else { /* refuse connection */ MPIU_DBG_MSG_FMT(CH3_CONNECT,TYPICAL,(MPIU_DBG_FDEST, "vc=%p,conn=%p:Refuse head-to-head connection (two process groups)",vc,conn)); MPIDI_Pkt_init(openresp, MPIDI_CH3I_PKT_SC_OPEN_RESP); openresp->ack = FALSE; } } } MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_OPEN_LSEND); conn->state = CONN_STATE_OPEN_LSEND; mpi_errno = connection_post_send_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|open_lrecv_data"); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONNOPEN_EVENT); return mpi_errno; fn_fail: goto fn_exit;}/* FIXME: This routine is called when? What is valid in conn? */#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Sockconn_handle_connwrite#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Sockconn_handle_connwrite( MPIDI_CH3I_Connection_t * conn ){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONNWRITE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_SOCKCONN_HANDLE_CONNWRITE); if (conn->state == CONN_STATE_OPEN_CSEND) { /* finished sending open request packet */ /* post receive for open response packet */ MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_OPEN_CRECV); conn->state = CONN_STATE_OPEN_CRECV; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else if (conn->state == CONN_STATE_OPEN_LSEND) { MPIDI_CH3I_Pkt_sc_open_resp_t *openresp = (MPIDI_CH3I_Pkt_sc_open_resp_t *)&conn->pkt.type; /* finished sending open response packet */ if (openresp->ack == TRUE) { MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private; /* post receive for packet header */ MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_CONNECTED); conn->state = CONN_STATE_CONNECTED; MPIU_DBG_VCCHSTATECHANGE(conn->vc,VC_STATE_CONNECTED); vcch->state = MPIDI_CH3I_VC_STATE_CONNECTED; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|openlsend"); } } else { /* head-to-head connections - close this connection */ MPIU_DBG_CONNSTATECHANGE(conn->vc,conn,CONN_STATE_CLOSING); /* FIXME: the connect side of this sets conn->vc to NULL. Why is this different? The code that checks CONN_STATE_CLOSING uses conn == NULL to identify intentional close, which this appears to be. */ conn->state = CONN_STATE_CLOSING; MPIU_DBG_MSG(CH3_DISCONNECT,TYPICAL,"Closing sock2 (Post_close)"); mpi_errno = MPIDU_Sock_post_close(conn->sock); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**sock_post_close");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -