📄 ch3_progress_sock.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "ch3i_progress.h"/* FIXME: This is nowhere set to true. The name is non-conforming if it is not static */static int shutting_down = FALSE;static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn);static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn);static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);#undef FUNCNAME#define FUNCNAME connection_post_sendq_req#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SENDQ_REQ); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SENDQ_REQ); /* post send of next request on the send queue */ conn->send_active = MPIDI_CH3I_SendQ_head(conn->vc); /* MT */ if (conn->send_active != NULL) { mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->send_active->dev.iov, conn->send_active->dev.iov_count, NULL); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); } } MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SENDQ_REQ); return mpi_errno;}#undef FUNCNAME#define FUNCNAME connection_post_send_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT); mpi_errno = MPIDU_Sock_post_write(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); } /* --END ERROR HANDLING-- */ MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME connection_post_recv_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_RECV_PKT); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_RECV_PKT); mpi_errno = MPIDU_Sock_post_read(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); } /* --END ERROR HANDLING-- */ MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_RECV_PKT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME adjust_iov#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb){ MPID_IOV * const iov = *iovp; const int count = *countp; int offset = 0; while (offset < count) { if (iov[offset].MPID_IOV_LEN <= nb) { nb -= iov[offset].MPID_IOV_LEN; offset++; } else { iov[offset].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *) iov[offset].MPID_IOV_BUF + nb); iov[offset].MPID_IOV_LEN -= nb; break; } } *iovp += offset; *countp -= offset; return (*countp == 0);}#undef FUNCNAME#define FUNCNAME connection_post_send_pkt_and_pgid#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID); conn->iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) &conn->pkt; conn->iov[0].MPID_IOV_LEN = (int) sizeof(conn->pkt); conn->iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id; conn->iov[1].MPID_IOV_LEN = (int) strlen(MPIDI_Process.my_pg->id) + 1; mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->iov, 2, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); } /* --END ERROR HANDLING-- */ MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID);}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event){ int complete; int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT); switch (event->op_type) { case MPIDU_SOCK_OP_READ: { MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr; MPID_Request * rreq = conn->recv_active; /* --BEGIN ERROR HANDLING-- */ if (event->error != MPI_SUCCESS) { /* FIXME: the following should be handled by the close protocol */ if (!shutting_down || MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) { mpi_errno = MPIR_Err_create_code(event->error, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto fn_exit; } break; } /* --END ERROR HANDLING-- */ if (conn->state == CONN_STATE_CONNECTED) { if (conn->recv_active == NULL) { MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3); mpi_errno = MPIDI_CH3U_Handle_recv_pkt(conn->vc, &conn->pkt, &rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ if (rreq == NULL) { if (conn->state != CONN_STATE_CLOSING) { /* conn->recv_active = NULL; -- already set to NULL */ mpi_errno = connection_post_recv_pkt(conn); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ } } else { for(;;) { MPID_IOV * iovp; MPIU_Size_t nb; iovp = rreq->dev.iov; mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p", rreq, conn, conn->vc); goto fn_exit; } /* --END ERROR HANDLING-- */ MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x", conn->vc, rreq->handle, nb)); if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) { mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ if (complete) { /* conn->recv_active = NULL; -- already set to NULL */ mpi_errno = connection_post_recv_pkt(conn); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ break; } } else { MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle)); conn->recv_active = rreq; mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread", "ch3|sock|postread %p %p %p", rreq, conn, conn->vc); goto fn_exit; } /* --END ERROR HANDLING-- */ break; } } } } else /* incoming data */ { mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ if (complete) { conn->recv_active = NULL; mpi_errno = connection_post_recv_pkt(conn); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ } else /* more data to be read */ { for(;;) { MPID_IOV * iovp; MPIU_Size_t nb; iovp = rreq->dev.iov; mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p", rreq, conn, conn->vc); goto fn_exit; } /* --END ERROR HANDLING-- */ MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x", conn->vc, rreq->handle, nb)); if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) { mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ if (complete) { conn->recv_active = NULL; mpi_errno = connection_post_recv_pkt(conn); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ break; } } else { MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle)); /* conn->recv_active = rreq; -- already set to current request */ mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code( mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread", "ch3|sock|postread %p %p %p", rreq, conn, conn->vc); goto fn_exit; } /* --END ERROR HANDLING-- */ break; } } } } } else if (conn->state == CONN_STATE_OPEN_LRECV_DATA) { MPIDI_PG_t * pg; int pg_rank; MPIDI_VC_t * vc; /* Look up pg based on conn->pg_id */ mpi_errno = MPIDI_PG_Find(conn->pg_id, &pg); /* --BEGIN ERROR HANDLING-- */ if (pg == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**pglookup", "**pglookup %s", conn->pg_id); goto fn_exit; } /* --END ERROR HANDLING-- */ pg_rank = conn->pkt.sc_open_req.pg_rank; MPIDI_PG_Get_vc(pg, pg_rank, &vc); MPIU_Assert(vc->pg_rank == pg_rank); if (vc->ch.conn == NULL) { /* no head-to-head connects, accept the connection */ vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* head to head situation */ if (pg == MPIDI_Process.my_pg) { /* the other process is in the same comm_world; just compare the ranks */ if (MPIR_Process.comm_world->rank < pg_rank) { /* accept connection */ vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* refuse connection */ MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = FALSE; } } else { /* the two processes are in different comm_worlds; compare their unique pg_ids. */ if (strcmp(MPIDI_Process.my_pg->id, pg->id) < 0) { /* accept connection */ vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP); conn->pkt.sc_open_resp.ack = TRUE; } else { /* refuse connection */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -