📄 ch3_shm.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>
#include <string.h>  /* memcpy */

/*#undef USE_IOV_LEN_2_SHORTCUT*/
#define USE_IOV_LEN_2_SHORTCUT

#define SHM_READING_BIT     0x0008

#ifndef min
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* shmem functions */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_write - copy a contiguous buffer into the VC's outgoing
 * shared-memory packet ring (vc->ch.write_shmq).
 *
 * Starting at write_shmq->tail_index, the data is cut into chunks of at
 * most MPIDI_CH3I_PACKET_SIZE bytes.  Each chunk is copied into the next
 * packet, which is then published by setting its avail flag to
 * MPIDI_CH3I_PKT_FILLED.  Copying stops as soon as all data is queued or
 * the next packet in the ring is still FILLED.
 *
 *   vc            - virtual connection whose write_shmq is written
 *   buf, len      - data to send; len <= 0 queues nothing
 *   num_bytes_ptr - out: number of bytes actually queued (may be < len)
 *
 * Always returns MPI_SUCCESS; a full ring is reported through
 * *num_bytes_ptr, not as an error.
 */
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    writeq = vc->ch.write_shmq;
    index = writeq->tail_index;

    /* Copy while data remains and the packet at `index` is not still
       FILLED (not yet drained by the reader).  Testing `len > 0` rather
       than `len` keeps a negative length from reaching memcpy, where it
       would be converted to a huge size_t. */
    while (len > 0 && writeq->packet[index].avail != MPIDI_CH3I_PKT_FILLED)
    {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        /* offset is deliberately not reset here: the reader sets it back
           to zero before it marks a packet EMPTY */
        writeq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        /* publish: the barrier orders the payload/num_bytes stores before
           the store of the avail flag the reader polls */
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;

        buf = (char *) buf + length;
        total += length;
        len -= length;
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
    }

    /* tail_index names the first packet this call did not fill */
    writeq->tail_index = index;
    *num_bytes_ptr = total;

    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_writev - gather an I/O vector into the VC's outgoing
 * shared-memory packet ring (vc->ch.write_shmq).
 *
 * iov[0..n-1] is packed back-to-back into packets starting at
 * write_shmq->tail_index.  Each packet holds up to MPIDI_CH3I_PACKET_SIZE
 * bytes.  A packet is published (write barrier, then avail =
 * MPIDI_CH3I_PKT_FILLED) once it is full or once the whole iov has been
 * copied.  Copying stops early when the next packet is still FILLED.
 *
 *   vc            - virtual connection whose write_shmq is written
 *   iov, n        - I/O vector and its element count
 *   num_bytes_ptr - out: number of bytes actually queued; may be less than
 *                   the iov total when the ring fills up
 *
 * Returns MPI_SUCCESS, or (MPICH_DBG_OUTPUT builds only) an error code if
 * the internal tail-index consistency check fails.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
#ifdef MPICH_DBG_OUTPUT
    int mpi_errno;
#endif
    int i;
    unsigned int total = 0;
    unsigned int num_bytes = 0;
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_CH3I_SHM_Queue_t * writeq;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    writeq = vc->ch.write_shmq;
    index = writeq->tail_index;
    /* the next packet has not been drained yet: nothing can be queued */
    if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        *num_bytes_ptr = 0;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", writeq));
#ifdef USE_IOV_LEN_2_SHORTCUT
    /* fast path: a two-element iov whose combined length fits in one
       packet is copied straight into the current packet */
    if (n == 2 && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE)
    {
        MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                          (int)(iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN), writeq, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(writeq->packet[index].data, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&writeq->packet[index].data[iov[0].MPID_IOV_LEN], iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        writeq->packet[index].num_bytes = iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = writeq->packet[index].num_bytes;
        /* payload and num_bytes must be stored before the avail flag */
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
#ifdef MPICH_DBG_OUTPUT
        /*MPIU_Assert(index == writeq->tail_index);*/
        if (index != writeq->tail_index)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**shmq_index", "**shmq_index %d %d", index, writeq->tail_index);
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
#endif
        writeq->tail_index = (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* general path: dest_pos/dest_avail track the free space left in the
       packet currently being filled (writeq->packet[index]) */
    dest_pos = writeq->packet[index].data;
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    writeq->packet[index].num_bytes = 0;
    for (i = 0; i < n; i++)
    {
        if (iov[i].MPID_IOV_LEN <= dest_avail)
        {
            /* the whole element fits in the current packet */
            total += iov[i].MPID_IOV_LEN;
            writeq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                              (int)iov[i].MPID_IOV_LEN, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else
        {
            /* top off the current packet and publish it */
            total += dest_avail;
            writeq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                              (int)dest_avail, writeq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;

            /* spread the rest of this element over the following packets */
            cur_pos = (unsigned char *) iov[i].MPID_IOV_BUF + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            while (cur_avail)
            {
                index = writeq->tail_index = (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
                if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
                {
                    /* ring is full: report what has been queued so far */
                    *num_bytes_ptr = total;
                    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                writeq->packet[index].num_bytes = num_bytes;
                MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                                  (int)num_bytes, writeq, index));
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(writeq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                /* Publish only packets known to be full.  The last chunk of
                   this element stays unpublished so the next iov element can
                   be appended to it. */
                if (cur_avail)
                {
                    MPID_WRITE_BARRIER();
                    writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
                }
            }
            dest_pos = writeq->packet[index].data + num_bytes;
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }
        if (dest_avail == 0)
        {
            /* current packet is exactly full: publish it and advance */
            MPID_WRITE_BARRIER();
            writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            index = writeq->tail_index = (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
            if (writeq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
            {
                *num_bytes_ptr = total;
                MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = writeq->packet[index].data;
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            writeq->packet[index].num_bytes = 0;
        }
    }

    /* publish a trailing, partially filled packet, if any */
    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)
    {
        MPID_WRITE_BARRIER();
        writeq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        writeq->tail_index = (writeq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", writeq->tail_index));
    }

    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_read
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_read - copy data from the packet at the head of the VC's
 * incoming shared-memory queue (recv_vc_ptr->ch.read_shmq) into buf.
 *
 * At most one packet is consumed per call:
 *  - If the head packet holds more than len bytes, len bytes are copied and
 *    the packet's offset/num_bytes are advanced so the remainder is
 *    returned by later calls.
 *  - Otherwise its whole remainder is copied.  Its offset is reset to 0, it
 *    is handed back to the writer (avail = MPIDI_CH3I_PKT_EMPTY) and
 *    head_index advances.
 *
 *   recv_vc_ptr   - virtual connection whose read_shmq is read
 *   buf, len      - destination buffer and its capacity in bytes
 *   num_bytes_ptr - out: bytes copied; 0 when there is no read queue or
 *                   the head packet is EMPTY
 *
 * Returns MPI_SUCCESS, or (MPICH_DBG_OUTPUT builds only) an error code if
 * the internal packet-pointer consistency check fails.
 */
int MPIDI_CH3I_SHM_read(MPIDI_VC_t * recv_vc_ptr, void *buf, int len, int *num_bytes_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    void *mem_ptr;
    int num_bytes;
    MPIDI_CH3I_SHM_Packet_t *pkt_ptr;
    MPIDI_CH3I_SHM_Queue_t *shm_ptr;
    register int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_READ);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_READ);

    shm_ptr = recv_vc_ptr->ch.read_shmq;
    if (shm_ptr == NULL)
    {
        *num_bytes_ptr = 0;
        goto fn_exit;
    }
    index = shm_ptr->head_index;
    pkt_ptr = &shm_ptr->packet[index];

    /* if the packet at the head index is EMPTY, the queue has nothing to read */
    if (pkt_ptr->avail == MPIDI_CH3I_PKT_EMPTY)
    {
        *num_bytes_ptr = 0;
        goto fn_exit;
    }

    MPID_READ_BARRIER(); /* no loads after this line can occur before the avail flag has been read */

    MPIU_DBG_PRINTF(("MPIDI_CH3I_SHM_read_progress: reading from queue %p\n", shm_ptr));

    mem_ptr = (void*)(pkt_ptr->data + pkt_ptr->offset);
    num_bytes = pkt_ptr->num_bytes;
    MPIDI_DBG_PRINTF((60, FCNAME, "read %d bytes", num_bytes));

    if (num_bytes > len)
    {
        /* partial read: copy len bytes and leave the rest in the packet */
        MPIDI_DBG_PRINTF((60, FCNAME, "reading %d bytes from read_shmq %p packet[%d]", len, shm_ptr, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(buf, mem_ptr, len);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        *num_bytes_ptr = len;
        pkt_ptr->offset += len;
        pkt_ptr->num_bytes = num_bytes - len;
    }
    else
    {
        /* copy the rest of the packet */
        MPIDI_DBG_PRINTF((60, FCNAME, "reading %d bytes from read_shmq %p packet[%d]", num_bytes, shm_ptr, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(buf, mem_ptr, num_bytes);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        *num_bytes_ptr = num_bytes;
        /* put the shmem buffer back in the queue */
        pkt_ptr->offset = 0;
        MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */
        pkt_ptr->avail = MPIDI_CH3I_PKT_EMPTY;
#ifdef MPICH_DBG_OUTPUT
        /*MPIU_Assert(&shm_ptr->packet[index] == pkt_ptr);*/
        if (&shm_ptr->packet[index] != pkt_ptr)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
                                             "**pkt_ptr", "**pkt_ptr %p %p", &shm_ptr->packet[index], pkt_ptr);
            goto fn_exit;
        }
#endif
        shm_ptr->head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "read_shmq head = %d", shm_ptr->head_index));
    }

fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_READ);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_rdma_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_SHM_rdma_writev(MPIDI_VC_t *vc, MPID_Request *sreq)
{
#ifdef MPIDI_CH3_CHANNEL_RNDV
    int
mpi_errno = MPI_SUCCESS; int i; char *rbuf, *sbuf; int rbuf_len, riov_offset; int sbuf_len; int len;#ifndef HAVE_WINDOWS_H#define SIZE_T int#endif SIZE_T num_written; MPID_IOV *send_iov, *recv_iov; int send_count, recv_count; int complete; MPIDI_CH3_Pkt_t pkt; MPIDI_CH3_Pkt_rdma_reload_t * reload_pkt = &pkt.reload; MPID_Request * reload_sreq;#ifndef HAVE_WINDOWS_H int n, status; OFF_T uOffset;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); /* save the receiver's request to send back with the reload packet */ reload_pkt->rreq = sreq->dev.rdma_request; reload_pkt->sreq = sreq->handle;#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_ATTACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace attach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } if (waitpid(vc->ch.nSharedProcessID, &status, WUNTRACED) != vc->ch.nSharedProcessID) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "waitpid failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#endif for (;;) { send_iov = sreq->dev.iov; send_count = sreq->dev.iov_count; recv_iov = sreq->dev.rdma_iov; recv_count = sreq->dev.rdma_iov_count; /* printf("shm_rdma: writing %d send buffers into %d recv buffers.\n", send_count, recv_count);fflush(stdout); for (i=0; i<send_count; i++) { printf("shm_rdma: send buf[%d] = %p, len = %d\n", i, send_iov[i].MPID_IOV_BUF, send_iov[i].MPID_IOV_LEN); } for (i=0; i<recv_count; i++) { printf("shm_rdma: recv buf[%d] = %p, len = %d\n", i, recv_iov[i].MPID_IOV_BUF, recv_iov[i].MPID_IOV_LEN); } fflush(stdout); */ rbuf = recv_iov[0].MPID_IOV_BUF; rbuf_len = recv_iov[0].MPID_IOV_LEN;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -