📄 ch3_progress.c
字号:
MPIU_ERR_POP(mpi_errno); } if (complete) { /* conn->recv_active = NULL; -- already set to NULL */ mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } } else { MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle)); conn->recv_active = rreq; mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread", "ch3|sock|postread %p %p %p", rreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ break; } } } } else /* incoming data */ { mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (complete) { conn->recv_active = NULL; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else /* more data to be read */ { for(;;) { MPID_IOV * iovp; MPIU_Size_t nb; iovp = rreq->dev.iov; mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p", rreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x", conn->vc, rreq->handle, nb)); if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) { mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (complete) { conn->recv_active = NULL; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } } else { MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle)); /* conn->recv_active = rreq; -- already set to current request */ mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread", "ch3|sock|postread %p %p %p", rreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ break; } } } } } else if (conn->state == CONN_STATE_OPEN_LRECV_DATA) { MPIDI_PG_t * pg; int pg_rank; MPIDI_VC_t * vc; /* Look up pg based on conn->pg_id */ mpi_errno = MPIDI_PG_Find(conn->pg_id, &pg); if (pg == NULL) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pglookup", "**pglookup %s", conn->pg_id); } pg_rank = conn->pkt.sc_open_req.pg_rank; MPIDI_PG_Get_vc(pg, pg_rank, &vc); MPIU_Assert(vc->pg_rank == pg_rank); if (vc->ch.conn == NULL) { /* no head-to-head connects, accept the connection */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* head to head situation */ if (pg == MPIDI_Process.my_pg) { /* the other process is in the same comm_world; just compare the ranks */ if (MPIR_Process.comm_world->rank < pg_rank) { /* accept connection */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* refuse connection */ MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = FALSE; } } else { /* the two processes are in different comm_worlds; compare their unique pg_ids. */ if (strcmp(MPIDI_Process.my_pg->id, pg->id) < 0) { /* accept connection */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* refuse connection */ MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = FALSE; } } } MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_LSEND"); conn->state = CONN_STATE_OPEN_LSEND; mpi_errno = connection_post_send_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|open_lrecv_data"); } } else /* Handling some internal connection establishment or tear down packet */ { if (conn->pkt.type == MPIDI_CH3I_PKT_SC_OPEN_REQ) { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_LRECV_DATA"); conn->state = CONN_STATE_OPEN_LRECV_DATA; mpi_errno = MPIDU_Sock_post_read(conn->sock, conn->pg_id, conn->pkt.sc_open_req.pg_id_len, conn->pkt.sc_open_req.pg_id_len, NULL); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else if (conn->pkt.type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT) { MPIDI_VC_t *vc; vc = (MPIDI_VC_t *) MPIU_Malloc(sizeof(MPIDI_VC_t)); /* --BEGIN ERROR HANDLING-- */ if (vc == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL); goto fn_fail; } /* --END ERROR HANDLING-- */ /* FIXME - where does this vc get freed? */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); MPIDI_VC_Init(vc, NULL, 0); vc->ch.sendq_head = NULL; vc->ch.sendq_tail = NULL; vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; vc->ch.port_name_tag = conn->pkt.sc_conn_accept.port_name_tag; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_LSEND"); conn->state = CONN_STATE_OPEN_LSEND; mpi_errno = connection_post_send_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|scconnaccept"); } /* ENQUEUE vc */ MPIDI_CH3I_Acceptq_enqueue(vc); } else if (conn->pkt.type == MPIDI_CH3I_PKT_SC_OPEN_RESP) { if (conn->pkt.sc_open_resp.ack) { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CONNECTED"); conn->state = CONN_STATE_CONNECTED; conn->vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTED; MPIU_Assert(conn->vc->ch.conn == conn); MPIU_Assert(conn->vc->ch.sock == conn->sock); mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|scopenresp"); } } else { conn->vc = NULL; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSING"); conn->state = CONN_STATE_CLOSING; MPIDU_Sock_post_close(conn->sock); } } /* --BEGIN ERROR HANDLING-- */ else { MPIDI_DBG_Print_packet(&conn->pkt); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**ch3|sock|badpacket", "**ch3|sock|badpacket %d", conn->pkt.type); goto fn_fail; } /* --END ERROR HANDLING-- */ } break; } case MPIDU_SOCK_OP_WRITE: { MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr; /* --BEGIN ERROR HANDLING-- */ if (event->error != MPI_SUCCESS) { mpi_errno = event->error; MPIU_ERR_POP(mpi_errno); } /* --END ERROR HANDLING-- */ if (conn->send_active) { MPID_Request * sreq = conn->send_active; mpi_errno = MPIDI_CH3U_Handle_send_req(conn->vc, sreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (complete) { MPIDI_CH3I_SendQ_dequeue(conn->vc); mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else /* more data to send */ { for(;;) { MPID_IOV * iovp; MPIU_Size_t nb; iovp = sreq->dev.iov; mpi_errno = MPIDU_Sock_writev(conn->sock, iovp, sreq->dev.iov_count, &nb); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|immedwrite", "ch3|sock|immedwrite %p %p %p", sreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ MPIDI_DBG_PRINTF((55, FCNAME, "immediate writev, vc=0x%p, sreq=0x%08x, nb=%d", conn->vc, sreq->handle, nb)); if (nb > 0 && adjust_iov(&iovp, &sreq->dev.iov_count, nb)) { mpi_errno = MPIDI_CH3U_Handle_send_req(conn->vc, sreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (complete) { MPIDI_CH3I_SendQ_dequeue(conn->vc); mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } } else { MPIDI_DBG_PRINTF((55, FCNAME, "posting writev, vc=0x%p, sreq=0x%08x", conn->vc, sreq->handle)); mpi_errno = MPIDU_Sock_post_writev(conn->sock, iovp, sreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postwrite", "ch3|sock|postwrite %p %p %p", sreq, conn, conn->vc); goto fn_fail; } /* --END ERROR HANDLING-- */ break; } } } } else /* finished writing internal packet header */ { if (conn->state == CONN_STATE_OPEN_CSEND) { /* finished sending open request packet */ /* post receive for open response packet */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_CRECV"); conn->state = CONN_STATE_OPEN_CRECV; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } } else if (conn->state == CONN_STATE_OPEN_LSEND) { /* finished sending open response packet */ if (conn->pkt.sc_open_resp.ack == TRUE) { /* post receive for packet header */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CONNECTED"); conn->state = CONN_STATE_CONNECTED; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTED"); conn->vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTED; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = connection_post_sendq_req(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|openlsend"); } } else { /* head-to-head connections - close this connection */ MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSIANG"); conn->state = CONN_STATE_CLOSING; mpi_errno = MPIDU_Sock_post_close(conn->sock); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**sock_post_close"); } } } } break; } case MPIDU_SOCK_OP_ACCEPT: { MPIDI_CH3I_Connection_t * conn; mpi_errno = MPIDI_CH3I_Connection_alloc(&conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = MPIDU_Sock_accept(MPIDI_CH3I_listener_conn->sock, MPIDI_CH3I_sock_set, conn, &conn->sock); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|accept"); } conn->vc = NULL; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_LRECV_PKT"); conn->state = CONN_STATE_OPEN_LRECV_PKT; conn->send_active = NULL; conn->recv_active = NULL; mpi_errno = connection_post_recv_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } break; } case MPIDU_SOCK_OP_CONNECT: { MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr; /* --BEGIN ERROR HANDLING-- */ if (event->error != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( event->error, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|connfailed", "**ch3|sock|connfailed %s %d", conn->vc->pg->id, conn->vc->pg_rank); goto fn_fail; } /* --END ERROR HANDLING-- */ if (conn->state == CONN_STATE_CONNECTING) { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_CSEND");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -