⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress_sock.c

📁 MPI 并行计算的 C++ 代码，可用 VC 或 GCC 编译通过，可以用来搭建并行计算试验环境。
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "ch3i_progress.h"

/* FIXME: This is nowhere set to true.  The name is non-conforming if it is
   not static */
static int shutting_down = FALSE;

static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn);
static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn);
static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);


/*
 * connection_post_sendq_req - start sending the next queued request.
 *
 * Sets conn->send_active to the head of the send queue of conn->vc and, when
 * the queue is not empty, posts a vectored write of that request's iov on
 * conn->sock.
 *
 * Returns MPI_SUCCESS (also when the queue is empty), or an error code that
 * wraps the failure reported by MPIDU_Sock_post_writev().
 */
#undef FUNCNAME
#define FUNCNAME connection_post_sendq_req
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SENDQ_REQ);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SENDQ_REQ);

    /* post send of next request on the send queue */
    conn->send_active = MPIDI_CH3I_SendQ_head(conn->vc); /* MT */
    if (conn->send_active != NULL)
    {
        mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->send_active->dev.iov, conn->send_active->dev.iov_count, NULL);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS)
        {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);
        }
        /* --END ERROR HANDLING-- */
    }

    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SENDQ_REQ);
    return mpi_errno;
}


/*
 * connection_post_send_pkt - post a write of the connection's packet buffer.
 *
 * Posts a write of conn->pkt (sizeof(conn->pkt) bytes) on conn->sock.
 *
 * Returns MPI_SUCCESS, or an error code that wraps the failure reported by
 * MPIDU_Sock_post_write().
 */
#undef FUNCNAME
#define FUNCNAME connection_post_send_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT);

    mpi_errno = MPIDU_Sock_post_write(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);
    }
    /* --END ERROR HANDLING-- */

    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT);
    return mpi_errno;
}


/*
 * connection_post_recv_pkt - post a read into the connection's packet buffer.
 *
 * Posts a read of sizeof(conn->pkt) bytes into conn->pkt on conn->sock.
 *
 * Returns MPI_SUCCESS, or an error code that wraps the failure reported by
 * MPIDU_Sock_post_read().
 */
#undef FUNCNAME
#define FUNCNAME connection_post_recv_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_RECV_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_RECV_PKT);

    mpi_errno = MPIDU_Sock_post_read(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);
    }
    /* --END ERROR HANDLING-- */

    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_RECV_PKT);
    return mpi_errno;
}


/*
 * adjust_iov - advance an iov array past nb transferred bytes.
 *
 * iovp   - in/out: pointer to the first iov element; on return it points at
 *          the first element that was not fully consumed.
 * countp - in/out: number of elements at *iovp; on return, the number left.
 * nb     - number of bytes to skip.
 *
 * Elements whose length is <= the remaining byte count are skipped whole.
 * The first element that is only partly covered is modified in place: its
 * buffer pointer is moved forward and its length reduced by the bytes left.
 *
 * Returns non-zero when no elements remain (*countp == 0), zero otherwise.
 */
#undef FUNCNAME
#define FUNCNAME adjust_iov
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb)
{
    MPID_IOV * const iov = *iovp;
    const int count = *countp;
    int offset = 0;

    while (offset < count)
    {
        if (iov[offset].MPID_IOV_LEN <= nb)
        {
            /* this element is fully covered; consume it and move on */
            nb -= iov[offset].MPID_IOV_LEN;
            offset++;
        }
        else
        {
            /* partly covered element: trim the consumed prefix and stop */
            iov[offset].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *) iov[offset].MPID_IOV_BUF + nb);
            iov[offset].MPID_IOV_LEN -= nb;
            break;
        }
    }

    *iovp += offset;
    *countp -= offset;

    return (*countp == 0);
}


/*
 * connection_post_send_pkt_and_pgid - post a write of the packet followed by
 * this process's process-group id.
 *
 * Fills conn->iov[0] with conn->pkt and conn->iov[1] with the string
 * MPIDI_Process.my_pg->id (strlen + 1 bytes, i.e. including the terminating
 * NUL), then posts a two-element vectored write on conn->sock.
 *
 * NOTE(review): the function returns void, so the error code built when
 * MPIDU_Sock_post_writev() fails is computed and then dropped; callers cannot
 * observe the failure.  Reporting it would require changing the return type
 * and the call sites, which are not all visible here.
 */
#undef FUNCNAME
#define FUNCNAME connection_post_send_pkt_and_pgid
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn)
{
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID);

    MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID);

    conn->iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) &conn->pkt;
    conn->iov[0].MPID_IOV_LEN = (int) sizeof(conn->pkt);

    conn->iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id;
    conn->iov[1].MPID_IOV_LEN = (int) strlen(MPIDI_Process.my_pg->id) + 1;

    mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->iov, 2, NULL);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);
    }
    /* --END ERROR HANDLING-- */

    MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID);
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event)
{
    int complete;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);

    switch (event->op_type)
    {
        case MPIDU_SOCK_OP_READ:
        {
            MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr;

            MPID_Request * rreq = conn->recv_active;

            /* --BEGIN ERROR HANDLING-- */
            if (event->error != MPI_SUCCESS)
            {
                /* FIXME: the following should be handled by the close protocol */
                if (!shutting_down || MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED)
                {
                    mpi_errno = MPIR_Err_create_code(event->error, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);
                    goto fn_exit;
                }

                break;
            }
            /* --END ERROR HANDLING-- */

            if (conn->state == CONN_STATE_CONNECTED)
            {
                if (conn->recv_active == NULL)
                {
                    MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3);

                    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(conn->vc, &conn->pkt, &rreq);
                    /* --BEGIN ERROR HANDLING-- */
                    if (mpi_errno != MPI_SUCCESS)
                    {
                        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                                         "**fail", NULL);
                        goto fn_exit;
                    }
  /* --END ERROR HANDLING-- */		    if (rreq == NULL)		    {			if (conn->state != CONN_STATE_CLOSING)			{			    /* conn->recv_active = NULL;  -- already set to NULL */			    mpi_errno = connection_post_recv_pkt(conn);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN,				    "**fail", NULL);				goto fn_exit;			    }			    /* --END ERROR HANDLING-- */			}		    }		    else		    {			for(;;)			{			    MPID_IOV * iovp;			    MPIU_Size_t nb;							    iovp = rreq->dev.iov;			    			    mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,								 "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",								 rreq, conn, conn->vc);				goto fn_exit;			    }			    /* --END ERROR HANDLING-- */			    MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x",					      conn->vc, rreq->handle, nb));			    if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb))			    {				mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete);				/* --BEGIN ERROR HANDLING-- */				if (mpi_errno != MPI_SUCCESS)				{				    mpi_errno = MPIR_Err_create_code(					mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);				    goto fn_exit;				}				/* --END ERROR HANDLING-- */				if (complete)				{				    /* conn->recv_active = NULL; -- already set to NULL */				    mpi_errno = connection_post_recv_pkt(conn);				    /* --BEGIN ERROR HANDLING-- */				    if (mpi_errno != MPI_SUCCESS)				    {					mpi_errno = MPIR_Err_create_code(					    mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL);					goto fn_exit;				    }				    /* --END ERROR HANDLING-- */				    break;				}			    }			    else			    {				
MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle));				conn->recv_active = rreq;				mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL);				/* --BEGIN ERROR HANDLING-- */				if (mpi_errno != MPI_SUCCESS)				{				    mpi_errno = MPIR_Err_create_code(					mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread",					"ch3|sock|postread %p %p %p", rreq, conn, conn->vc);				    goto fn_exit;				}				/* --END ERROR HANDLING-- */				break;			    }			}		    }		}		else /* incoming data */		{		    mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete);		    /* --BEGIN ERROR HANDLING-- */		    if (mpi_errno != MPI_SUCCESS)		    {			mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,							 "**fail", NULL);			goto fn_exit;		    }		    /* --END ERROR HANDLING-- */		    if (complete)		    {			conn->recv_active = NULL;			mpi_errno = connection_post_recv_pkt(conn);			/* --BEGIN ERROR HANDLING-- */			if (mpi_errno != MPI_SUCCESS)			{			    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN,							     "**fail", NULL);			    goto fn_exit;			}			/* --END ERROR HANDLING-- */		    }		    else /* more data to be read */		    {			for(;;)			{			    MPID_IOV * iovp;			    MPIU_Size_t nb;							    iovp = rreq->dev.iov;			    			    mpi_errno = MPIDU_Sock_readv(conn->sock, iovp, rreq->dev.iov_count, &nb);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,								 "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",								 rreq, conn, conn->vc);				goto fn_exit;			    }			    /* --END ERROR HANDLING-- */			    MPIDI_DBG_PRINTF((55, FCNAME, "immediate readv, vc=0x%p nb=%d, rreq=0x%08x",					      conn->vc, rreq->handle, nb));							    if (nb > 0 && adjust_iov(&iovp, 
&rreq->dev.iov_count, nb))			    {				mpi_errno = MPIDI_CH3U_Handle_recv_req(conn->vc, rreq, &complete);				/* --BEGIN ERROR HANDLING-- */				if (mpi_errno != MPI_SUCCESS)				{				    mpi_errno = MPIR_Err_create_code(					mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL);				    goto fn_exit;				}				/* --END ERROR HANDLING-- */				if (complete)				{				    conn->recv_active = NULL;				    mpi_errno = connection_post_recv_pkt(conn);				    /* --BEGIN ERROR HANDLING-- */				    if (mpi_errno != MPI_SUCCESS)				    {					mpi_errno = MPIR_Err_create_code(					    mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_INTERN, "**fail", NULL);					goto fn_exit;				    }				    /* --END ERROR HANDLING-- */				    break;				}			    }			    else			    {				MPIDI_DBG_PRINTF((55, FCNAME, "posting readv, vc=0x%p, rreq=0x%08x", conn->vc, rreq->handle));				/* conn->recv_active = rreq;  -- already set to current request */				mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL);				/* --BEGIN ERROR HANDLING-- */				if (mpi_errno != MPI_SUCCESS)				{				    mpi_errno = MPIR_Err_create_code(					mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread",					"ch3|sock|postread %p %p %p", rreq, conn, conn->vc);				    goto fn_exit;				}				/* --END ERROR HANDLING-- */				break;			    }			}		    }		}	    }	    else if (conn->state == CONN_STATE_OPEN_LRECV_DATA)	    {		MPIDI_PG_t * pg;		int pg_rank;		MPIDI_VC_t * vc;		/* Look up pg based on conn->pg_id */		mpi_errno = MPIDI_PG_Find(conn->pg_id, &pg);		/* --BEGIN ERROR HANDLING-- */		if (pg == NULL)		{		    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,						     "**pglookup", "**pglookup %s", conn->pg_id);		    goto fn_exit;		}		/* --END ERROR HANDLING-- */		pg_rank = conn->pkt.sc_open_req.pg_rank;		MPIDI_PG_Get_vc(pg, pg_rank, &vc);		MPIU_Assert(vc->pg_rank == pg_rank);                    		if (vc->ch.conn == NULL)	
	{		    /* no head-to-head connects, accept the		       connection */		    vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING;		    vc->ch.sock = conn->sock;		    vc->ch.conn = conn;		    conn->vc = vc;                        		    MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP);		    conn->pkt.sc_open_resp.ack = TRUE;		}		else		{		    /* head to head situation */		    if (pg == MPIDI_Process.my_pg)		    {			/* the other process is in the same comm_world; just compare the ranks */			if (MPIR_Process.comm_world->rank < pg_rank)			{			    /* accept connection */			    vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING;			    vc->ch.sock = conn->sock;			    vc->ch.conn = conn;			    conn->vc = vc;                                			    MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP);			    conn->pkt.sc_open_resp.ack = TRUE;			}			else			{			    /* refuse connection */			    MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP);			    conn->pkt.sc_open_resp.ack = FALSE;			}		    }		    else		    { 			/* the two processes are in different comm_worlds; compare their unique pg_ids. */			if (strcmp(MPIDI_Process.my_pg->id, pg->id) < 0)			{			    /* accept connection */			    vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING;			    vc->ch.sock = conn->sock;			    vc->ch.conn = conn;			    conn->vc = vc;                                			    MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_RESP);			    conn->pkt.sc_open_resp.ack = TRUE;			}			else			{			    /* refuse connection */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -