📄 shm.c
字号:
}#else#ifdef USE_LLSEEK n = _llseek(vc->ch.nSharedProcessFileDescriptor, 0, OFF_T_CAST(sbuf), &uOffset, SEEK_SET);#else uOffset = lseek(vc->ch.nSharedProcessFileDescriptor, OFF_T_CAST(sbuf), SEEK_SET); n = 0;#endif if (n != 0 || uOffset != OFF_T_CAST(sbuf)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "lseek failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } num_read = read(vc->ch.nSharedProcessFileDescriptor, rbuf, len); if (num_read < 1) { if (num_read == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "read failed", errno); ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } ptrace(PTRACE_PEEKDATA, vc->ch.nSharedProcessID, sbuf + len - num_read, 0); }#endif /*printf("read %d bytes from the remote process\n", num_read);fflush(stdout);*/ if (num_read < (SIZE_T)sbuf_len) { sbuf = sbuf + num_read; sbuf_len = sbuf_len - num_read; } else { siov_offset = siov_offset + 1; if (siov_offset < send_count) { sbuf = send_iov[siov_offset].MPID_IOV_BUF; sbuf_len = send_iov[siov_offset].MPID_IOV_LEN; } else { sbuf_len = 0; } } rbuf = rbuf + num_read; rbuf_len = rbuf_len - num_read; if (siov_offset == send_count) { if ( (i != (recv_count - 1)) || (rbuf_len != 0) ) {#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace detach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; }#endif /* the send iov needs to be reloaded */ if (rbuf_len != 0) { rreq->dev.iov[i].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rbuf; rreq->dev.iov[i].MPID_IOV_LEN = rbuf_len; } rreq->ch.iov_offset = i; /* send the reload packet to the sender */ /*printf("sending reload packet to the sender.\n");fflush(stdout);*/ MPIDI_Pkt_init(reload_pkt, MPIDI_CH3_PKT_RELOAD); reload_pkt->send_recv = MPIDI_CH3_PKT_RELOAD_SEND; mpi_errno = MPIDI_CH3_iStartMsg(vc, reload_pkt, sizeof(*reload_pkt), &reload_rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } /* --END ERROR HANDLING-- */ if (reload_rreq != NULL) { /* The sender doesn't need to know when the packet has been sent. So release the request immediately */ MPID_Request_release(reload_rreq); } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return MPI_SUCCESS; } } } }#ifndef HAVE_WINDOWS_H if (ptrace(PTRACE_DETACH, vc->ch.nSharedProcessID, 0, 0) != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "ptrace detach failed", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; }#endif /* update the sender's request */ mpi_errno = MPIDI_CH3U_Handle_recv_req(vc, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "unable to update request after rdma read"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } if (complete || (siov_offset == send_count)) { /*printf("sending reload send packet.\n");fflush(stdout);*/ /* send the reload/done packet to the sender */ MPIDI_Pkt_init(reload_pkt, MPIDI_CH3_PKT_RELOAD); reload_pkt->send_recv = MPIDI_CH3_PKT_RELOAD_SEND; mpi_errno = MPIDI_CH3_iStartMsg(vc, reload_pkt, sizeof(*reload_pkt), &reload_rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno; } /* --END ERROR HANDLING-- */ if (reload_rreq != NULL) { /* The sender doesn't need to know when the packet has been sent. So release the request immediately */ MPID_Request_release(reload_rreq); } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return MPI_SUCCESS; } rreq->dev.rdma_iov_offset = siov_offset; rreq->dev.rdma_iov[siov_offset].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)sbuf; rreq->dev.rdma_iov[siov_offset].MPID_IOV_LEN = sbuf_len; } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno;#else int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**notimpl", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_RDMA_READV); return mpi_errno;#endif}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_wait(MPIDI_VC_t *vc, int millisecond_timeout, MPIDI_VC_t **vc_pptr, int *num_bytes_ptr, shm_wait_t *shm_out){ int mpi_errno; void *mem_ptr; char *iter_ptr; int num_bytes; unsigned int offset; MPIDI_VC_t *recv_vc_ptr; MPIDI_CH3I_SHM_Packet_t *pkt_ptr; int i; register int index, working;#ifdef USE_SHM_UNEX MPIDI_VC_t *temp_vc_ptr;#endif MPID_Request *sreq, *rreq; int complete; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WAIT); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WAIT); for (;;) {#ifdef USE_SHM_UNEX if (MPIDI_CH3I_Process.unex_finished_list) { MPIDI_DBG_PRINTF((60, FCNAME, "returning previously received %d bytes", MPIDI_CH3I_Process.unex_finished_list->ch.read.total)); *num_bytes_ptr = MPIDI_CH3I_Process.unex_finished_list->ch.read.total; *vc_pptr = MPIDI_CH3I_Process.unex_finished_list; /* remove this vc from the finished list */ temp_vc_ptr = MPIDI_CH3I_Process.unex_finished_list; MPIDI_CH3I_Process.unex_finished_list = MPIDI_CH3I_Process.unex_finished_list->ch.unex_finished_next; temp_vc_ptr->ch.unex_finished_next = NULL; *shm_out = SHM_WAIT_READ; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; }#endif /* USE_SHM_UNEX */ working = FALSE; for (i=0; i<MPIDI_PG_Get_size(vc->pg); i++) { /* skip over the vc to myself */ if (vc->pg_rank == i) continue; index = vc->ch.shm[i].head_index; /* if the packet at the head index is available, the queue is empty */ if (vc->ch.shm[i].packet[index].avail == MPIDI_CH3I_PKT_AVAILABLE) continue; MPID_READ_BARRIER(); /* no loads after this line can occur before the avail flag has been read */ working = TRUE; pkt_ptr = &vc->ch.shm[i].packet[index]; mem_ptr = (void*)(pkt_ptr->data + pkt_ptr->offset); /*mem_ptr = (void*)vc->ch.shm[i].packet[index].cur_pos;*/ num_bytes = vc->ch.shm[i].packet[index].num_bytes; MPIDI_PG_Get_vc(vc->pg, i, &recv_vc_ptr); if (recv_vc_ptr->ch.shm_reading_pkt) { MPIU_DBG_PRINTF(("shm_read_progress: reading %d byte header from shm packet %d offset %d size %d\n", sizeof(MPIDI_CH3_Pkt_t), index, pkt_ptr->offset, num_bytes));#ifdef MPIDI_CH3_CHANNEL_RNDV if (((MPIDI_CH3_Pkt_t*)mem_ptr)->type > MPIDI_CH3_PKT_END_CH3) { if (((MPIDI_CH3_Pkt_t*)mem_ptr)->type == MPIDI_CH3_PKT_RTS_IOV) { /*printf("received rts packet.\n");fflush(stdout);*/ rreq = MPID_Request_create(); if (rreq == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } MPIU_Object_set_ref(rreq, 1); rreq->kind = MPIDI_CH3I_RTS_IOV_READ_REQUEST; rreq->dev.rdma_request = ((MPIDI_CH3_Pkt_rdma_rts_iov_t*)mem_ptr)->sreq; rreq->dev.rdma_iov_count = ((MPIDI_CH3_Pkt_rdma_rts_iov_t*)mem_ptr)->iov_len; rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&rreq->dev.rdma_iov; rreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.rdma_iov_count * sizeof(MPID_IOV); rreq->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&rreq->ch.pkt; rreq->dev.iov[1].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t); rreq->dev.iov_count = 2; rreq->ch.req = NULL; recv_vc_ptr->ch.recv_active = rreq; } else if (((MPIDI_CH3_Pkt_t*)mem_ptr)->type == MPIDI_CH3_PKT_CTS_IOV) { /*printf("received cts packet.\n");fflush(stdout);*/ MPID_Request_get_ptr(((MPIDI_CH3_Pkt_rdma_cts_iov_t*)mem_ptr)->sreq, sreq); sreq->dev.rdma_request = ((MPIDI_CH3_Pkt_rdma_cts_iov_t*)mem_ptr)->rreq; sreq->dev.rdma_iov_count = ((MPIDI_CH3_Pkt_rdma_cts_iov_t*)mem_ptr)->iov_len; rreq = MPID_Request_create(); if (rreq == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } MPIU_Object_set_ref(rreq, 1); rreq->kind = MPIDI_CH3I_IOV_WRITE_REQUEST; rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&sreq->dev.rdma_iov; rreq->dev.iov[0].MPID_IOV_LEN = sreq->dev.rdma_iov_count * sizeof(MPID_IOV); rreq->dev.iov_count = 1; rreq->ch.req = sreq; recv_vc_ptr->ch.recv_active = rreq; /*MPIDI_CH3I_SHM_post_read(recv_vc_ptr, &sreq->ch.rdma_iov, sreq->ch.rdma_iov_count * sizeof(MPID_IOV), NULL);*/ } else if (((MPIDI_CH3_Pkt_t*)mem_ptr)->type == MPIDI_CH3_PKT_IOV) { if ( ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->send_recv == MPIDI_CH3_PKT_RELOAD_SEND ) { /*printf("received sender's iov packet, posting a read of %d iovs.\n", ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->iov_len);fflush(stdout);*/ MPID_Request_get_ptr(((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->req, sreq); sreq->dev.rdma_iov_count = ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->iov_len; rreq = MPID_Request_create(); if (rreq == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } MPIU_Object_set_ref(rreq, 1); rreq->kind = MPIDI_CH3I_IOV_READ_REQUEST; rreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&sreq->dev.rdma_iov; rreq->dev.iov[0].MPID_IOV_LEN = sreq->dev.rdma_iov_count * sizeof(MPID_IOV); rreq->dev.iov_count = 1; rreq->ch.req = sreq; recv_vc_ptr->ch.recv_active = rreq; } else if ( ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->send_recv == MPIDI_CH3_PKT_RELOAD_RECV ) { /*printf("received receiver's iov packet, posting a read of %d iovs.\n", ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->iov_len);fflush(stdout);*/ MPID_Request_get_ptr(((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->req, rreq); rreq->dev.rdma_iov_count = ((MPIDI_CH3_Pkt_rdma_iov_t*)mem_ptr)->iov_len; sreq = MPID_Request_create(); if (sreq == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } MPIU_Object_set_ref(sreq, 1); sreq->kind = MPIDI_CH3I_IOV_WRITE_REQUEST; sreq->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&rreq->dev.rdma_iov; sreq->dev.iov[0].MPID_IOV_LEN = rreq->dev.rdma_iov_count * sizeof(MPID_IOV); sreq->dev.iov_count = 1; sreq->ch.req = rreq; recv_vc_ptr->ch.recv_active = sreq; } else { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "received invalid MPIDI_CH3_PKT_IOV packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } } else if (((MPIDI_CH3_Pkt_t*)mem_ptr)->type == MPIDI_CH3_PKT_RELOAD) { if (((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->send_recv == MPIDI_CH3_PKT_RELOAD_SEND) { /*printf("received reload send packet.\n");fflush(stdout);*/ MPID_Request_get_ptr(((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->sreq, sreq); mpi_errno = MPIDI_CH3U_Handle_send_req(recv_vc_ptr, sreq, &complete); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "unable to update send request after receiving a reload packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } if (!complete) { /* send a new iov */ MPID_Request * rts_sreq; MPIDI_CH3_Pkt_t pkt; /*printf("sending reloaded send iov of length %d\n", sreq->dev.iov_count);fflush(stdout);*/ MPIDI_Pkt_init(&pkt.iov, MPIDI_CH3_PKT_IOV); pkt.iov.send_recv = MPIDI_CH3_PKT_RELOAD_SEND; pkt.iov.req = ((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->rreq; pkt.iov.iov_len = sreq->dev.iov_count; sreq->dev.rdma_iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&pkt; sreq->dev.rdma_iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t); sreq->dev.rdma_iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)sreq->dev.iov; sreq->dev.rdma_iov[1].MPID_IOV_LEN = sreq->dev.iov_count * sizeof(MPID_IOV); mpi_errno = MPIDI_CH3_iStartMsgv(recv_vc_ptr, sreq->dev.rdma_iov, 2, &rts_sreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPIU_Object_set_ref(sreq, 0); MPIDI_CH3_Request_destroy(sreq); sreq = NULL; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|rtspkt", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ if (rts_sreq != NULL) { /* The sender doesn't need to know when the message has been sent. So release the request immediately */ MPID_Request_release(rts_sreq); } } } else if (((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->send_recv == MPIDI_CH3_PKT_RELOAD_RECV) { /*printf("received reload recv packet.\n");fflush(stdout);*/ MPID_Request_get_ptr(((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->rreq, rreq); mpi_errno = MPIDI_CH3U_Handle_recv_req(recv_vc_ptr, rreq, &complete); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "unable to update request after receiving a reload packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } if (!complete) { /* send a new iov */ MPID_Request * cts_sreq; MPIDI_CH3_Pkt_t pkt; /*printf("sending reloaded recv iov of length %d\n", rreq->dev.iov_count);fflush(stdout);*/ MPIDI_Pkt_init(&pkt.iov, MPIDI_CH3_PKT_IOV); pkt.iov.send_recv = MPIDI_CH3_PKT_RELOAD_RECV; pkt.iov.req = ((MPIDI_CH3_Pkt_rdma_reload_t*)mem_ptr)->sreq; pkt.iov.iov_len = rreq->dev.iov_count; rreq->dev.rdma_iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)&pkt; rreq->dev.rdma_iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t); rreq->dev.rdma_iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rreq->dev.iov; rreq->dev.rdma_iov[1].MPID_IOV_LEN = rreq->dev.iov_count * sizeof(MPID_IOV); mpi_errno = MPIDI_CH3_iStartMsgv(recv_vc_ptr, rreq->dev.rdma_iov, 2, &cts_sreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { /* This destruction probably isn't correct. */ /* I think it needs to save the error in the request, complete the request and return */ MPIU_Object_set_ref(rreq, 0); MPIDI_CH3_Request_destroy(rreq); rreq = NULL; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|ctspkt", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ if (cts_sreq != NULL) { /* The sender doesn't need to know when the message has been sent. So release the request immediately */ MPID_Request_release(cts_sreq);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -