📄 ch3_progress.c
                            }
                            /* --END ERROR HANDLING-- */
                            break;
                        }
                    }
#endif
                }
            }
            else /* incoming data */
            {
                int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
                int complete;

                reqFn = rreq->dev.OnDataAvail;
                if (!reqFn) {
                    MPIU_Assert(MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                    MPIDI_CH3U_Request_complete(rreq);
                    complete = TRUE;
                }
                else {
                    mpi_errno = reqFn( conn->vc, rreq, &complete );
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                }

                if (complete) {
                    conn->recv_active = NULL;
                    mpi_errno = connection_post_recv_pkt(conn);
                    if (mpi_errno != MPI_SUCCESS) {
                        MPIU_ERR_POP(mpi_errno);
                    }
                }
                else /* more data to be read */
                {
#if 1
                    mpi_errno = ReadMoreData( conn, rreq );
                    if (mpi_errno) {
                        MPIU_ERR_POP(mpi_errno);
                    }
#else
                    for (;;) {
                        MPID_IOV * iovp;
                        MPIU_Size_t nb;

                        iovp = rreq->dev.iov;

                        mpi_errno = MPIDU_Sock_readv(conn->sock, iovp,
                                                     rreq->dev.iov_count, &nb);
                        /* --BEGIN ERROR HANDLING-- */
                        if (mpi_errno != MPI_SUCCESS) {
                            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                            FCNAME, __LINE__, MPI_ERR_OTHER,
                                            "**ch3|sock|immedread",
                                            "ch3|sock|immedread %p %p %p",
                                            rreq, conn, conn->vc);
                            goto fn_fail;
                        }
                        /* --END ERROR HANDLING-- */
                        MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                                         (MPIU_DBG_FDEST,
                                          "immediate readv, vc=%p nb=%d, rreq=0x%08x",
                                          conn->vc, rreq->handle, nb));

                        if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) {
                            int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
                            int complete;

                            reqFn = rreq->dev.OnDataAvail;
                            if (!reqFn) {
                                MPIU_Assert(MPIDI_Request_get_type(rreq) !=
                                            MPIDI_REQUEST_TYPE_GET_RESP);
                                MPIDI_CH3U_Request_complete(rreq);
                                complete = TRUE;
                            }
                            else {
                                mpi_errno = reqFn( conn->vc, rreq, &complete );
                                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                            }
                            if (complete) {
                                /* This differs from ReadMore */
                                conn->recv_active = NULL;
                                mpi_errno = connection_post_recv_pkt(conn);
                                if (mpi_errno != MPI_SUCCESS) {
                                    MPIU_ERR_POP(mpi_errno);
                                }
                                break;
                            }
                        }
                        else {
                            MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                                             (MPIU_DBG_FDEST,
                                              "posting readv, vc=%p, rreq=0x%08x",
                                              conn->vc, rreq->handle));
                            /* This is different in ReadMore */
                            /* conn->recv_active = rreq;  -- already set to current request */
                            mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp,
                                                              rreq->dev.iov_count, NULL);
                            /* --BEGIN ERROR HANDLING-- */
                            if (mpi_errno != MPI_SUCCESS) {
                                mpi_errno = MPIR_Err_create_code(
                                    mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__,
                                    MPI_ERR_OTHER, "**ch3|sock|postread",
                                    "ch3|sock|postread %p %p %p",
                                    rreq, conn, conn->vc);
                                goto fn_fail;
                            }
                            /* --END ERROR HANDLING-- */
                            break;
                        }
                    }
#endif
                }
            }
        }
        else if (conn->state == CONN_STATE_OPEN_LRECV_DATA) {
            mpi_errno = MPIDI_CH3_Sockconn_handle_connopen_event( conn );
            if (mpi_errno) { MPIU_ERR_POP( mpi_errno ); }
        }
        else /* Handling some internal connection establishment or
                tear down packet */
        {
            mpi_errno = MPIDI_CH3_Sockconn_handle_conn_event( conn );
            if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
        }
        break;
    }
    /* END OF SOCK_OP_READ */

    case MPIDU_SOCK_OP_WRITE:
    {
        MPIDI_CH3I_Connection_t * conn =
            (MPIDI_CH3I_Connection_t *) event->user_ptr;

        /* --BEGIN ERROR HANDLING-- */
        if (event->error != MPI_SUCCESS) {
            mpi_errno = event->error;
            MPIU_ERR_POP(mpi_errno);
        }
        /* --END ERROR HANDLING-- */

        if (conn->send_active) {
            MPID_Request * sreq = conn->send_active;
            int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
            int complete;

            reqFn = sreq->dev.OnDataAvail;
            if (!reqFn) {
                MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                MPIDI_CH3U_Request_complete(sreq);
                complete = TRUE;
            }
            else {
                mpi_errno = reqFn( conn->vc, sreq, &complete );
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            }

            if (complete) {
                mpi_errno = connection_pop_sendq_req(conn);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_POP(mpi_errno);
                }
            }
            else /* more data to send */
            {
                for (;;) {
                    MPID_IOV * iovp;
                    MPIU_Size_t nb;

                    iovp = sreq->dev.iov;

                    mpi_errno = MPIDU_Sock_writev(conn->sock, iovp,
                                                  sreq->dev.iov_count, &nb);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno != MPI_SUCCESS) {
                        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                        FCNAME, __LINE__, MPI_ERR_OTHER,
                                        "**ch3|sock|immedwrite",
                                        "ch3|sock|immedwrite %p %p %p",
                                        sreq, conn, conn->vc);
                        goto fn_fail;
                    }
                    /* --END ERROR HANDLING-- */
                    MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                                     (MPIU_DBG_FDEST,
                                      "immediate writev, vc=%p, sreq=0x%08x, nb="
                                      MPIDI_MSG_SZ_FMT,
                                      conn->vc, sreq->handle, nb));

                    if (nb > 0 && adjust_iov(&iovp, &sreq->dev.iov_count, nb)) {
                        int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
                        int complete;

                        reqFn = sreq->dev.OnDataAvail;
                        if (!reqFn) {
                            MPIU_Assert(MPIDI_Request_get_type(sreq) !=
                                        MPIDI_REQUEST_TYPE_GET_RESP);
                            MPIDI_CH3U_Request_complete(sreq);
                            complete = TRUE;
                        }
                        else {
                            mpi_errno = reqFn( conn->vc, sreq, &complete );
                            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                        }
                        if (complete) {
                            mpi_errno = connection_pop_sendq_req(conn);
                            if (mpi_errno != MPI_SUCCESS) {
                                MPIU_ERR_POP(mpi_errno);
                            }
                            break;
                        }
                    }
                    else {
                        MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                                         (MPIU_DBG_FDEST,
                                          "posting writev, vc=%p, conn=%p, sreq=0x%08x",
                                          conn->vc, conn, sreq->handle));
                        mpi_errno = MPIDU_Sock_post_writev(conn->sock, iovp,
                                                           sreq->dev.iov_count, NULL);
                        /* --BEGIN ERROR HANDLING-- */
                        if (mpi_errno != MPI_SUCCESS) {
                            mpi_errno = MPIR_Err_create_code(
                                mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__,
                                MPI_ERR_OTHER, "**ch3|sock|postwrite",
                                "ch3|sock|postwrite %p %p %p",
                                sreq, conn, conn->vc);
                            goto fn_fail;
                        }
                        /* --END ERROR HANDLING-- */
                        break;
                    }
                }
            }
        }
        else /* finished writing internal packet header */
        {
            /* the connection is not active yet */
            mpi_errno = MPIDI_CH3_Sockconn_handle_connwrite( conn );
            if (mpi_errno) { MPIU_ERR_POP( mpi_errno ); }
        }
        break;
    }
    /* END OF SOCK_OP_WRITE */

    case MPIDU_SOCK_OP_ACCEPT:
    {
        mpi_errno = MPIDI_CH3_Sockconn_handle_accept_event();
        if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
        break;
    }

    case MPIDU_SOCK_OP_CONNECT:
    {
        mpi_errno = MPIDI_CH3_Sockconn_handle_connect_event(
            (MPIDI_CH3I_Connection_t *) event->user_ptr, event->error );
        if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
        break;
    }

    case MPIDU_SOCK_OP_CLOSE:
    {
        mpi_errno = MPIDI_CH3_Sockconn_handle_close_event(
            (MPIDI_CH3I_Connection_t *) event->user_ptr );
        if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
        break;
    }

    case MPIDU_SOCK_OP_WAKEUP:
    {
        MPIDI_CH3_Progress_signal_completion();
        /* MPIDI_CH3I_progress_completion_count++; */
        break;
    }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
/* end MPIDI_CH3I_Progress_handle_sock_event() */

#ifdef MPICH_IS_THREADED
/* Note that this routine is only called if threads are enabled; it does not
   need to check whether runtime threads are enabled */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_delay
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Progress_delay(unsigned int completion_count)
{
    int mpi_errno = MPI_SUCCESS;

#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
    {
        while (completion_count == MPIDI_CH3I_progress_completion_count) {
            MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond,
                                  &MPIR_ThreadInfo.global_mutex);
        }
    }
#   endif

    return mpi_errno;
}
/* end MPIDI_CH3I_Progress_delay() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_continue
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Progress_continue(unsigned int completion_count)
{
    int mpi_errno = MPI_SUCCESS;

    MPIU_THREAD_CHECK_BEGIN
#   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
    {
        MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond);
    }
#   endif
    MPIU_THREAD_CHECK_END

    return mpi_errno;
}
/* end MPIDI_CH3I_Progress_continue() */

#endif /* MPICH_IS_THREADED */

/* FIXME: (a) what does this do and where is it used and (b) we could replace
   it with a #define for the single-method case */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_VC_post_connect
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t * vc)
{
    return MPIDI_CH3I_VC_post_sockconnect( vc );
}
/* end MPIDI_CH3I_VC_post_connect() */

/* FIXME: This function also used in ch3u_connect_sock.c */
#undef FUNCNAME
#define FUNCNAME connection_pop_sendq_req
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int connection_pop_sendq_req(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POP_SENDQ_REQ);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POP_SENDQ_REQ);

    /* post send of next request on the send queue */

    /* FIXME: Is dequeue/get next the operation we really want? */
    MPIDI_CH3I_SendQ_dequeue(vcch);
    conn->send_active = MPIDI_CH3I_SendQ_head(vcch); /* MT */
    if (conn->send_active != NULL) {
        MPIU_DBG_MSG_P(CH3_CONNECT,TYPICAL,
                       "conn=%p: Posting message from connection send queue", conn );
        mpi_errno = MPIDU_Sock_post_writev(conn->sock,
                                           conn->send_active->dev.iov,
                                           conn->send_active->dev.iov_count, NULL);
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

 fn_fail:
    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POP_SENDQ_REQ);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME connection_post_recv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_RECV_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_RECV_PKT);

    mpi_errno = MPIDU_Sock_post_read(conn->sock, &conn->pkt, sizeof(conn->pkt),
                                     sizeof(conn->pkt), NULL);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER, "**fail");
    }

    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_RECV_PKT);
    return mpi_errno;
}

/* FIXME: What is this routine for?
 */
#undef FUNCNAME
#define FUNCNAME adjust_iov
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb)
{
    MPID_IOV * const iov = *iovp;
    const int count = *countp;
    int offset = 0;

    while (offset < count) {
        if (iov[offset].MPID_IOV_LEN <= nb) {
            nb -= iov[offset].MPID_IOV_LEN;
            offset++;
        }
        else {
            iov[offset].MPID_IOV_BUF =
                (MPID_IOV_BUF_CAST)((char *) iov[offset].MPID_IOV_BUF + nb);
            iov[offset].MPID_IOV_LEN -= nb;
            break;
        }
    }

    *iovp += offset;
    *countp -= offset;

    return (*countp == 0);
}
/* end adjust_iov() */

static int ReadMoreData( MPIDI_CH3I_Connection_t * conn, MPID_Request *rreq )
{
    int mpi_errno = MPI_SUCCESS;

    while (1) {
        MPID_IOV * iovp;
        MPIU_Size_t nb;

        iovp = rreq->dev.iov;

        mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS) {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                            FCNAME, __LINE__, MPI_ERR_OTHER,
                            "**ch3|sock|immedread",
                            "ch3|sock|immedread %p %p %p",
                            rreq, conn, conn->vc);
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */
        MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                         (MPIU_DBG_FDEST,
                          "immediate readv, vc=%p nb=" MPIDI_MSG_SZ_FMT ", rreq=0x%08x",
                          conn->vc, nb, rreq->handle));

        if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) {
            int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
            int complete;

            reqFn = rreq->dev.OnDataAvail;
            if (!reqFn) {
                MPIU_Assert(MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_RESP);
                MPIDI_CH3U_Request_complete(rreq);
                complete = TRUE;
            }
            else {
                mpi_errno = reqFn( conn->vc, rreq, &complete );
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            }
            if (complete) {
                conn->recv_active = NULL;  /* -- already set to NULL */
                mpi_errno = connection_post_recv_pkt(conn);
                if (mpi_errno != MPI_SUCCESS) {
                    MPIU_ERR_POP(mpi_errno);
                }
                break;
            }
        }
        else {
            MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
                             (MPIU_DBG_FDEST,
                              "posting readv, vc=%p, rreq=0x%08x",
                              conn->vc, rreq->handle));
            conn->recv_active = rreq;
            mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp,
                                              rreq->dev.iov_count, NULL);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                mpi_errno = MPIR_Err_create_code(
                    mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__,
                    MPI_ERR_OTHER, "**ch3|sock|postread",
                    "ch3|sock|postread %p %p %p",
                    rreq, conn, conn->vc);
                goto fn_fail;
            }
            /* --END ERROR HANDLING-- */
            break;
        }
    }

 fn_fail:
    return mpi_errno;
}

/*
 * The dynamic-library interface requires a unified Progress routine.
 * This is that routine.
 */
int MPIDI_CH3I_Progress( int blocking, MPID_Progress_state *state )
{
    int mpi_errno;

    if (blocking) mpi_errno = MPIDI_CH3i_Progress_wait(state);
    else          mpi_errno = MPIDI_CH3i_Progress_test();

    return mpi_errno;
}
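The pattern that recurs throughout this file (in the SOCK_OP_READ/SOCK_OP_WRITE handlers, ReadMoreData(), and adjust_iov()) is: attempt the I/O immediately, advance the IOV past whatever was transferred, and only hand the remainder to MPIDU_Sock_post_readv/writev when data is still outstanding. The following standalone sketch shows the same partial-I/O bookkeeping using plain POSIX writev() and struct iovec rather than the MPIDU_Sock/MPID_IOV abstractions; the helper names adjust_iov_posix and write_all_iov are illustrative assumptions, not MPICH APIs.

/*
 * Standalone sketch (not MPICH code): the partial-I/O bookkeeping used by
 * adjust_iov() above, expressed with POSIX writev().  adjust_iov_posix()
 * and write_all_iov() are hypothetical helper names for illustration.
 */
#include <sys/uio.h>
#include <unistd.h>
#include <string.h>

/* Advance the IOV array past nb consumed bytes; return 1 when nothing is left. */
static int adjust_iov_posix(struct iovec **iovp, int *countp, size_t nb)
{
    struct iovec *const iov = *iovp;
    const int count = *countp;
    int offset = 0;

    while (offset < count) {
        if (iov[offset].iov_len <= nb) {
            /* This entry was fully consumed; move past it. */
            nb -= iov[offset].iov_len;
            offset++;
        }
        else {
            /* Partially consumed entry: shrink it in place and stop. */
            iov[offset].iov_base = (char *) iov[offset].iov_base + nb;
            iov[offset].iov_len -= nb;
            break;
        }
    }
    *iovp += offset;
    *countp -= offset;
    return (*countp == 0);
}

/* Blocking variant of the pattern: retry writev() until the IOV is drained. */
static int write_all_iov(int fd, struct iovec *iov, int count)
{
    while (count > 0) {
        ssize_t nb = writev(fd, iov, count);
        if (nb < 0)
            return -1;                 /* caller inspects errno */
        if (adjust_iov_posix(&iov, &count, (size_t) nb))
            break;                     /* everything was written */
    }
    return 0;
}

int main(void)
{
    char hdr[]  = "header: ";
    char body[] = "payload\n";
    struct iovec iov[2] = {
        { .iov_base = hdr,  .iov_len = strlen(hdr)  },
        { .iov_base = body, .iov_len = strlen(body) },
    };
    return write_all_iov(STDOUT_FILENO, iov, 2) == 0 ? 0 : 1;
}

In the progress engine itself the retry is not a blocking loop: when the immediate readv/writev leaves the IOV non-empty, the request stays parked on the connection (recv_active/send_active) and the remaining IOV is posted with MPIDU_Sock_post_readv/writev, so the work resumes only when the next socket event reports that the posted operation finished.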