📄 ch3_progress.c
字号:
/* NOTE(review): the fn_exit/fn_fail tail below belongs to a function whose
 * opening lies before this chunk; it is kept byte-for-byte. */
 fn_exit:
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
 fn_fail:
    /* on failure, tear down the one-to-many SCTP socket if it was opened */
    if (MPIDI_CH3I_onetomany_fd != -1) {
        close(MPIDI_CH3I_onetomany_fd);
    }
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_wait
#undef FCNAME
/* NOTE(review): this function uses MPIDI_QUOTE while its neighbours use
 * MPIU_QUOTE -- presumably both stringify identically; confirm. */
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDU_Sctp_wait
 *
 * Progress engine for the SCTP channel: spins until at least one completion
 * event can be dequeued into 'event'.  Each pass:
 *   1. READ LOOP  - drains socket 'fd' into chunked receive buffers, posting
 *                   one OP_READ event per successful sctp_recv();
 *   2. retargets 'fd' to the dynamic-process temporary fd, if one exists and
 *      has not been tried yet this call;
 *   3. WRITE LOOP - sweeps the global send queue once, writing each posted
 *                   iov/buffer and posting an OP_WRITE event on completion,
 *                   or re-queueing partially-sent requests.
 *
 * fd      - SCTP socket to make progress on
 * timeout - -1 means "may block": the socket is put in blocking mode when
 *           the send queue is empty; forced to 0 (poll) while a dynamic
 *           tmp fd exists, since we cannot block on one fd of two
 * event   - out: the dequeued completion event
 *
 * Returns MPI_SUCCESS, or an MPI error code if an event cannot be enqueued.
 */
int MPIDU_Sctp_wait(int fd, int timeout, MPIDU_Sctp_event_t * event)
{
    int msg_flags, error, recv_amount, stream_loop, buf_sz;
    MPIU_Size_t sz;
    int mpi_errno = MPI_SUCCESS;
    char* buf_ptr = NULL;
    MPIDI_VC_t * vc;
    struct MPID_Request* req = NULL;
    struct MPID_Request* q_tail = NULL;
    int blocked = FALSE;

    /* NOTE(review): buf_sz is assigned here but never read in this function */
    buf_sz = MPIDU_Sctpi_socket_bufsz;

    /* can't block if we don't know where things are coming from... */
    if(MPIDI_CH3I_dynamic_tmp_fd != -1)
        timeout = 0;

    /* recv buffer */
    BufferNode_t* bf_node = NULL;

    while(MPIDU_Sctp_event_dequeue(event) != MPI_SUCCESS) {

        /* adjust sock mode: with an infinite timeout and nothing queued to
         * send we can afford to block in the kernel on the next recv */
        if(timeout == -1 && Global_SendQ.count == 0) {
            sctp_setblock(fd, TRUE);
            blocked = TRUE;
        }

        /* READ LOOP begins */
        BufferList_init(&FirstBufferNode);
        while((buf_ptr = request_buffer(CHUNK, &bf_node))) {
            error = sctp_recv(fd, buf_ptr, CHUNK, &sctp_sri, &msg_flags,
                              &recv_amount);
            if(error == EAGAIN || recv_amount <= 0) {
                break;  /* nothing (more) to read right now */
            }

            /* publish the received chunk as an OP_READ completion event */
            mpi_errno = MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_READ,
                                                 recv_amount, &sctp_sri, fd,
                                                 buf_ptr, NULL, msg_flags,
                                                 MPI_SUCCESS);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL,
                                                 FCNAME, __LINE__,
                                                 MPI_ERR_OTHER, "**fail", 0);
                goto fn_fail;
            }
            /* --END ERROR HANDLING-- */

            update_size(bf_node, recv_amount);
            error = 0;
            recv_amount = 0;

            /* a blocking recv has produced its one event; return the socket
             * to non-blocking mode and leave the read loop */
            if(blocked) {
                sctp_setblock(fd, FALSE);
                blocked = FALSE;
                break;
            }
        }
        /* READ LOOP ends */

        sctp_setblock(fd, FALSE);

        /* read from dynamic_fd if it is exists and hasn't been tried already */
        if(MPIDI_CH3I_dynamic_tmp_fd != -1 && fd != MPIDI_CH3I_dynamic_tmp_fd) {
            fd = MPIDI_CH3I_dynamic_tmp_fd;
            continue;
        }

        /* WRITE LOOP begins: exactly one sweep over the global send queue,
         * bounded by the tail captured before the sweep started */
        q_tail = Global_SendQ.tail;
        do {
            req = NULL;
            Global_SendQ_dequeue(req);
            if(req) {
                MPIU_Assert(SEND_ACTIVE(req->ch.vc, req->ch.stream) == req);
                /* keep sending until EAGAIN */
                stream_loop = req->ch.stream;
                SCTP_IOV* iov_ptr;
                vc = req->ch.vc;
                iov_ptr = &(vc->ch.posted_iov[stream_loop]);
                if(POST_IOV_FLAG(iov_ptr)) {
                    mpi_errno = MPIDU_Sctp_writev(vc, POST_IOV(iov_ptr),
                                                  POST_IOV_CNT(iov_ptr),
                                                  req->ch.stream, 0, &sz);
                } else {
                    /* NOT an iov. do a simple write */
                    mpi_errno = MPIDU_Sctp_write(vc, POST_BUF(iov_ptr),
                                                 POST_BUF_MIN(iov_ptr),
                                                 req->ch.stream, 0, &sz);
                }
                /* NOTE(review): if MPIU_Size_t is unsigned, (sz < 0) is
                 * always false and this clamp is dead code -- confirm */
                sz = (sz < 0)? 0 : sz;

                /* adjust iov here, if it's done, enqueue event, else keep it
                 * in global sendQ */
                if(adjust_posted_iov(iov_ptr, sz)) {
                    mpi_errno = MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_WRITE,
                                                         sz, NULL, vc->ch.fd,
                                                         vc, NULL,
                                                         req->ch.stream,
                                                         MPI_SUCCESS);
                    MPIDI_DBG_PRINTF((50, FCNAME,
                                      "wrote: %d bytes @ strm: %d",
                                      sz, req->ch.stream));
                } else {
                    /* need to put it back to globalSendQ, doesn't need to
                     * post again */
                    Global_SendQ_enqueue(vc, req, stream_loop);
                }
            }
        } while (req != q_tail);
        /* WRITE LOOP ends */

        /* can't spin forever */
        if(!SPIN(timeout)) break;
    }

    /* set fd to NON_BLOCK again */
    sctp_setblock(fd, 0);

 fn_exit:
 fn_fail:
    return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME adjust_posted_iov
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* adjust_posted_iov
 *
 * Advance the posted send descriptor 'post_ptr' by the 'nb' bytes that were
 * just written.  For iov-style posts this delegates to adjust_iov(); for
 * plain-buffer posts it slides the buffer window forward.
 *
 * Returns 1 when the posted send is fully consumed, 0 if bytes remain.
 */
static inline int adjust_posted_iov(SCTP_IOV* post_ptr, MPIU_Size_t nb)
{
    int complete = 0;
    int min_bytes = 0;
    if(POST_IOV_FLAG(post_ptr)){
        /* iov case: adjust_iov consumes nb bytes across the vector and
         * reports whether the whole vector is done */
        complete = adjust_iov(&POST_IOV(post_ptr), &POST_IOV_CNT(post_ptr), nb);
    } else {
        min_bytes = POST_BUF_MIN(post_ptr);
        if(min_bytes == nb) {
            /* send complete */
            complete = 1;
        } else {
            /* partial send: advance the buffer pointer and shrink the
             * remaining byte counts */
            POST_BUF(post_ptr) += nb;
            POST_BUF_MIN(post_ptr) -= nb;
            POST_BUF_MAX(post_ptr) = POST_BUF_MIN(post_ptr);
            complete = 0;
        }
    }
    return complete;
}
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_post_close
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* MPIDU_Sctp_post_close
 *
 * Post an OP_CLOSE completion event for vc's fd; the event handler performs
 * the actual close later.  Returns the enqueue result.
 */
static int MPIDU_Sctp_post_close(MPIDI_VC_t * vc)
{
    return MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_CLOSE, 0, NULL, vc->ch.fd,
                                    vc, 0, 0, 0);
}
#undef FUNCNAME
#define FUNCNAME MPIDU_Sctp_finalize
/* NOTE(review): the code viewer's line-wrapping split "#undef FCNAME" here;
 * the "FCNAME" token continues on the next source line. */
#undef 
FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Sctp_finalize(void){ /* need to free eventq */ if(eventq_head) { MPIDU_Sctp_free_eventq_mem(); } return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME read_from_advbuf_and_adjust#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)inline static int read_from_advbuf_and_adjust(MPIDI_VC_t* vc, int stream, int amount, char* src, MPID_Request* rreq) { int mpi_errno = MPI_SUCCESS; MPID_IOV* iovp = rreq->dev.iov; int nb = 0; int done = FALSE; int complete = FALSE; char *src_ptr; if(rreq) { nb = readv_from_advbuf(rreq, src, amount); done = adjust_req(rreq, nb); if(done) { #if 1 int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *); reqFn = rreq->dev.OnDataAvail; if (!reqFn) { MPIDI_CH3U_Request_complete(rreq); complete = TRUE; } else { /* fyi reqFn is MPIDI_CH3_ReqHandler_ReloadIOV with truncated messages */ mpi_errno = reqFn( vc, rreq, &complete ); if (mpi_errno) MPIU_ERR_POP(mpi_errno); }#else mpi_errno = MPIDI_CH3U_Handle_recv_req(vc, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }#endif if(!complete) { /* more data to be read (e.g. truncation) */ /* this is designed to work with truncation, but is it general enough? */ /* since the excess data is already in the advbuf, is this step necessary * for SCTP? these steps (above and below) allocate a tmp buffer and set * it equal to the req's iov; this tmp buffer is used merely to get the * excess data off of the internal buffers (or the kernel socket receive * buffer, in SCTP's case). the thing is, this is already done when * reading TO the advbuf, so this process of tmp buf allocation and copying * might be rework... still, for now it's in there just to model after the * ch3:sock code. */ src_ptr = src; src_ptr += nb; /* need to reset, because preceeding adjust_iov changed it */ iovp = rreq->dev.iov; nb = readv_from_advbuf(rreq, src_ptr, amount - nb); done = adjust_req(rreq, nb); /* the remaining code is kinda recursive... 
*/ if(done) { #if 1 reqFn = rreq->dev.OnDataAvail; if (!reqFn) { MPIDI_CH3U_Request_complete(rreq); complete = TRUE; } else { mpi_errno = reqFn( vc, rreq, &complete ); if (mpi_errno) MPIU_ERR_POP(mpi_errno); }#else mpi_errno = MPIDI_CH3U_Handle_recv_req(vc, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }#endif } } if(complete) { RECV_ACTIVE(vc, stream) = NULL; } } if(!complete) { RECV_ACTIVE(vc, stream) = rreq; } } fn_exit: return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Channel_close#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)int MPIDI_CH3_Channel_close( void ){ /* When called, Outstanding_close_ops in ch3u_handle_connection should be zero */ /* WARNING! : Outstanding_close_ops can be zero prematurely if MPI_Comm_disconnect * is called. */ int mpi_errno = MPI_SUCCESS; /* is this code dated now that close is moved to MPIDI_CH3I_Progress_finalize for the * one-to-many socket and OP_CLOSE in the event handler? */ /* still have items in the sendQ so handle them before close */ while(sendq_total) /* FIXME might need to be more sophisticated with multiple fd's */ /* For example, if the sendq_total is non-zero, we could have * writes outstanding on multiple fd's (the "normal" one and * the tmp one used for dynamic procs) */ { int mpi_errno = MPI_SUCCESS; MPIDU_Sctp_event_t event2; mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd, MPIDU_SCTP_INFINITE_TIME, &event2); if (mpi_errno != MPI_SUCCESS) { MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT); MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait"); goto fn_fail; } mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event2); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|handle_sock_event"); } } fn_exit: return mpi_errno; fn_fail: goto fn_exit;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -