⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 shm.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 4 页
字号:
				}			    }			}			else			{			    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);			    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			    return mpi_errno;			}			recv_vc_ptr->ch.recv_active = NULL;			recv_vc_ptr->ch.shm_reading_pkt = TRUE;			if (num_bytes > sizeof(MPIDI_CH3_Pkt_t))			{			    pkt_ptr->offset += sizeof(MPIDI_CH3_Pkt_t);			    num_bytes -= sizeof(MPIDI_CH3_Pkt_t);			    pkt_ptr->num_bytes = num_bytes;			    mem_ptr = (char*)mem_ptr + sizeof(MPIDI_CH3_Pkt_t);			}			else			{			    pkt_ptr->offset = 0;			    MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */			    pkt_ptr->avail = MPIDI_CH3I_PKT_AVAILABLE;			    vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;			}			/* return from the wait */			*num_bytes_ptr = 0;			*vc_pptr = recv_vc_ptr;			*shm_out = SHM_WAIT_WAKEUP;			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return MPI_SUCCESS;		    }		    else		    {			mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle unknown rdma packet");			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return mpi_errno;		    }		}		else#endif /* MPIDI_CH3_CHANNEL_RNDV */		{		    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(recv_vc_ptr, (MPIDI_CH3_Pkt_t*)mem_ptr, &recv_vc_ptr->ch.recv_active);		    if (mpi_errno != MPI_SUCCESS)		    {			mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming packet");			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return mpi_errno;		    }		}		if (recv_vc_ptr->ch.recv_active == NULL)		{		    recv_vc_ptr->ch.shm_reading_pkt = TRUE;		}		else		{		    mpi_errno = MPIDI_CH3I_SHM_post_readv(recv_vc_ptr, recv_vc_ptr->ch.recv_active->dev.iov, recv_vc_ptr->ch.recv_active->dev.iov_count, NULL);		}		if (num_bytes > sizeof(MPIDI_CH3_Pkt_t))		{		    pkt_ptr->offset += sizeof(MPIDI_CH3_Pkt_t);		    num_bytes -= sizeof(MPIDI_CH3_Pkt_t);		    pkt_ptr->num_bytes = num_bytes;		    mem_ptr = (char*)mem_ptr + sizeof(MPIDI_CH3_Pkt_t);		}		else		{		    pkt_ptr->offset = 0;		    MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */		    pkt_ptr->avail = MPIDI_CH3I_PKT_AVAILABLE;		    vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;		    continue;		}		if (recv_vc_ptr->ch.recv_active == NULL)		    continue;	    }	    MPIDI_DBG_PRINTF((60, FCNAME, "read %d bytes\n", num_bytes));	    /*MPIDI_DBG_PRINTF((60, FCNAME, "shm_wait(recv finished %d bytes)", num_bytes));*/	    if (!(recv_vc_ptr->ch.shm_state & SHM_READING_BIT))	    {#ifdef USE_SHM_UNEX		/* Should we buffer unexpected messages or leave them in the shmem queue? */		/*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, 0, num_bytes);*/#endif		continue;	    }	    MPIDI_DBG_PRINTF((60, FCNAME, "read update, total = %d + %d = %d\n", recv_vc_ptr->ch.read.total, num_bytes, recv_vc_ptr->ch.read.total + num_bytes));	    if (recv_vc_ptr->ch.read.use_iov)	    {		iter_ptr = mem_ptr;		while (num_bytes && recv_vc_ptr->ch.read.iovlen > 0)		{		    if ((int)recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN <= num_bytes)		    {			/* copy the received data */			MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);			memcpy(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF, iter_ptr,			    recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN);			MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);			MPIU_DBG_PRINTF(("a:shm_read_progress: %d bytes read from packet %d offset %d\n",			    recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN, index,			    pkt_ptr->offset + (int)((char*)iter_ptr - (char*)mem_ptr)));			iter_ptr += recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN;			/* update the iov */			num_bytes -= recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN;			recv_vc_ptr->ch.read.index++;			recv_vc_ptr->ch.read.iovlen--;		    }		    else		    {			/* copy the received data */			MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);			memcpy(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF, iter_ptr, num_bytes);			MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);			MPIU_DBG_PRINTF(("b:shm_read_progress: %d bytes read from packet %d offset %d\n", num_bytes, index,			    pkt_ptr->offset + (int)((char*)iter_ptr - (char*)mem_ptr)));			iter_ptr += num_bytes;			/* update the iov */			recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN -= num_bytes;			recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)(			    (char*)(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF) + num_bytes);			num_bytes = 0;		    }		}		offset = (unsigned char*)iter_ptr - (unsigned char*)mem_ptr;		recv_vc_ptr->ch.read.total += offset;		if (num_bytes == 0)		{		    /* put the shm buffer back in the queue */		    vc->ch.shm[i].packet[index].offset = 0;		    MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */		    vc->ch.shm[i].packet[index].avail = MPIDI_CH3I_PKT_AVAILABLE;		    vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;		}		else		{		    /* save the unused but received data */		    /*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, offset, num_bytes);*/		    /* OR */		    /* update the head of the shmem queue */		    pkt_ptr->offset += (pkt_ptr->num_bytes - num_bytes);		    pkt_ptr->num_bytes = num_bytes;		}		if (recv_vc_ptr->ch.read.iovlen == 0)		{		    if (recv_vc_ptr->ch.recv_active->kind < MPID_LAST_REQUEST_KIND)		    {			recv_vc_ptr->ch.shm_state &= ~SHM_READING_BIT;			*num_bytes_ptr = recv_vc_ptr->ch.read.total;			*vc_pptr = recv_vc_ptr;			*shm_out = SHM_WAIT_READ;			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return MPI_SUCCESS;		    }#ifdef MPIDI_CH3_CHANNEL_RNDV		    else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_RTS_IOV_READ_REQUEST)		    {			int found;			/*printf("received rts iov_read.\n");fflush(stdout);*/			mpi_errno = MPIDI_CH3U_Handle_recv_rndv_pkt(recv_vc_ptr,								    &recv_vc_ptr->ch.recv_active->ch.pkt,								    &rreq, &found);			/* --BEGIN ERROR HANDLING-- */			if (mpi_errno != MPI_SUCCESS)			{			    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming rts(get) packet");			    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			    return mpi_errno;			}			/* --END ERROR HANDLING-- */			for (i=0; i<recv_vc_ptr->ch.recv_active->dev.rdma_iov_count; i++)			{			    rreq->dev.rdma_iov[i].MPID_IOV_BUF = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_BUF;			    rreq->dev.rdma_iov[i].MPID_IOV_LEN = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_LEN;			}			rreq->dev.rdma_iov_count = recv_vc_ptr->ch.recv_active->dev.rdma_iov_count;			if (found)			{			    mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, &rreq);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL,				    FCNAME, __LINE__,				    MPI_ERR_OTHER,				    "**ch3|postrecv",				    "**ch3|postrecv %s",				    "MPIDI_CH3_PKT_RNDV_REQ_TO_SEND");				MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);				return mpi_errno;			    }			    /* --END ERROR HANDLING-- */			    mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq);			    /* --BEGIN ERROR HANDLING-- */			    if (mpi_errno != MPI_SUCCESS)			    {				mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL,				    FCNAME, __LINE__,				    MPI_ERR_OTHER,				    "**ch3|ctspkt", 0);				MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);				return mpi_errno;			    }			    /* --END ERROR HANDLING-- */			}			rreq = recv_vc_ptr->ch.recv_active;			/* free the request used to receive the rts packet and iov data */			MPIU_Object_set_ref(rreq, 0);			MPIDI_CH3_Request_destroy(rreq);			recv_vc_ptr->ch.recv_active = NULL;			recv_vc_ptr->ch.shm_reading_pkt = TRUE;		    }		    else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_READ_REQUEST)		    {			/*printf("received iov_read.\n");fflush(stdout);*/			rreq = recv_vc_ptr->ch.recv_active;			mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq->ch.req);			/* --BEGIN ERROR HANDLING-- */			if (mpi_errno != MPI_SUCCESS)			{			    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming rts(get) iov");			    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			    return mpi_errno;			}			/* --END ERROR HANDLING-- */			recv_vc_ptr->ch.recv_active = NULL;			recv_vc_ptr->ch.shm_reading_pkt = TRUE;			/* free the request used to receive the iov data */			MPIU_Object_set_ref(rreq, 0);			MPIDI_CH3_Request_destroy(rreq);		    }		    else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_WRITE_REQUEST)		    {			/*printf("received iov_write.\n");fflush(stdout);*/			mpi_errno = MPIDI_CH3I_SHM_rdma_writev(recv_vc_ptr, recv_vc_ptr->ch.recv_active->ch.req);			/* --BEGIN ERROR HANDLING-- */			if (mpi_errno != MPI_SUCCESS)			{			    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);			    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			    return mpi_errno;			}			/* --END ERROR HANDLING-- */			/* return from the wait */			MPID_Request_release(recv_vc_ptr->ch.recv_active);			recv_vc_ptr->ch.recv_active = NULL;			recv_vc_ptr->ch.shm_reading_pkt = TRUE;			*num_bytes_ptr = 0;			*vc_pptr = recv_vc_ptr;			*shm_out = SHM_WAIT_WAKEUP;			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return MPI_SUCCESS;		    }#endif /* MPIDI_CH3_CHANNEL_RNDV */		    else		    {			mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "invalid request type", recv_vc_ptr->ch.recv_active->kind);			MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);			return mpi_errno;		    }		}	    }	    else	    {		if ((unsigned int)num_bytes > recv_vc_ptr->ch.read.bufflen)		{		    /* copy the received data */		    MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);		    memcpy(recv_vc_ptr->ch.read.buffer, mem_ptr, recv_vc_ptr->ch.read.bufflen);		    MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);		    MPIU_DBG_PRINTF(("c:shm_read_progress: %d bytes read from packet %d offset %d\n", recv_vc_ptr->ch.read.bufflen, index, pkt_ptr->offset));		    recv_vc_ptr->ch.read.total = recv_vc_ptr->ch.read.bufflen;		    /*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, recv_vc_ptr->ch.read.bufflen, num_bytes - recv_vc_ptr->ch.read.bufflen);*/		    pkt_ptr->offset += recv_vc_ptr->ch.read.bufflen;		    pkt_ptr->num_bytes = num_bytes - recv_vc_ptr->ch.read.bufflen;		    recv_vc_ptr->ch.read.bufflen = 0;		}		else		{		    /* copy the received data */		    MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);		    memcpy(recv_vc_ptr->ch.read.buffer, mem_ptr, num_bytes);		    MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);		    MPIU_DBG_PRINTF(("d:shm_read_progress: %d bytes read from packet %d offset %d\n", num_bytes, index, pkt_ptr->offset));		    recv_vc_ptr->ch.read.total += num_bytes;		    /* advance the user pointer */		    recv_vc_ptr->ch.read.buffer = (char*)(recv_vc_ptr->ch.read.buffer) + num_bytes;		    recv_vc_ptr->ch.read.bufflen -= num_bytes;		    /* put the shm buffer back in the queue */		    vc->ch.shm[i].packet[index].offset = 0;		    MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */		    vc->ch.shm[i].packet[index].avail = MPIDI_CH3I_PKT_AVAILABLE;		    vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;		}		if (recv_vc_ptr->ch.read.bufflen == 0)		{		    MPIU_Assert(recv_vc_ptr->ch.recv_active->kind < MPID_LAST_REQUEST_KIND);		    recv_vc_ptr->ch.shm_state &= ~SHM_READING_BIT;		    *num_bytes_ptr = recv_vc_ptr->ch.read.total;		    *vc_pptr = recv_vc_ptr;		    *shm_out = SHM_WAIT_READ;		    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);		    return MPI_SUCCESS;		}	    }	}	if (millisecond_timeout == 0 && !working)	{	    *shm_out = SHM_WAIT_TIMEOUT;	    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);	    return MPI_SUCCESS;	}    }    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT);    return MPI_SUCCESS;}/* non-blocking functions */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_post_read(MPIDI_VC_t *vc, void *buf, int len, int (*rfn)(int, void*)){    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_POST_READ);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_POST_READ);    MPIDI_DBG_PRINTF((60, FCNAME, "posting a read of %d bytes", len));    vc->ch.read.total = 0;    vc->ch.read.buffer = buf;    vc->ch.read.bufflen = len;    vc->ch.read.use_iov = FALSE;    vc->ch.shm_state |= SHM_READING_BIT;    vc->ch.shm_reading_pkt = FALSE;#ifdef USE_SHM_UNEX    if (vc->ch.unex_list)	shmi_read_unex(vc);#endif    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_POST_READ);    return SHM_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_post_readv(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int (*rfn)(int, void*)){    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_POST_READV);#ifdef USE_SHM_IOV_COPY    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);#endif    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_POST_READV);    /* FIXME */    /* Remove this stripping code after the updated segment code no longer       produces iov's with empty buffers */    /* strip any trailing empty buffers */    /*    while (n && iov[n-1].MPID_IOV_LEN == 0)	n--;    */    MPIU_Assert(iov[n-1].MPID_IOV_LEN > 0);    vc->ch.read.total = 0;#ifdef USE_SHM_IOV_COPY    /* This isn't necessary if we require the iov to be valid for the duration of the operation */    MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);    memcpy(vc->ch.read.iov, iov, sizeof(MPID_IOV) * n);    MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);#else    vc->ch.read.iov = iov;#endif    vc->ch.read.iovlen = n;    vc->ch.read.index = 0;    vc->ch.read.use_iov = TRUE;    vc->ch.shm_state |= SHM_READING_BIT;    vc->ch.shm_reading_pkt = FALSE;#ifdef USE_SHM_UNEX    if (vc->ch.unex_list)	shmi_readv_unex(vc);#endif#ifdef MPICH_DBG_OUTPUT    {	int i, total=0;	for (i=0; i<n; i++)	{	    total += iov[i].MPID_IOV_LEN;	}	MPIDI_DBG_PRINTF((60, FCNAME, "posting a read of %d bytes.\n", total));    }#endif    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_POST_READV);    return SHM_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -