⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 4 页
字号:
      fn_exit:    return mpi_errno;    /* --BEGIN ERROR HANDLING-- */  fn_fail:    if (MPIDI_CH3I_onetomany_fd != -1)    { 	close(MPIDI_CH3I_onetomany_fd);    }    goto fn_exit;    /* --END ERROR HANDLING-- */   }#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDU_Sctp_wait(int fd, int timeout, MPIDU_Sctp_event_t * event){    int msg_flags, error, recv_amount, stream_loop, buf_sz;    MPIU_Size_t sz;    int mpi_errno = MPI_SUCCESS;    char* buf_ptr = NULL;    MPIDI_VC_t * vc;    struct MPID_Request* req = NULL;    struct MPID_Request* q_tail = NULL;    int blocked = FALSE;    buf_sz = MPIDU_Sctpi_socket_bufsz;    /* can't block if we don't know where things are coming from... */    if(MPIDI_CH3I_dynamic_tmp_fd != -1)        timeout = 0;        /* recv buffer */    BufferNode_t* bf_node = NULL;      while(MPIDU_Sctp_event_dequeue(event) != MPI_SUCCESS) {	/* adjust sock mode */	if(timeout == -1 && Global_SendQ.count == 0) {	    sctp_setblock(fd, TRUE);	    blocked = TRUE;	}	/* READ LOOP begins */	BufferList_init(&FirstBufferNode);	while((buf_ptr = request_buffer(CHUNK, &bf_node))) {	    error = sctp_recv(fd, buf_ptr, CHUNK, &sctp_sri,				    &msg_flags, &recv_amount);	    if(error == EAGAIN || recv_amount <= 0) {		break;	    }            	    mpi_errno = MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_READ,				     recv_amount, &sctp_sri, fd, buf_ptr,				     NULL, msg_flags, MPI_SUCCESS);            /* --BEGIN ERROR HANDLING-- */            if (mpi_errno != MPI_SUCCESS)            {                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME,                                                 __LINE__, MPI_ERR_OTHER, "**fail", 0);                goto fn_fail;            }            /* --END ERROR HANDLING-- */                            	    update_size(bf_node, recv_amount);	    	    error = 0;	    recv_amount = 0;	    	    if(blocked) {		sctp_setblock(fd, FALSE);		blocked = FALSE;		break;	    }	    	}	/* READ LOOP ends */        	sctp_setblock(fd, FALSE);        /* read from dynamic_fd if it is exists and hasn't been tried already */        if(MPIDI_CH3I_dynamic_tmp_fd != -1 && fd != MPIDI_CH3I_dynamic_tmp_fd) {            fd = MPIDI_CH3I_dynamic_tmp_fd;            continue;        }	/* WRITE LOOP begins */	q_tail = Global_SendQ.tail;	do {	    req = NULL;	    Global_SendQ_dequeue(req);	    if(req) {		MPIU_Assert(SEND_ACTIVE(req->ch.vc, req->ch.stream) == req);	   		/* keep sending until EAGAIN */		stream_loop = req->ch.stream;		SCTP_IOV* iov_ptr;		vc = req->ch.vc;		iov_ptr = &(vc->ch.posted_iov[stream_loop]);		if(POST_IOV_FLAG(iov_ptr)) {		    mpi_errno = MPIDU_Sctp_writev(vc, POST_IOV(iov_ptr),						  POST_IOV_CNT(iov_ptr), req->ch.stream, 0, &sz);		} else {		    /*  NOT an iov. do a simple write */		    mpi_errno = MPIDU_Sctp_write(vc, POST_BUF(iov_ptr),						 POST_BUF_MIN(iov_ptr), req->ch.stream, 0, &sz);		    		}				sz = (sz < 0)? 0 : sz;		/* adjust iov here, if it's done, enqueue event, else keep it                 *  in global sendQ                 */		if(adjust_posted_iov(iov_ptr, sz)) {		    mpi_errno = MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_WRITE,							 sz, NULL, vc->ch.fd, vc, NULL,							 req->ch.stream, MPI_SUCCESS);		    MPIDI_DBG_PRINTF((50, FCNAME, "wrote: %d bytes @ strm: %d", sz, req->ch.stream));		}		else {		    /* need to put it back to globalSendQ, doesn't need to post again */		    Global_SendQ_enqueue(vc, req, stream_loop);		}	    }	    	} while (req != q_tail);	/* WRITE LOOP ends */	/* can't spin forever */	if(!SPIN(timeout))	    break;    }     /* set fd to NON_BLOCK again */    sctp_setblock(fd, 0); fn_exit: fn_fail:    return mpi_errno;}#undef FUNCNAME#define FUNCNAME adjust_posted_iov#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static inline int adjust_posted_iov(SCTP_IOV* post_ptr, MPIU_Size_t nb) {    int complete = 0;    int min_bytes = 0;    if(POST_IOV_FLAG(post_ptr)){	complete = adjust_iov(&POST_IOV(post_ptr), &POST_IOV_CNT(post_ptr),			      nb);    } else {	min_bytes = POST_BUF_MIN(post_ptr);	if(min_bytes == nb) {	    /* send complete */	    complete = 1;	} else {	    POST_BUF(post_ptr) += nb;	    POST_BUF_MIN(post_ptr) -= nb;	    POST_BUF_MAX(post_ptr) = POST_BUF_MIN(post_ptr);	    complete = 0;	}	    }    return complete;}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_post_close#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Sctp_post_close(MPIDI_VC_t * vc){    return MPIDU_Sctp_event_enqueue(MPIDU_SCTP_OP_CLOSE, 0, NULL, vc->ch.fd, vc, 0, 0, 0);}#undef FUNCNAME#define FUNCNAME MPIDU_Sctp_finalize#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int MPIDU_Sctp_finalize(void){    /* need to free eventq  */    if(eventq_head)    {    	MPIDU_Sctp_free_eventq_mem();    }        return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME read_from_advbuf_and_adjust#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)inline static int read_from_advbuf_and_adjust(MPIDI_VC_t* vc, int stream, int amount,				       char* src, MPID_Request* rreq) {    int mpi_errno = MPI_SUCCESS;    MPID_IOV* iovp = rreq->dev.iov;    int nb = 0;    int done = FALSE;    int complete = FALSE;    char *src_ptr;    if(rreq) {	nb = readv_from_advbuf(rreq, 			       src, amount);	done = adjust_req(rreq, nb);		if(done) {            #if 1            int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);            reqFn = rreq->dev.OnDataAvail;            if (!reqFn) {                MPIDI_CH3U_Request_complete(rreq);                complete = TRUE;            }            else {                /* fyi reqFn is MPIDI_CH3_ReqHandler_ReloadIOV with truncated messages */                mpi_errno = reqFn( vc, rreq, &complete );                if (mpi_errno) MPIU_ERR_POP(mpi_errno);            }#else	    mpi_errno = MPIDI_CH3U_Handle_recv_req(vc, rreq, &complete);	    if (mpi_errno != MPI_SUCCESS) {		MPIU_ERR_POP(mpi_errno);	    }#endif	    if(!complete)            {                /* more data to be read (e.g. truncation) */                                /* this is designed to work with truncation, but is it general enough? */                /* since the excess data is already in the advbuf, is this step necessary                 *  for SCTP?  these steps (above and below) allocate a tmp buffer and set                 *  it equal to the req's iov; this tmp buffer is used merely to get the                 *  excess data off of the internal buffers (or the kernel socket receive                 *  buffer, in SCTP's case).  the thing is, this is already done when                 *  reading TO the advbuf, so this process of tmp buf allocation and copying                 *  might be rework...  still, for now it's in there just to model after the                 *  ch3:sock code.                 */                src_ptr = src;                src_ptr += nb;		/* need to reset, because preceeding adjust_iov changed it */		iovp = rreq->dev.iov;                nb = readv_from_advbuf(rreq,			       src_ptr, amount - nb);		done = adjust_req(rreq, nb);                                /* the remaining code is kinda recursive... */                if(done) {            #if 1                    reqFn = rreq->dev.OnDataAvail;                    if (!reqFn) {                        MPIDI_CH3U_Request_complete(rreq);                        complete = TRUE;                    }                    else {                        mpi_errno = reqFn( vc, rreq, &complete );                        if (mpi_errno) MPIU_ERR_POP(mpi_errno);                    }#else                    mpi_errno = MPIDI_CH3U_Handle_recv_req(vc, rreq, &complete);                    if (mpi_errno != MPI_SUCCESS) {                        MPIU_ERR_POP(mpi_errno);                    }#endif                }            }	                if(complete)            {                RECV_ACTIVE(vc, stream) = NULL;            }	}        if(!complete) {	    RECV_ACTIVE(vc, stream) = rreq;	}    } fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Channel_close#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)int MPIDI_CH3_Channel_close( void ){    /* When called, Outstanding_close_ops in ch3u_handle_connection should be zero */    /* WARNING! : Outstanding_close_ops can be zero prematurely if MPI_Comm_disconnect     *              is called.     */    int mpi_errno = MPI_SUCCESS;    /*  is this code dated now that close is moved to MPIDI_CH3I_Progress_finalize for the     *   one-to-many socket and OP_CLOSE in the event handler?     */           /* still have items in the sendQ so handle them before close */        while(sendq_total)  /* FIXME might need to be more sophisticated with multiple fd's */                        /*    For example, if the sendq_total is non-zero, we could have                         *    writes outstanding on multiple fd's (the "normal" one and                         *    the tmp one used for dynamic procs)                         */    {        int mpi_errno = MPI_SUCCESS;        MPIDU_Sctp_event_t event2;        mpi_errno = MPIDU_Sctp_wait(MPIDI_CH3I_onetomany_fd, MPIDU_SCTP_INFINITE_TIME,                                    &event2);        if (mpi_errno != MPI_SUCCESS)        {            MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);            MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");            goto fn_fail;        }        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(&event2);        if (mpi_errno != MPI_SUCCESS) {            MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,                                            "**ch3|sock|handle_sock_event");        }    }                    fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -