⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_shm.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>

/* STATES:NO WARNINGS */

/*#undef USE_IOV_LEN_2_SHORTCUT*/
/* When defined, MPIDI_CH3I_SHM_writev() takes a fast path for a two-element
 * iov whose combined length is smaller than MPIDI_CH3I_PACKET_SIZE. */
#define USE_IOV_LEN_2_SHORTCUT

/* NOTE(review): the three constants below are not referenced by the
 * functions that follow directly; presumably they are used by the read
 * side of this file -- confirm before changing or removing them. */
#define SHM_READING_BIT     0x0008
#define MPIDI_CH3_PKT_RELOAD_SEND 1
#define MPIDI_CH3_PKT_RELOAD_RECV 0

#ifndef min
/* Smaller of a and b.  Each argument may be evaluated more than once, so
 * do not pass expressions with side effects. */
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* 
 * Two compile-time choices allow this file to be used by several channels.
 *
 * First, is it part of a multi-method channel (e.g., ssm) or part of a
 * single-method channel (e.g., shm)?
 *
 * Define MPIDI_CH3_SHM_SHARES_PKTARRAY if the packet-handler array is
 * shared (it is then declared extern here instead of being defined).
 *
 * Second, how are the receive queues arranged?  Are they
 * in a list (scalable, by active connection) or 
 * are they in an array of pointers?
 *
 * Define MPIDI_CH3_SHM_SCALABLE_READQUEUES if they are in a list.
 */

/* shmem functions */

/* Table of packet handlers, MPIDI_CH3_PKT_END_CH3+1 entries.  It is either
 * provided by another translation unit (shared case) or owned by this file
 * and filled in by MPIDI_CH3I_SHM_Progress_init(). */
#ifdef MPIDI_CH3_SHM_SHARES_PKTARRAY
extern MPIDI_CH3_PktHandler_Fcn *MPIDI_pktArray[MPIDI_CH3_PKT_END_CH3+1];
#else
static MPIDI_CH3_PktHandler_Fcn *MPIDI_pktArray[MPIDI_CH3_PKT_END_CH3+1];

/*
 * MPIDI_CH3I_SHM_Progress_init - set up the file-local packet-handler table.
 *
 * Only compiled when MPIDI_CH3_SHM_SHARES_PKTARRAY is not defined.
 *
 * Returns whatever MPIDI_CH3_PktHandler_Init() returns.
 */
int MPIDI_CH3I_SHM_Progress_init(void)
{
    int mpi_errno;

    /* Initialize the code to handle incoming packets */
    mpi_errno = MPIDI_CH3_PktHandler_Init( MPIDI_pktArray, 
                                           MPIDI_CH3_PKT_END_CH3+1 );

    return mpi_errno;
}
#endif

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_write - copy a contiguous buffer into the packets of the
 * write queue attached to vc.
 *
 * Starting at writeq->tail_index, each loop iteration copies at most
 * MPIDI_CH3I_PACKET_SIZE bytes into the current packet, records the byte
 * count in the packet, issues MPID_WRITE_BARRIER() and only then marks the
 * packet MPIDI_CH3I_PKT_FILLED.  Copying stops when len bytes have been
 * written or when the next packet in the ring is still marked FILLED.
 *
 * Parameters:
 *   vc            - virtual connection; its channel_private supplies
 *                   write_shmq
 *   buf           - source bytes
 *   len           - number of bytes to write
 *                   (NOTE(review): a negative len is not guarded against)
 *   num_bytes_ptr - out: number of bytes actually copied; 0 if the tail
 *                   packet is already FILLED on entry
 *
 * Returns MPI_SUCCESS on every path; a short write is reported only
 * through *num_bytes_ptr.
 */
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_CH3I_VC *vcch;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);

    vcch = (MPIDI_CH3I_VC *)vc->channel_private;
    writeq = vcch->write_shmq;
    index = writeq->tail_index;
    /* Queue full: the packet at the tail has not been released yet, so
     * nothing can be written (total is still 0 here). */
    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        *num_bytes_ptr = total;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
        return MPI_SUCCESS;
    }
    
    while (len)
    {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        /*writeq->packet[index].offset = 0; 
          the reader guarantees this is reset to zero */
        writeq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIU_DBG_PRINTF(("shm_write: %d bytes in packet %d\n", 
                         writeq->packet[index].num_bytes, index));
        /* Publish the packet: the avail flag is stored after the barrier.
         * Presumably this keeps the payload and num_bytes stores ahead of
         * the flag for the reading process -- do not reorder. */
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        buf = (char *) buf + length;
        total += length;
        len -= length;
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
        /* The next slot is still occupied: remember where to resume and
         * report the partial (or complete) count. */
        if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
        {
            writeq->tail_index = index;
            *num_bytes_ptr = total;
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
            return MPI_SUCCESS;
        }
    }

    writeq->tail_index = index;
    *num_bytes_ptr = total;
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_writev - gather the n elements of iov into the packets of
 * the write queue attached to vc.
 *
 * Consecutive iov elements are packed back to back into the current
 * packet.  An element that does not fit fills the current packet and
 * spills into the following ones.  A packet is marked
 * MPIDI_CH3I_PKT_FILLED (after MPID_WRITE_BARRIER()) once it is full or
 * once the iov is exhausted.  Unlike MPIDI_CH3I_SHM_write(), this routine
 * keeps writeq->tail_index in step with the local index as it advances.
 *
 * Parameters:
 *   vc            - virtual connection; its channel_private supplies
 *                   write_shmq
 *   iov           - array of buffer/length pairs
 *   n             - number of entries in iov
 *   num_bytes_ptr - out: total number of bytes copied; 0 if the tail packet
 *                   is already FILLED on entry
 *
 * Returns MPI_SUCCESS, except that with MPICH_DBG_OUTPUT defined the
 * two-element shortcut returns an error code if index and tail_index
 * disagree.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, 
                          int *num_bytes_ptr)
{
#ifdef MPICH_DBG_OUTPUT
    int mpi_errno;
#endif
    int i;
    unsigned int total = 0;
    unsigned int num_bytes = 0;
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc->channel_private;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);

    writeq = vcch->write_shmq;
    index = writeq->tail_index;
    /* Queue full: nothing can be written right now. */
    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        *num_bytes_ptr = 0;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", writeq));
#ifdef USE_IOV_LEN_2_SHORTCUT
    /* Fast path: exactly two elements that together are strictly smaller
     * than one packet are copied into the tail packet in one go. */
    if (n == 2 && 
        (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < 
        MPIDI_CH3I_PACKET_SIZE)
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN, writeq, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data, 
               iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&writeq->packet[index].data[iov[0].MPID_IOV_LEN],
               iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        writeq->packet[index].num_bytes = 
            iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = writeq->packet[index].num_bytes;
        /* Publish: the flag is stored only after the barrier. */
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
#ifdef MPICH_DBG_OUTPUT
        /*MPIU_Assert(index == writeq->tail_index);*/
        /* Debug-only consistency check.  Note that the packet has already
         * been marked FILLED when this error is returned. */
        if (index != writeq->tail_index)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmq_index", "**shmq_index %d %d", index, writeq->tail_index);
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
#endif
        writeq->tail_index =
            (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
        MPIU_DBG_PRINTF(("shm_writev - %d bytes in packet %d\n", writeq->packet[index].num_bytes, index));
        *num_bytes_ptr = total;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* General path.  dest_pos/dest_avail track the write cursor and the
     * space left inside the packet currently being assembled. */
    dest_pos = (unsigned char *)(writeq->packet[index].data);
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    writeq->packet[index].num_bytes = 0;
    for (i=0; i<n; i++)
    {
        if (iov[i].MPID_IOV_LEN <= dest_avail)
        {
            /* The whole element fits: append it to the current packet. */
            total += iov[i].MPID_IOV_LEN;
            writeq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", iov[i].MPID_IOV_LEN, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n", iov[i].MPID_IOV_LEN, writeq->packet[index].num_bytes, index));
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else
        {
            /* The element is larger than the remaining space: top up the
             * current packet, publish it, then spill the rest of the
             * element into the following packets. */
            total += dest_avail;
            writeq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", dest_avail, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n", dest_avail, writeq->packet[index].num_bytes, index));
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            /* cur_pos/cur_avail: unread remainder of this iov element. */
            cur_pos = (unsigned char *)(iov[i].MPID_IOV_BUF) + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            while (cur_avail)
            {
                index = writeq->tail_index = 
                    (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
                /* Ring is full: report what was written so far.
                 * tail_index already points at the occupied slot. */
                if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
                {
                    *num_bytes_ptr = total;
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                writeq->packet[index].num_bytes = num_bytes;
                MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %08p packet[%d]", num_bytes, writeq, index));
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(writeq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n", num_bytes, writeq->packet[index].num_bytes, index));
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                /* Only publish here if more of this element follows; the
                 * packet holding the element's last chunk stays open so
                 * the next iov element can be appended to it. */
                if (cur_avail)
                {
                    MPID_WRITE_BARRIER();
                    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
                }
            }
            /* num_bytes is the size of the last chunk copied, i.e. the
             * fill level of the packet that is still open. */
            dest_pos = (unsigned char *)
                (writeq->packet[index].data) + num_bytes;
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }
        /* The current packet is exactly full: publish it and move on to
         * the next slot, bailing out if that slot is still occupied. */
        if (dest_avail == 0)
        {
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            index = writeq->tail_index = 
                (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", 
                              writeq->tail_index));
            if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
            {
                *num_bytes_ptr = total;
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = (unsigned char *)(writeq->packet[index].data);
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            writeq->packet[index].num_bytes = 0;
        }
    }
    /* A partially filled packet remains open after the last element:
     * publish it and advance the tail past it. */
    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)
    {
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        writeq->tail_index = 
            (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", 
                          writeq->tail_index));
    }

    *num_bytes_ptr = total;
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#ifdef USE_RDMA_READV
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_rdma_readv
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_rdma_readv - placeholder; not implemented.
 *
 * Neither vc nor rreq is used.  Always returns an error code created with
 * the "**notimpl" message (class MPI_ERR_OTHER, MPIR_ERR_RECOVERABLE).
 */
int MPIDI_CH3I_SHM_rdma_readv(MPIDI_VC_t *vc, MPID_Request *rreq)
{
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);
    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**notimpl", 0);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);
    return mpi_errno;
}
#endif /* USE_RDMA_READV */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_read_progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_read_progress - read-side progress routine.
 *
 * Loops over the read queues, either by following the shm_next_reader
 * links starting at vc (MPIDI_CH3_SHM_SCALABLE_READQUEUES) or by index
 * over the size of vc's process group.
 *
 * NOTE(review): presumably millisecond_timeout bounds the wait and
 * vc_pptr, num_bytes_ptr and shm_out are outputs -- confirm against the
 * function body and its callers.
 */
int MPIDI_CH3I_SHM_read_progress(MPIDI_VC_t *vc, int millisecond_timeout, 
                                 MPIDI_VC_t **vc_pptr, int *num_bytes_ptr, 
                                 shm_wait_t *shm_out)
{
    int mpi_errno = MPI_SUCCESS;
    void *mem_ptr;
    char *iter_ptr;
    int num_bytes;
    unsigned int offset;
    MPIDI_VC_t *recv_vc_ptr;
    MPIDI_CH3I_VC *recv_vcch;
    MPIDI_CH3I_SHM_Packet_t *pkt_ptr;
    MPIDI_CH3I_SHM_Queue_t *shm_ptr;
    register int index, working;
#ifndef MPIDI_CH3_SHM_SCALABLE_READQUEUES
    int i;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_READ_PROGRESS);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_READ_PROGRESS);

    for (;;) 
    {
        working = FALSE;

#ifdef MPIDI_CH3_SHM_SCALABLE_READQUEUES
        /* List variant: the loop increment reads recv_vcch, which is
         * assigned as the first statement of the loop body. */
        for (recv_vc_ptr=vc; recv_vc_ptr; 
             recv_vc_ptr = recv_vcch->shm_next_reader )
        {
            recv_vcch = (MPIDI_CH3I_VC *)recv_vc_ptr->channel_private;
            shm_ptr = recv_vcch->read_shmq;
            /* Skip connections that have no read queue. */
            if (shm_ptr == NULL)
                continue;
#else
        /* Array variant: one iteration per index below the size of vc's
         * process group. */
        for (i=0; i<MPIDI_PG_Get_size(vc->pg); i++)
        {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -