⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_shm.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 4 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpidi_ch3_impl.h"#include <stdio.h>/*#undef USE_IOV_LEN_2_SHORTCUT*/#define USE_IOV_LEN_2_SHORTCUT#define SHM_READING_BIT     0x0008#ifndef min#define min(a, b) ((a) < (b) ? (a) : (b))#endif/* shmem functions */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_write#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr){    int total = 0;    int length;    int index;    MPIDI_CH3I_SHM_Queue_t * writeq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));    writeq = vc->ch.write_shmq;    index = writeq->tail_index;    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)    {	*num_bytes_ptr = total;	MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));	MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);	return MPI_SUCCESS;    }        while (len)    {	length = min(len, MPIDI_CH3I_PACKET_SIZE);	/*writeq->packet[index].offset = 0; the reader guarantees this is reset to zero */	writeq->packet[index].num_bytes = length;	MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	memcpy(writeq->packet[index].data, buf, length);	MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	MPID_WRITE_BARRIER();	writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;	buf = (char *) buf + length;	total += length;	len -= length;	index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;	if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)	{	    writeq->tail_index = index;	    *num_bytes_ptr = total;	    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));	    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);	    return MPI_SUCCESS;	}    }    writeq->tail_index = index;    *num_bytes_ptr = total;    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);    return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr){#ifdef MPICH_DBG_OUTPUT    int mpi_errno;#endif    int i;    unsigned int total = 0;    unsigned int num_bytes=0;    unsigned int cur_avail, dest_avail;    unsigned char *cur_pos, *dest_pos;    int index;    MPIDI_CH3I_SHM_Queue_t * writeq;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));    writeq = vc->ch.write_shmq;    index = writeq->tail_index;    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)    {	*num_bytes_ptr = 0;	MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));	MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);	return MPI_SUCCESS;    }    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", writeq));#ifdef USE_IOV_LEN_2_SHORTCUT    if (n == 2 && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE)    {	MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN, writeq, index));	MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	memcpy(writeq->packet[index].data, 	       iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);	MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	memcpy(&writeq->packet[index].data[iov[0].MPID_IOV_LEN],	       iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);	MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	writeq->packet[index].num_bytes = 	    iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;	total = writeq->packet[index].num_bytes;	MPID_WRITE_BARRIER();	writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;#ifdef MPICH_DBG_OUTPUT	/*MPIU_Assert(index == writeq->tail_index);*/	if (index != writeq->tail_index)	{	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmq_index", "**shmq_index %d %d", index, writeq->tail_index);	    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);	    return mpi_errno;	}#endif	writeq->tail_index =	    (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;	MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));	*num_bytes_ptr = total;	MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));	MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);	return MPI_SUCCESS;    }#endif    dest_pos = writeq->packet[index].data;    dest_avail = MPIDI_CH3I_PACKET_SIZE;    writeq->packet[index].num_bytes = 0;    for (i=0; i<n; i++)    {	if (iov[i].MPID_IOV_LEN <= dest_avail)	{	    total += iov[i].MPID_IOV_LEN;	    writeq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;	    dest_avail -= iov[i].MPID_IOV_LEN;	    MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[i].MPID_IOV_LEN, writeq, index));	    MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	    memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);	    MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	    dest_pos += iov[i].MPID_IOV_LEN;	}	else	{	    total += dest_avail;	    writeq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;	    MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", dest_avail, writeq, index));	    MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	    memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);	    MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	    MPID_WRITE_BARRIER();	    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;	    cur_pos = (char *) iov[i].MPID_IOV_BUF + dest_avail;	    cur_avail = iov[i].MPID_IOV_LEN - dest_avail;	    while (cur_avail)	    {		index = writeq->tail_index = 		    (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;		MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));		if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)		{		    *num_bytes_ptr = total;		    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));		    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);		    return MPI_SUCCESS;		}		num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);		writeq->packet[index].num_bytes = num_bytes;		MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", num_bytes, writeq, index));		MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);		memcpy(writeq->packet[index].data, cur_pos, num_bytes);		MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);		total += num_bytes;		cur_pos += num_bytes;		cur_avail -= num_bytes;		if (cur_avail)		{		    MPID_WRITE_BARRIER();		    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;		}	    }	    dest_pos = writeq->packet[index].data + num_bytes;	    dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;	}	if (dest_avail == 0)	{	    MPID_WRITE_BARRIER();	    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;	    index = writeq->tail_index = 		(writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;	    MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));	    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)	    {		*num_bytes_ptr = total;		MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));		MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);		return MPI_SUCCESS;	    }	    dest_pos = writeq->packet[index].data;	    dest_avail = MPIDI_CH3I_PACKET_SIZE;	    writeq->packet[index].num_bytes = 0;	}    }    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)    {	MPID_WRITE_BARRIER();	writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;	writeq->tail_index = 	    (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;	MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));    }    *num_bytes_ptr = total;    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);    return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_read(MPIDI_VC_t * recv_vc_ptr, void *buf, int len, int *num_bytes_ptr){    int mpi_errno = MPI_SUCCESS;    void *mem_ptr;    int num_bytes;    MPIDI_CH3I_SHM_Packet_t *pkt_ptr;    MPIDI_CH3I_SHM_Queue_t *shm_ptr;    register int index;    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_READ);    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_READ);    shm_ptr = recv_vc_ptr->ch.read_shmq;    if (shm_ptr == NULL)    {	*num_bytes_ptr = 0;	goto fn_exit;    }    index = shm_ptr->head_index;    pkt_ptr = &shm_ptr->packet[index];    /* if the packet at the head index is available, the queue is empty */    if (pkt_ptr->avail == MPIDI_CH3I_PKT_EMPTY)    {	*num_bytes_ptr = 0;	goto fn_exit;    }    MPID_READ_BARRIER(); /* no loads after this line can occur before the avail flag has been read */    MPIU_DBG_PRINTF(("MPIDI_CH3I_SHM_read_progress: reading from queue %p\n", shm_ptr));    mem_ptr = (void*)(pkt_ptr->data + pkt_ptr->offset);    num_bytes = pkt_ptr->num_bytes;    MPIDI_DBG_PRINTF((60, FCNAME, "read %d bytes", num_bytes));    if (num_bytes > len)    {	/* copy the received data */	MPIDI_DBG_PRINTF((60, FCNAME, "reading %d bytes from read_shmq %08p packet[%d]", recv_vc_ptr->ch.read.bufflen, shm_ptr, index));	MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	memcpy(buf, mem_ptr, len);	MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	*num_bytes_ptr = len;	pkt_ptr->offset += len;	pkt_ptr->num_bytes = num_bytes - len;    }    else    {	/* copy the received data */	MPIDI_DBG_PRINTF((60, FCNAME, "reading %d bytes from read_shmq %08p packet[%d]", num_bytes, shm_ptr, index));	MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);	memcpy(buf, mem_ptr, num_bytes);	MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);	*num_bytes_ptr = num_bytes;	/* put the shmem buffer back in the queue */	pkt_ptr->offset = 0;	MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */	pkt_ptr->avail = MPIDI_CH3I_PKT_EMPTY;#ifdef MPICH_DBG_OUTPUT	/*MPIU_Assert(&shm_ptr->packet[index] == pkt_ptr);*/	if (&shm_ptr->packet[index] != pkt_ptr)	{	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**pkt_ptr", "**pkt_ptr %p %p", &shm_ptr->packet[index], pkt_ptr);	    goto fn_exit;	}#endif	shm_ptr->head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;	MPIDI_DBG_PRINTF((60, FCNAME, "read_shmq head = %d", shm_ptr->head_index));    }fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_READ);    return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_rdma_writev#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_rdma_writev(MPIDI_VC_t *vc, MPID_Request *sreq){#ifdef MPIDI_CH3_CHANNEL_RNDV    int mpi_errno = MPI_SUCCESS;    int i;    char *rbuf, *sbuf;    int rbuf_len, riov_offset;    int sbuf_len;    int len;#ifndef HAVE_WINDOWS_H#define SIZE_T int#endif    SIZE_T num_written;    MPID_IOV *send_iov, *recv_iov;    int send_count, recv_count;    int complete;    MPIDI_CH3_Pkt_t pkt;    MPIDI_CH3_Pkt_rdma_reload_t * reload_pkt = &pkt.reload;    MPID_Request * reload_sreq;#ifndef HAVE_WINDOWS_H    int n, status;    OFF_T uOffset;#endif    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);    /* save the receiver's request to send back with the reload packet */    reload_pkt->rreq = sreq->dev.rdma_request;    reload_pkt->sreq = sreq->handle;#ifndef HAVE_WINDOWS_H    if (ptrace(PTRACE_ATTACH, vc->ch.nSharedProcessID, 0, 0) != 0)    {	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace attach failed", errno);	MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);	return mpi_errno;    }    if (waitpid(vc->ch.nSharedProcessID, &status, WUNTRACED) != vc->ch.nSharedProcessID)    {	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "waitpid failed", errno);	MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV);	return mpi_errno;    }#endif    for (;;)    {	send_iov = sreq->dev.iov;	send_count = sreq->dev.iov_count;	recv_iov = sreq->dev.rdma_iov;	recv_count = sreq->dev.rdma_iov_count;	/*	printf("shm_rdma: writing %d send buffers into %d recv buffers.\n", send_count, recv_count);fflush(stdout);	for (i=0; i<send_count; i++)	{	    printf("shm_rdma: send buf[%d] = %p, len = %d\n", i, send_iov[i].MPID_IOV_BUF, send_iov[i].MPID_IOV_LEN);	}	for (i=0; i<recv_count; i++)	{	    printf("shm_rdma: recv buf[%d] = %p, len = %d\n", i, recv_iov[i].MPID_IOV_BUF, recv_iov[i].MPID_IOV_LEN);	}	fflush(stdout);	*/	rbuf = recv_iov[0].MPID_IOV_BUF;	rbuf_len = recv_iov[0].MPID_IOV_LEN;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -