⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_shm.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>

/* When defined, MPIDI_CH3I_SHM_writev takes a single-packet fast path for
 * two-entry iovs whose combined length is below MPIDI_CH3I_PACKET_SIZE. */
/*#undef USE_IOV_LEN_2_SHORTCUT*/
#define USE_IOV_LEN_2_SHORTCUT
#define SHM_READING_BIT     0x0008

/* Fallback min(); note that it evaluates each argument more than once. */
#ifndef min
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* shmem functions */

/*
 * MPIDI_CH3I_SHM_write - copy a contiguous buffer into the write queue of vc.
 *
 * Starting at vc->ch.write_shmq->tail_index, the buffer is cut into chunks of
 * at most MPIDI_CH3I_PACKET_SIZE bytes and each chunk is copied into its own
 * packet.  Copying stops when all len bytes have been written or when the
 * next packet in the ring is still marked MPIDI_CH3I_PKT_FILLED.
 *
 * Parameters:
 *   vc            - connection whose ch.write_shmq receives the data
 *   buf           - source bytes
 *   len           - number of bytes requested
 *                   NOTE(review): a negative len is not rejected here and
 *                   would reach memcpy - confirm callers never pass one
 *   num_bytes_ptr - out: number of bytes actually copied (0 when the tail
 *                   packet is already filled on entry)
 *
 * Returns MPI_SUCCESS on every path; a short write is reported only through
 * *num_bytes_ptr.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* Nothing can be written while the tail packet is still filled:
     * report zero bytes and leave tail_index untouched. */
    index = vc->ch.write_shmq->tail_index;
    if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
        return MPI_SUCCESS;
    }

    while (len)
    {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        /*vc->ch.write_shmq->packet[index].offset = 0; the reader guarantees this is reset to zero */
        vc->ch.write_shmq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        /* The barrier is issued after num_bytes/data are stored and before
         * avail is set; presumably this keeps the reader from seeing the
         * FILLED flag ahead of the payload - confirm against the definition
         * of MPID_WRITE_BARRIER. */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        buf = (char *) buf + length;
        total += length;
        len -= length;
        /* Advance around the ring; if the next slot is still filled, record
         * it as the new tail and return the partial count. */
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
        if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
        {
            vc->ch.write_shmq->tail_index = index;
            *num_bytes_ptr = total;
            MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
            return MPI_SUCCESS;
        }
    }

    /* Everything was written: index already names the next packet to use. */
    vc->ch.write_shmq->tail_index = index;
    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

/*
 * MPIDI_CH3I_SHM_writev - gather an iov array into the write queue of vc.
 *
 * The iov entries are packed back-to-back into packets of
 * MPIDI_CH3I_PACKET_SIZE bytes, beginning at vc->ch.write_shmq->tail_index.
 * An entry that does not fit in the space left in the current packet is
 * split across as many packets as it needs.  A packet is marked
 * MPIDI_CH3I_PKT_FILLED (after MPID_WRITE_BARRIER) once it is full or once
 * the last entry has been copied, and tail_index is advanced past it.
 *
 * Parameters:
 *   vc            - connection whose ch.write_shmq receives the data
 *   iov           - array of buffers to send (MPID_IOV_BUF / MPID_IOV_LEN)
 *   n             - number of entries in iov
 *   num_bytes_ptr - out: total bytes copied; less than the iov total when a
 *                   still-filled packet is encountered, 0 when the tail
 *                   packet is filled on entry
 *
 * Returns MPI_SUCCESS, except that builds with MPICH_DBG_OUTPUT return an
 * error code from the fast path when index and tail_index disagree.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
#ifdef MPICH_DBG_OUTPUT
    int mpi_errno;
#endif
    int i;
    unsigned int total = 0;
    unsigned int num_bytes= 0;
    /* cur_* track the unread remainder of the iov entry being split;
     * dest_* track the unused remainder of the packet being filled. */
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* Tail packet still filled: nothing can be written right now. */
    index = vc->ch.write_shmq->tail_index;
    if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        *num_bytes_ptr = 0;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", vc->ch.write_shmq));

#ifdef USE_IOV_LEN_2_SHORTCUT
    /* Fast path: exactly two entries whose combined length is strictly less
     * than one packet are copied into the tail packet and published at once. */
    if (n == 2 && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE)
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN, vc->ch.write_shmq, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data,
               iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&vc->ch.write_shmq->packet[index].data[iov[0].MPID_IOV_LEN],
               iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        vc->ch.write_shmq->packet[index].num_bytes =
            iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = vc->ch.write_shmq->packet[index].num_bytes;
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
#ifdef MPICH_DBG_OUTPUT
        /* Debug-only consistency check; note the packet has already been
         * marked filled when this error is returned. */
        /*MPIU_Assert(index == vc->ch.write_shmq->tail_index);*/
        if (index != vc->ch.write_shmq->tail_index)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmq_index", "**shmq_index %d %d", index, vc->ch.write_shmq->tail_index);
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
#endif
        vc->ch.write_shmq->tail_index =
            (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* General path: start with an empty tail packet and pack entries into it. */
    dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    vc->ch.write_shmq->packet[index].num_bytes = 0;
    for (i=0; i<n; i++)
    {
        if (iov[i].MPID_IOV_LEN <= dest_avail)
        {
            /* The whole entry fits in the space left in the current packet. */
            total += iov[i].MPID_IOV_LEN;
            vc->ch.write_shmq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[i].MPID_IOV_LEN, vc->ch.write_shmq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else
        {
            /* The entry is larger than the remaining space: top off the
             * current packet, publish it, then spill the rest of the entry
             * into the following packets. */
            total += dest_avail;
            vc->ch.write_shmq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", dest_avail, vc->ch.write_shmq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            cur_pos = (unsigned char*)iov[i].MPID_IOV_BUF + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            while (cur_avail)
            {
                /* Move to the next packet; tail_index and index stay equal. */
                index = vc->ch.write_shmq->tail_index =
                    (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
                if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
                {
                    /* Ring is full: report what has been copied so far. */
                    *num_bytes_ptr = total;
                    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                vc->ch.write_shmq->packet[index].num_bytes = num_bytes;
                MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", num_bytes, vc->ch.write_shmq, index));
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(vc->ch.write_shmq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                /* Only publish here when more of this entry remains; the
                 * packet holding the entry's final chunk is left open so the
                 * code after the loop can append to it or publish it. */
                if (cur_avail)
                {
                    MPID_WRITE_BARRIER();
                    vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
                }
            }
            /* Resume packing right after the final chunk of this entry. */
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data) + num_bytes;
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }
        if (dest_avail == 0)
        {
            /* Current packet is exactly full: publish it and open the next
             * one, or return the partial count if that one is still filled. */
            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            index = vc->ch.write_shmq->tail_index =
                (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
            if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
            {
                *num_bytes_ptr = total;
                MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            vc->ch.write_shmq->packet[index].num_bytes = 0;
        }
    }
    /* A partially filled packet is left over: publish it and advance the tail. */
    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)
    {
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        vc->ch.write_shmq->tail_index =
            (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
    }

    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

/*
 * MPIDI_CH3I_SHM_rdma_writev - copy the send iov of sreq directly into the
 * receive buffers listed in sreq->dev.rdma_iov, which are addressed in
 * another process.
 *
 * The body is compiled only when MPIDI_CH3_CHANNEL_RNDV is defined.  On
 * Windows builds the copy uses WriteProcessMemory on
 * vc->ch.hSharedProcessHandle; otherwise the target process
 * vc->ch.nSharedProcessID is first attached with ptrace and the data is
 * written through vc->ch.nSharedProcessFileDescriptor (when
 * HAVE_PROC_RDMA_WRITE is defined) or through a ptrace-based fallback.
 *
 * Parameters:
 *   vc   - connection identifying the target process
 *   sreq - send request carrying the send iov (dev.iov, starting at
 *          ch.iov_offset) and the receiver's iov (dev.rdma_iov)
 *
 * Error paths return a code built with MPIR_Err_create_code.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_rdma_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_SHM_rdma_writev(MPIDI_VC_t *vc, MPID_Request *sreq)
{
#ifdef MPIDI_CH3_CHANNEL_RNDV
    int mpi_errno = MPI_SUCCESS;
    int i;
    char *rbuf, *sbuf;
    int rbuf_len, riov_offset;
    int sbuf_len;
    int len;
    /* Non-Windows builds define SIZE_T as int so num_written can hold the
     * result of write(), including -1. */
#ifndef HAVE_WINDOWS_H
#define SIZE_T int
#endif
    SIZE_T num_written;
    MPID_IOV *send_iov, *recv_iov;
    int send_count, recv_count;
    int complete;
    MPIDI_CH3_Pkt_t pkt;
    MPIDI_CH3_Pkt_rdma_reload_t * reload_pkt = &pkt.reload;
    MPID_Request * reload_sreq;
#ifndef HAVE_WINDOWS_H
    int n, status;
    OFF_T uOffset;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);

    /* save the receiver's request to send back with the reload packet */
    reload_pkt->rreq = sreq->dev.rdma_request;
    reload_pkt->sreq = sreq->handle;

#ifndef HAVE_WINDOWS_H
    /* Attach to the target process and wait for it to stop before touching
     * its memory. */
    if (ptrace(PTRACE_ATTACH, vc->ch.nSharedProcessID, 0, 0) != 0)
    {
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace attach failed", errno);
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
        return mpi_errno;
    }
    /* NOTE(review): this failure path returns without PTRACE_DETACH, unlike
     * the lseek/write failure paths below - confirm whether the target can
     * be left attached here. */
    if (waitpid(vc->ch.nSharedProcessID, &status, WUNTRACED) != vc->ch.nSharedProcessID)
    {
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "waitpid failed", errno);
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
        return mpi_errno;
    }
#endif

    for (;;)
    {
        send_iov = sreq->dev.iov;
        send_count = sreq->dev.iov_count;
        recv_iov = sreq->dev.rdma_iov;
        recv_count = sreq->dev.rdma_iov_count;
        /*
        printf("shm_rdma: writing %d send buffers into %d recv buffers.\n", send_count, recv_count);fflush(stdout);
        for (i=0; i<send_count; i++)
        {
            printf("shm_rdma: send buf[%d] = %p, len = %d\n", i, send_iov[i].MPID_IOV_BUF, send_iov[i].MPID_IOV_LEN);
        }
        for (i=0; i<recv_count; i++)
        {
            printf("shm_rdma: recv buf[%d] = %p, len = %d\n", i, recv_iov[i].MPID_IOV_BUF, recv_iov[i].MPID_IOV_LEN);
        }
        fflush(stdout);
        */
        /* Begin at the first receive buffer; send buffers resume from the
         * offset recorded in the request. */
        rbuf = recv_iov[0].MPID_IOV_BUF;
        rbuf_len = recv_iov[0].MPID_IOV_LEN;
        riov_offset = 0;
        for (i=sreq->ch.iov_offset; i<send_count; i++)
        {
            sbuf = send_iov[i].MPID_IOV_BUF;
            sbuf_len = send_iov[i].MPID_IOV_LEN;
            while (sbuf_len)
            {
                /* Transfer no more than what both the current send buffer
                 * and the current receive buffer can take. */
                len = MPIDU_MIN(sbuf_len, rbuf_len);
                /*printf("writing %d bytes to remote process.\n", len);fflush(stdout);*/
#ifdef HAVE_WINDOWS_H
                if (!WriteProcessMemory(vc->ch.hSharedProcessHandle, rbuf, sbuf, len, &num_written))
                {
                    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "WriteProcessMemory failed", GetLastError());
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
                    return mpi_errno;
                }
                if (num_written == -1)
                {
                    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "WriteProcessMemory returned -1 bytes written");
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
                    return mpi_errno;
                }
#else
#ifdef HAVE_PROC_RDMA_WRITE
                /* write is not implemented in the /proc device. It is considered a security hole.  You can recompile a Linux
                 * kernel with this function enabled and then define HAVE_PROC_RDMA_WRITE and this code will work.
                 */
                /* Position the descriptor at the remote address rbuf; both
                 * seek variants must end with uOffset equal to that address. */
#ifdef USE__LLSEEK
                n = _llseek(vc->ch.nSharedProcessFileDescriptor, 0, OFF_T_CAST(rbuf), &uOffset, SEEK_SET);
#else
                uOffset = lseek(vc->ch.nSharedProcessFileDescriptor, OFF_T_CAST(rbuf), SEEK_SET);
                n = 0;
#endif
                if (n != 0 || uOffset != OFF_T_CAST(rbuf))
                {
                    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "lseek failed", errno);
                    ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0);
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
                    return mpi_errno;
                }
                num_written = write(vc->ch.nSharedProcessFileDescriptor, sbuf, len);
                if (num_written < 1)
                {
                    if (num_written == -1)
                    {
                        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "write failed", errno);
                        ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0);
                        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);
                        return mpi_errno;
                    }
                    /* NOTE(review): the value returned by this PEEKDATA is
                     * discarded; its purpose after a zero-byte write is not
                     * evident from this code - confirm. */
                    ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, rbuf + len - num_written, 0);
                }
#else
                /* Do not use this code.  Using PTRACE_POKEDATA for rdma writes gives horrible performance.
                 * This code is only provided for correctness to show that the put model will run.
                 */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -