📄 ch3_shm.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>

/*#undef USE_IOV_LEN_2_SHORTCUT*/
#define USE_IOV_LEN_2_SHORTCUT

#define SHM_READING_BIT 0x0008

#ifndef min
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* shmem functions */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_write - copy a contiguous buffer into vc->ch.write_shmq.
 *
 * The buffer is cut into chunks of at most MPIDI_CH3I_PACKET_SIZE bytes, one
 * chunk per packet, starting at write_shmq->tail_index.  Each packet is
 * published by setting avail = MPIDI_CH3I_PKT_FILLED after
 * MPID_WRITE_BARRIER().  Copying stops early, without error, when the next
 * packet is still marked MPIDI_CH3I_PKT_FILLED, so *num_bytes_ptr may be
 * less than len; handling the remainder is left to the caller.
 *
 * vc            - connection whose write_shmq receives the data
 * buf           - source bytes
 * len           - number of bytes to write; len <= 0 writes nothing
 * num_bytes_ptr - out: number of bytes actually copied into the queue
 *
 * Returns MPI_SUCCESS on every path.
 */
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    index = vc->ch.write_shmq->tail_index;

    /* Test len > 0 rather than len != 0: a negative len would make min()
       return a negative length, which memcpy would read as a huge size_t. */
    while (len > 0 &&
           vc->ch.write_shmq->packet[index].avail != MPIDI_CH3I_PKT_FILLED)
    {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        /* packet[index].offset is not reset here: the reader guarantees it
           is reset to zero. */
        vc->ch.write_shmq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        /* num_bytes and data must be visible before the packet is marked FILLED. */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        buf = (char *) buf + length;
        total += length;
        len -= length;
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
    }

    /* index is the next packet to fill; it may still be FILLED, in which case
       the next call returns immediately with zero bytes written. */
    vc->ch.write_shmq->tail_index = index;
    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_SHM_writev - gather an I/O vector into vc->ch.write_shmq.
 *
 * iov[0..n-1] is packed back to back into packets starting at
 * write_shmq->tail_index.  An element that does not fit in the current packet
 * fills it and spills into following packets.  Every packet is published by
 * setting avail = MPIDI_CH3I_PKT_FILLED after MPID_WRITE_BARRIER(), and
 * tail_index is advanced past each published packet.  Writing stops early,
 * without error, when the next packet is still MPIDI_CH3I_PKT_FILLED, so
 * *num_bytes_ptr may be less than the total iov length.
 *
 * When USE_IOV_LEN_2_SHORTCUT is defined, a two-element vector whose total
 * length is below MPIDI_CH3I_PACKET_SIZE is copied straight into one packet.
 *
 * vc            - connection whose write_shmq receives the data
 * iov, n        - the I/O vector and its element count
 * num_bytes_ptr - out: number of bytes copied into the queue
 *
 * Returns MPI_SUCCESS, except in MPICH_DBG_OUTPUT builds where the shortcut
 * path returns a fatal error code if index and tail_index disagree.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
#ifdef MPICH_DBG_OUTPUT
    int mpi_errno;
#endif
    int i;
    unsigned int total = 0;
    unsigned int num_bytes = 0;
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    index = vc->ch.write_shmq->tail_index;
    if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
    {
        /* The tail packet has not been released yet: nothing can be written. */
        *num_bytes_ptr = 0;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

    MPIU_DBG_PRINTF(("writing to write_shmq %p\n", (void *) vc->ch.write_shmq));

#ifdef USE_IOV_LEN_2_SHORTCUT
    if (n == 2 && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE)
    {
        /* Both elements fit in the current packet: copy them back to back. */
        MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                          (int) (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN),
                          (void *) vc->ch.write_shmq, index));
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&vc->ch.write_shmq->packet[index].data[iov[0].MPID_IOV_LEN], iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        vc->ch.write_shmq->packet[index].num_bytes = iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = vc->ch.write_shmq->packet[index].num_bytes;
        /* data and num_bytes must be visible before the packet is marked FILLED. */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
#ifdef MPICH_DBG_OUTPUT
        /* Debug-only consistency check: index was read from tail_index above. */
        if (index != vc->ch.write_shmq->tail_index)
        {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmq_index", "**shmq_index %d %d", index, vc->ch.write_shmq->tail_index);
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
#endif
        vc->ch.write_shmq->tail_index = (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* General path: dest_pos/dest_avail track the free space in the packet
       currently being filled (packet[index]), which is not yet published. */
    dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    vc->ch.write_shmq->packet[index].num_bytes = 0;
    for (i=0; i<n; i++)
    {
        if (iov[i].MPID_IOV_LEN <= dest_avail)
        {
            /* The whole element fits in the current packet. */
            total += iov[i].MPID_IOV_LEN;
            vc->ch.write_shmq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                              (int) iov[i].MPID_IOV_LEN, (void *) vc->ch.write_shmq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else
        {
            /* Fill the rest of the current packet with the head of this
               element and publish it ... */
            total += dest_avail;
            vc->ch.write_shmq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                              (int) dest_avail, (void *) vc->ch.write_shmq, index));
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            cur_pos = (unsigned char*)iov[i].MPID_IOV_BUF + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            /* ... then spread the remainder over the following packets. */
            while (cur_avail)
            {
                index = vc->ch.write_shmq->tail_index = (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
                if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
                {
                    /* Queue full: report the partial write. */
                    *num_bytes_ptr = total;
                    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                vc->ch.write_shmq->packet[index].num_bytes = num_bytes;
                MPIDI_DBG_PRINTF((60, FCNAME, "writing %d bytes to write_shmq %p packet[%d]",
                                  (int) num_bytes, (void *) vc->ch.write_shmq, index));
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(vc->ch.write_shmq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                if (cur_avail)
                {
                    /* More of this element follows, so this packet is full:
                       publish it.  The last packet written here is left
                       unpublished so later elements can be appended to it. */
                    MPID_WRITE_BARRIER();
                    vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
                }
            }
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data) + num_bytes;
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }
        if (dest_avail == 0)
        {
            /* Current packet is exactly full: publish it and open the next one. */
            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
            index = vc->ch.write_shmq->tail_index = (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
            if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_FILLED)
            {
                *num_bytes_ptr = total;
                MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            vc->ch.write_shmq->packet[index].num_bytes = 0;
        }
    }

    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)
    {
        /* Publish the last, partially filled packet. */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_FILLED;
        vc->ch.write_shmq->tail_index = (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIDI_DBG_PRINTF((60, FCNAME, "write_shmq tail = %d", vc->ch.write_shmq->tail_index));
    }

    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_rdma_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_SHM_rdma_writev(MPIDI_VC_t *vc, MPID_Request *sreq)
{
#ifdef 
MPIDI_CH3_CHANNEL_RNDV int mpi_errno = MPI_SUCCESS; int i; char *rbuf, *sbuf; int rbuf_len, riov_offset; int sbuf_len; int len;#ifndef HAVE_WINDOWS_H#define SIZE_T int#endif SIZE_T num_written; MPID_IOV *send_iov, *recv_iov; int send_count, recv_count; int complete; MPIDI_CH3_Pkt_t pkt; MPIDI_CH3_Pkt_rdma_reload_t * reload_pkt = &pkt.reload; MPID_Request * reload_sreq;#ifndef HAVE_WINDOWS_H int n, status; OFF_T uOffset;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); /* save the receiver's request to send back with the reload packet */ reload_pkt->rreq = sreq->dev.rdma_request; reload_pkt->sreq = sreq->handle;#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_ATTACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace attach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } if (waitpid(vc->ch.nSharedProcessID, &status, WUNTRACED) != vc->ch.nSharedProcessID) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "waitpid failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#endif for (;;) { send_iov = sreq->dev.iov; send_count = sreq->dev.iov_count; recv_iov = sreq->dev.rdma_iov; recv_count = sreq->dev.rdma_iov_count; /* printf("shm_rdma: writing %d send buffers into %d recv buffers.\n", send_count, recv_count);fflush(stdout); for (i=0; i<send_count; i++) { printf("shm_rdma: send buf[%d] = %p, len = %d\n", i, send_iov[i].MPID_IOV_BUF, send_iov[i].MPID_IOV_LEN); } for (i=0; i<recv_count; i++) { printf("shm_rdma: recv buf[%d] = %p, len = %d\n", i, recv_iov[i].MPID_IOV_BUF, recv_iov[i].MPID_IOV_LEN); } fflush(stdout); */ rbuf = recv_iov[0].MPID_IOV_BUF; rbuf_len = recv_iov[0].MPID_IOV_LEN; riov_offset = 0; for (i=sreq->ch.iov_offset; 
i<send_count; i++) { sbuf = send_iov[i].MPID_IOV_BUF; sbuf_len = send_iov[i].MPID_IOV_LEN; while (sbuf_len) { len = MPIDU_MIN(sbuf_len, rbuf_len); /*printf("writing %d bytes to remote process.\n", len);fflush(stdout);*/#ifdef HAVE_WINDOWS_H if (!WriteProcessMemory(vc->ch.hSharedProcessHandle, rbuf, sbuf, len, &num_written)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "WriteProcessMemory failed", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } if (num_written == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "WriteProcessMemory returned -1 bytes written"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#else#ifdef HAVE_PROC_RDMA_WRITE /* write is not implemented in the /proc device. It is considered a security hole. You can recompile a Linux * kernel with this function enabled and then define HAVE_PROC_RDMA_WRITE and this code will work. 
*/#ifdef USE__LLSEEK n = _llseek(vc->ch.nSharedProcessFileDescriptor, 0, OFF_T_CAST(rbuf), &uOffset, SEEK_SET);#else uOffset = lseek(vc->ch.nSharedProcessFileDescriptor, OFF_T_CAST(rbuf), SEEK_SET); n = 0;#endif if (n != 0 || uOffset != OFF_T_CAST(rbuf)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "lseek failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } num_written = write(vc->ch.nSharedProcessFileDescriptor, sbuf, len); if (num_written < 1) { if (num_written == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "write failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, rbuf + len - num_written, 0); }#else /* Do not use this code. Using PTRACE_POKEDATA for rdma writes gives horrible performance. * This code is only provided for correctness to show that the put model will run. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -