📄 ch3_shm.c
字号:
riov_offset = 0; for (i=sreq->ch.iov_offset; i<send_count; i++) { sbuf = send_iov[i].MPID_IOV_BUF; sbuf_len = send_iov[i].MPID_IOV_LEN; while (sbuf_len) { len = MPIDU_MIN(sbuf_len, rbuf_len); /*printf("writing %d bytes to remote process.\n", len);fflush(stdout);*/#ifdef HAVE_WINDOWS_H if (!WriteProcessMemory(vc->ch.hSharedProcessHandle, rbuf, sbuf, len, &num_written)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "WriteProcessMemory failed", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } if (num_written == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "WriteProcessMemory returned -1 bytes written"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#else#ifdef HAVE_PROC_RDMA_WRITE /* write is not implemented in the /proc device. It is considered a security hole. You can recompile a Linux * kernel with this function enabled and then define HAVE_PROC_RDMA_WRITE and this code will work. */#ifdef USE__LLSEEK n = _llseek(vc->ch.nSharedProcessFileDescriptor, 0, OFF_T_CAST(rbuf), &uOffset, SEEK_SET);#else uOffset = lseek(vc->ch.nSharedProcessFileDescriptor, OFF_T_CAST(rbuf), SEEK_SET); n = 0;#endif if (n != 0 || uOffset != OFF_T_CAST(rbuf)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "lseek failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } num_written = write(vc->ch.nSharedProcessFileDescriptor, sbuf, len); if (num_written < 1) { if (num_written == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "write failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, rbuf + len - num_written, 0); }#else /* Do not use this code. Using PTRACE_POKEDATA for rdma writes gives horrible performance. * This code is only provided for correctness to show that the put model will run. */ do { int *rbuf2; rbuf2 = (int*)rbuf; for (n=0; n<len/sizeof(int); n++) { if (ptrace(PTRACE_POKEDATA, vc->ch.nSharedProcessID, rbuf2, &((int*)sbuf)[n]) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace pokedata failed", errno); /* printf("EPERM = %d, ESRCH = %d, EIO = %d, EFAULT = %d\n", EPERM, ESRCH, EIO, EFAULT);fflush(stdout); */ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } } n = len % sizeof(int); if (n > 0) { if (len > sizeof(int)) { if (ptrace(PTRACE_POKEDATA, vc->ch.nSharedProcessID, rbuf + len - sizeof(int), ((int*)(sbuf + len - sizeof(int)))) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace pokedata failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } } else { int data; data = ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, rbuf + len - n, 0); if (data == -1 && errno != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace peekdata failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } /* mask in the new bits */ /* printf("FIXME!");fflush(stdout); */ if (ptrace(PTRACE_POKEDATA, vc->ch.nSharedProcessID, rbuf + len - n, &data) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace pokedata failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } } } } while (0); num_written = len;#endif#endif /*printf("wrote %d bytes to remote process\n", num_written);fflush(stdout);*/ if (num_written < (SIZE_T)rbuf_len) { rbuf = rbuf + num_written; rbuf_len = rbuf_len - num_written; } else { riov_offset = riov_offset + 1; if (riov_offset < recv_count) { rbuf = recv_iov[riov_offset].MPID_IOV_BUF; rbuf_len = recv_iov[riov_offset].MPID_IOV_LEN; } } sbuf = sbuf + num_written; sbuf_len = sbuf_len - num_written; if (riov_offset == recv_count) { if ( (i != (send_count - 1)) || (sbuf_len != 0) ) {#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace detach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#endif /* the recv iov needs to be reloaded */ if (sbuf_len != 0) { sreq->dev.iov[i].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)sbuf; sreq->dev.iov[i].MPID_IOV_LEN = sbuf_len; } sreq->ch.iov_offset = i; /* send the reload packet to the receiver */ MPIDI_Pkt_init(reload_pkt, MPIDI_CH3_PKT_RELOAD); reload_pkt->send_recv = MPIDI_CH3_PKT_RELOAD_RECV; mpi_errno = MPIDI_CH3_iStartMsg(vc, reload_pkt, sizeof(*reload_pkt), &reload_sreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } /* --END ERROR HANDLING-- */ if (reload_sreq != NULL) { /* The sender doesn't need to know when the packet has been sent. So release the request immediately */ MPID_Request_release(reload_sreq); } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return MPI_SUCCESS; } } } }#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace detach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; }#endif /* update the sender's request */ mpi_errno = MPIDI_CH3U_Handle_send_req(vc, sreq, &complete); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "unable to update request after rdma write"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } if (complete || (riov_offset == recv_count)) { /* send the reload/done packet to the receiver */ MPIDI_Pkt_init(reload_pkt, MPIDI_CH3_PKT_RELOAD); reload_pkt->send_recv = MPIDI_CH3_PKT_RELOAD_RECV; mpi_errno = MPIDI_CH3_iStartMsg(vc, reload_pkt, sizeof(*reload_pkt), &reload_sreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno; } /* --END ERROR HANDLING-- */ if (reload_sreq != NULL) { /* The sender doesn't need to know when the packet has been sent. So release the request immediately */ MPID_Request_release(reload_sreq); } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return MPI_SUCCESS; } } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno;#else int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**notimpl", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_WRITEV); return mpi_errno;#endif}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_rdma_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_rdma_readv(MPIDI_VC_t *vc, MPID_Request *rreq){#ifdef MPIDI_CH3_CHANNEL_RNDV int mpi_errno = MPI_SUCCESS; int i; char *rbuf, *sbuf; int rbuf_len; int sbuf_len, siov_offset; int len;#ifndef HAVE_WINDOWS_H#define SIZE_T int#endif SIZE_T num_read; MPID_IOV *send_iov, *recv_iov; int send_count, recv_count; int complete; MPIDI_CH3_Pkt_t pkt; MPIDI_CH3_Pkt_rdma_reload_t * reload_pkt = &pkt.reload; MPID_Request * reload_rreq;#ifndef HAVE_WINDOWS_H int n, status; OFF_T uOffset;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); /* save the sender's request to send back with the reload packet */ /*reload_pkt->req = rreq->dev.rdma_request;*/ reload_pkt->sreq = rreq->dev.sender_req_id; reload_pkt->rreq = rreq->handle;#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_ATTACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace attach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } if (waitpid(vc->ch.nSharedProcessID, &status, WUNTRACED) != vc->ch.nSharedProcessID) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "waitpid failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; }#endif for (;;) { recv_iov = rreq->dev.iov; recv_count = rreq->dev.iov_count; send_iov = rreq->dev.rdma_iov; send_count = rreq->dev.rdma_iov_count; siov_offset = rreq->dev.rdma_iov_offset; /* printf("shm_rdma: reading %d send buffers into %d recv buffers.\n", send_count, recv_count);fflush(stdout); for (i=siov_offset; i<send_count; i++) { printf("shm_rdma: send buf[%d] = %p, len = %d\n", i, send_iov[i].MPID_IOV_BUF, send_iov[i].MPID_IOV_LEN); } for (i=0; i<recv_count; i++) { printf("shm_rdma: recv buf[%d] = %p, len = %d\n", i, recv_iov[i].MPID_IOV_BUF, recv_iov[i].MPID_IOV_LEN); } fflush(stdout); */ sbuf = send_iov[siov_offset].MPID_IOV_BUF; sbuf_len = send_iov[siov_offset].MPID_IOV_LEN; for (i=rreq->ch.iov_offset; i<recv_count; i++) { rbuf = recv_iov[i].MPID_IOV_BUF; rbuf_len = recv_iov[i].MPID_IOV_LEN; while (rbuf_len) { len = MPIDU_MIN(rbuf_len, sbuf_len); /*printf("reading %d bytes from the remote process.\n", len);fflush(stdout);*/#ifdef HAVE_WINDOWS_H if (!ReadProcessMemory(vc->ch.hSharedProcessHandle, sbuf, rbuf, len, &num_read)) { /*printf("ReadProcessMemory failed, error %d\n", GetLastError());*/ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ReadProcessMemory failed", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } if (num_read == -1) { /*printf("ReadProcessMemory read -1 bytes.\n");fflush(stdout);*/ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "ReadProcessMemory returned -1 bytes written"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; }#else#ifdef USE_LLSEEK n = _llseek(vc->ch.nSharedProcessFileDescriptor, 0, OFF_T_CAST(sbuf), &uOffset, SEEK_SET);#else uOffset = lseek(vc->ch.nSharedProcessFileDescriptor, OFF_T_CAST(sbuf), SEEK_SET); n = 0;#endif if (n != 0 || uOffset != OFF_T_CAST(sbuf)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "lseek failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } num_read = read(vc->ch.nSharedProcessFileDescriptor, rbuf, len); if (num_read < 1) { if (num_read == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "read failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, sbuf + len - num_read, 0); }#endif /*printf("read %d bytes from the remote process\n", num_read);fflush(stdout);*/ if (num_read < (SIZE_T)sbuf_len) { sbuf = sbuf + num_read; sbuf_len = sbuf_len - num_read; } else { siov_offset = siov_offset + 1; if (siov_offset < send_count) { sbuf = send_iov[siov_offset].MPID_IOV_BUF; sbuf_len = send_iov[siov_offset].MPID_IOV_LEN; } else { sbuf_len = 0; } } rbuf = rbuf + num_read; rbuf_len = rbuf_len - num_read; if (siov_offset == send_count) { if ( (i != (recv_count - 1)) || (rbuf_len != 0) ) {#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace detach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; }#endif /* the send iov needs to be reloaded */ if (rbuf_len != 0) { rreq->dev.iov[i].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rbuf; rreq->dev.iov[i].MPID_IOV_LEN = rbuf_len; } rreq->ch.iov_offset = i; /* send the reload packet to the sender */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -