📄 ch3_shm.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>
#include <string.h>     /* memcpy */

/* STATES:NO WARNINGS */

/* Comment out the following define to disable the two-element iov fast
 * path in MPIDI_CH3I_SHM_writev. */
/*#undef USE_IOV_LEN_2_SHORTCUT*/
#define USE_IOV_LEN_2_SHORTCUT

#define SHM_READING_BIT     0x0008

#define MPIDI_CH3_PKT_RELOAD_SEND 1
#define MPIDI_CH3_PKT_RELOAD_RECV 0

#ifndef min
/* NOTE: function-like macro; each argument may be evaluated twice, so do
 * not pass expressions with side effects. */
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/*
 * Here are the two choices that we need to make to allow this
 * file to be used by several channels.
 *
 * First, is it part of a multi-method channel (e.g., ssm) or part of a
 * single method channel (e.g. shm)?
 *
 * Define MPIDI_CH3_SHM_SHARES_PKTARRAY if it is shared
 *
 * Second, how are the receive queues arranged?  Are they
 * in a list (scalable, by active connection) or
 * are they in an array of pointers?
 *
 * Define MPIDI_CH3_SHM_SCALABLE_READQUEUES if they are in a list
 */

/* shmem functions */

#ifdef MPIDI_CH3_SHM_SHARES_PKTARRAY
extern MPIDI_CH3_PktHandler_Fcn *MPIDI_pktArray[MPIDI_CH3_PKT_END_CH3+1];
#else
static MPIDI_CH3_PktHandler_Fcn *MPIDI_pktArray[MPIDI_CH3_PKT_END_CH3+1];

/*
 * MPIDI_CH3I_SHM_Progress_init - fill the file-local packet handler table.
 *
 * Only compiled when MPIDI_CH3_SHM_SHARES_PKTARRAY is not defined, i.e.
 * when this file owns MPIDI_pktArray.
 *
 * Returns whatever MPIDI_CH3_PktHandler_Init() returns.
 */
int MPIDI_CH3I_SHM_Progress_init(void)
{
    int mpi_errno;

    /* Initialize the code to handle incoming packets */
    mpi_errno = MPIDI_CH3_PktHandler_Init( MPIDI_pktArray,
                                           MPIDI_CH3_PKT_END_CH3+1 );
    return mpi_errno;
}
#endif

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_write - copy a contiguous buffer into the VC's write queue.
 *
 * vc             - virtual connection; its channel_private member is read
 *                  as an MPIDI_CH3I_VC to find write_shmq
 * buf            - source bytes
 * len            - number of bytes requested; values <= 0 write nothing
 * num_bytes_ptr  - out: number of bytes actually copied (may be < len)
 *
 * Starting at writeq->tail_index, the data is split into chunks of at most
 * MPIDI_CH3I_PACKET_SIZE bytes, one chunk per packet slot.  Copying stops as
 * soon as the next slot is still marked MPIDI_CH3I_PKT_FILLED, or when all
 * of the data has been copied.  tail_index is left at the first slot that
 * was not written.
 *
 * Always returns MPI_SUCCESS; a short (or zero-byte) write is reported only
 * through *num_bytes_ptr.
 */
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_CH3I_VC *vcch;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);

    vcch = (MPIDI_CH3I_VC *)vc->channel_private;
    writeq = vcch->write_shmq;
    index = writeq->tail_index;

    /* Slot at the tail is still occupied: nothing can be written now.
     * tail_index is deliberately left untouched on this path. */
    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED) {
        *num_bytes_ptr = total;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
        return MPI_SUCCESS;
    }

    /* "len > 0" rather than "len": a negative length would otherwise be
     * handed to memcpy() as a huge size. */
    while (len > 0) {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        /*writeq->packet[index].offset = 0;
          the reader guarantees this is reset to zero */
        writeq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIU_DBG_PRINTF(("shm_write: %d bytes in packet %d\n",
                         writeq->packet[index].num_bytes, index));

        /* The barrier is issued before the flag store so that the payload
         * and num_bytes stores precede avail = FILLED (presumably so the
         * reader never observes FILLED ahead of the data -- the macro is
         * defined elsewhere). */
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;

        buf = (char *) buf + length;
        total += length;
        len -= length;
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;

        /* Next slot not yet drained: stop with a short write. */
        if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED) {
            break;
        }
    }

    writeq->tail_index = index;
    *num_bytes_ptr = total;
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_writev - gather an iov array into the VC's write queue.
 *
 * vc             - virtual connection; its channel_private member is read
 *                  as an MPIDI_CH3I_VC to find write_shmq
 * iov            - array of n buffer/length pairs
 * n              - number of iov elements
 * num_bytes_ptr  - out: total number of bytes copied
 *
 * Consecutive iov elements are packed back to back into the same packet
 * slot until it holds MPIDI_CH3I_PACKET_SIZE bytes; an element that does
 * not fit is split across as many slots as needed.  A slot is marked
 * MPIDI_CH3I_PKT_FILLED (after MPID_WRITE_BARRIER()) once it is full or
 * once the input is exhausted.  The function returns early, reporting a
 * short write, whenever the next slot it needs is still marked FILLED.
 *
 * Returns MPI_SUCCESS, except that when MPICH_DBG_OUTPUT is defined the
 * two-element fast path returns an MPIR error code if the local index and
 * writeq->tail_index disagree.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
#ifdef MPICH_DBG_OUTPUT
    int mpi_errno;
#endif
    int i;
    unsigned int total = 0;
    unsigned int num_bytes = 0;
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc->channel_private;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);

    writeq = vcch->write_shmq;
    index = writeq->tail_index;

    /* Tail slot still occupied: report a zero-byte write. */
    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED) {
        *num_bytes_ptr = 0;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", writeq));

#ifdef USE_IOV_LEN_2_SHORTCUT
    /* Fast path: exactly two elements whose combined length is strictly
     * less than one packet.  Both are copied into the tail slot and the
     * slot is published in one step. */
    if (n == 2 &&
        (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE) {
        MPIDI_DBG_PRINTF((60, FCNAME,
                          "writing %d bytes to write_shmq %08p packet[%d]",
                          iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN,
                          writeq, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data,
               iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&writeq->packet[index].data[iov[0].MPID_IOV_LEN],
               iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        writeq->packet[index].num_bytes =
            iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = writeq->packet[index].num_bytes;
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
#ifdef MPICH_DBG_OUTPUT
        /*MPIU_Assert(index == writeq->tail_index);*/
        if (index != writeq->tail_index) {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                             FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**shmq_index",
                                             "**shmq_index %d %d",
                                             index, writeq->tail_index);
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
#endif
        writeq->tail_index =
            (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d",
                          writeq->tail_index));
        MPIU_DBG_PRINTF(("shm_writev - %d bytes in packet %d\n",
                         writeq->packet[index].num_bytes, index));
        *num_bytes_ptr = total;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* General path.  dest_pos/dest_avail track the write cursor and the
     * free space left in the slot currently being assembled. */
    dest_pos = (unsigned char *)(writeq->packet[index].data);
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    writeq->packet[index].num_bytes = 0;

    for (i=0; i<n; i++) {
        if (iov[i].MPID_IOV_LEN <= dest_avail) {
            /* Whole element fits in the current slot: append it. */
            total += iov[i].MPID_IOV_LEN;
            writeq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_DBG_PRINTF((60, FCNAME,
                              "writing %d bytes to write_shmq %08p packet[%d]",
                              iov[i].MPID_IOV_LEN, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                             iov[i].MPID_IOV_LEN,
                             writeq->packet[index].num_bytes, index));
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else {
            /* Element is larger than the remaining space: top off the
             * current slot, publish it, then spill the rest into the
             * following slots. */
            total += dest_avail;
            writeq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_DBG_PRINTF((60, FCNAME,
                              "writing %d bytes to write_shmq %08p packet[%d]",
                              dest_avail, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                             dest_avail,
                             writeq->packet[index].num_bytes, index));
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;

            cur_pos = (unsigned char *)(iov[i].MPID_IOV_BUF) + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            while (cur_avail) {
                index = writeq->tail_index =
                    (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d",
                                  writeq->tail_index));
                if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED) {
                    /* Queue is full: short write. */
                    *num_bytes_ptr = total;
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                writeq->packet[index].num_bytes = num_bytes;
                MPIDI_DBG_PRINTF((60, FCNAME,
                                  "writing %d bytes to write_shmq %08p packet[%d]",
                                  num_bytes, writeq, index));
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(writeq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                                 num_bytes,
                                 writeq->packet[index].num_bytes, index));
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                /* Only intermediate slots are published here.  The slot
                 * holding the final piece stays open so later elements
                 * can be appended to it. */
                if (cur_avail) {
                    MPID_WRITE_BARRIER();
                    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
                }
            }
            dest_pos = (unsigned char *) (writeq->packet[index].data) + num_bytes;
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }

        if (dest_avail == 0) {
            /* Current slot is exactly full: publish it and move on. */
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            index = writeq->tail_index =
                (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d",
                              writeq->tail_index));
            if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED) {
                *num_bytes_ptr = total;
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = (unsigned char *)(writeq->packet[index].data);
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            writeq->packet[index].num_bytes = 0;
        }
    }

    /* A partially filled slot is left over: publish it too. */
    if (dest_avail < MPIDI_CH3I_PACKET_SIZE) {
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        writeq->tail_index =
            (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d",
                          writeq->tail_index));
    }

    *num_bytes_ptr = total;
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#ifdef USE_RDMA_READV
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_rdma_readv
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_rdma_readv - placeholder; not implemented.
 *
 * Neither argument is used.  Always returns a recoverable MPI_ERR_OTHER
 * error code created with the "**notimpl" message key.
 */
int MPIDI_CH3I_SHM_rdma_readv(MPIDI_VC_t *vc, MPID_Request *rreq)
{
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);
    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                     FCNAME, __LINE__, MPI_ERR_OTHER,
                                     "**notimpl", 0);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV);
    return mpi_errno;
}
#endif /* USE_RDMA_READV */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_read_progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* NOTE(review): only the opening of this function is present in this copy
 * of the file; the visible part is reproduced without semantic change. */
int MPIDI_CH3I_SHM_read_progress(MPIDI_VC_t *vc, int millisecond_timeout,
                                 MPIDI_VC_t **vc_pptr, int *num_bytes_ptr,
                                 shm_wait_t *shm_out)
{
    int mpi_errno = MPI_SUCCESS;
    void *mem_ptr;
    char *iter_ptr;
    int num_bytes;
    unsigned int offset;
    MPIDI_VC_t *recv_vc_ptr;
    MPIDI_CH3I_VC *recv_vcch;
    MPIDI_CH3I_SHM_Packet_t *pkt_ptr;
    MPIDI_CH3I_SHM_Queue_t *shm_ptr;
    register int index, working;
#ifndef MPIDI_CH3_SHM_SCALABLE_READQUEUES
    int i;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_READ_PROGRESS);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_READ_PROGRESS);

    for (;;)
    {
        working = FALSE;

#ifdef MPIDI_CH3_SHM_SCALABLE_READQUEUES
        for (recv_vc_ptr=vc; recv_vc_ptr;
             recv_vc_ptr = recv_vcch->shm_next_reader )
        {
            recv_vcch = (MPIDI_CH3I_VC *)recv_vc_ptr->channel_private;
            shm_ptr = recv_vcch->read_shmq;
            if (shm_ptr == NULL) continue;
#else
        for (i=0; i<MPIDI_PG_Get_size(vc->pg); i++)
        {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -