📄 shm.c
字号:
} } } else { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.shm_reading_pkt = TRUE; if (num_bytes > sizeof(MPIDI_CH3_Pkt_t)) { pkt_ptr->offset += sizeof(MPIDI_CH3_Pkt_t); num_bytes -= sizeof(MPIDI_CH3_Pkt_t); pkt_ptr->num_bytes = num_bytes; mem_ptr = (char*)mem_ptr + sizeof(MPIDI_CH3_Pkt_t); } else { pkt_ptr->offset = 0; MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */ pkt_ptr->avail = MPIDI_CH3I_PKT_AVAILABLE; vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS; } /* return from the wait */ *num_bytes_ptr = 0; *vc_pptr = recv_vc_ptr; *shm_out = SHM_WAIT_WAKEUP; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; } else { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle unknown rdma packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } } else#endif /* MPIDI_CH3_CHANNEL_RNDV */ { mpi_errno = MPIDI_CH3U_Handle_recv_pkt(recv_vc_ptr, (MPIDI_CH3_Pkt_t*)mem_ptr, &recv_vc_ptr->ch.recv_active); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } } if (recv_vc_ptr->ch.recv_active == NULL) { recv_vc_ptr->ch.shm_reading_pkt = TRUE; } else { mpi_errno = MPIDI_CH3I_SHM_post_readv(recv_vc_ptr, recv_vc_ptr->ch.recv_active->dev.iov, recv_vc_ptr->ch.recv_active->dev.iov_count, NULL); } if (num_bytes > sizeof(MPIDI_CH3_Pkt_t)) { pkt_ptr->offset += sizeof(MPIDI_CH3_Pkt_t); num_bytes -= sizeof(MPIDI_CH3_Pkt_t); pkt_ptr->num_bytes = num_bytes; mem_ptr = (char*)mem_ptr + sizeof(MPIDI_CH3_Pkt_t); } else { pkt_ptr->offset = 0; MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */ pkt_ptr->avail = MPIDI_CH3I_PKT_AVAILABLE; vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS; continue; } if (recv_vc_ptr->ch.recv_active == NULL) continue; } MPIDI_DBG_PRINTF((60, FCNAME, "read %d bytes\n", num_bytes)); /*MPIDI_DBG_PRINTF((60, FCNAME, "shm_wait(recv finished %d bytes)", num_bytes));*/ if (!(recv_vc_ptr->ch.shm_state & SHM_READING_BIT)) {#ifdef USE_SHM_UNEX /* Should we buffer unexpected messages or leave them in the shmem queue? */ /*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, 0, num_bytes);*/#endif continue; } MPIDI_DBG_PRINTF((60, FCNAME, "read update, total = %d + %d = %d\n", recv_vc_ptr->ch.read.total, num_bytes, recv_vc_ptr->ch.read.total + num_bytes)); if (recv_vc_ptr->ch.read.use_iov) { iter_ptr = mem_ptr; while (num_bytes && recv_vc_ptr->ch.read.iovlen > 0) { if ((int)recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN <= num_bytes) { /* copy the received data */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF, iter_ptr, recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); MPIU_DBG_PRINTF(("a:shm_read_progress: %d bytes read from packet %d offset %d\n", recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN, index, pkt_ptr->offset + (int)((char*)iter_ptr - (char*)mem_ptr))); iter_ptr += recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN; /* update the iov */ num_bytes -= recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN; recv_vc_ptr->ch.read.index++; recv_vc_ptr->ch.read.iovlen--; } else { /* copy the received data */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF, iter_ptr, num_bytes); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); MPIU_DBG_PRINTF(("b:shm_read_progress: %d bytes read from packet %d offset %d\n", num_bytes, index, pkt_ptr->offset + (int)((char*)iter_ptr - (char*)mem_ptr))); iter_ptr += num_bytes; /* update the iov */ recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_LEN -= num_bytes; recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)( (char*)(recv_vc_ptr->ch.read.iov[recv_vc_ptr->ch.read.index].MPID_IOV_BUF) + num_bytes); num_bytes = 0; } } offset = (unsigned char*)iter_ptr - (unsigned char*)mem_ptr; recv_vc_ptr->ch.read.total += offset; if (num_bytes == 0) { /* put the shm buffer back in the queue */ vc->ch.shm[i].packet[index].offset = 0; MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */ vc->ch.shm[i].packet[index].avail = MPIDI_CH3I_PKT_AVAILABLE; vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS; } else { /* save the unused but received data */ /*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, offset, num_bytes);*/ /* OR */ /* update the head of the shmem queue */ pkt_ptr->offset += (pkt_ptr->num_bytes - num_bytes); pkt_ptr->num_bytes = num_bytes; } if (recv_vc_ptr->ch.read.iovlen == 0) { if (recv_vc_ptr->ch.recv_active->kind < MPID_LAST_REQUEST_KIND) { recv_vc_ptr->ch.shm_state &= ~SHM_READING_BIT; *num_bytes_ptr = recv_vc_ptr->ch.read.total; *vc_pptr = recv_vc_ptr; *shm_out = SHM_WAIT_READ; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; }#ifdef MPIDI_CH3_CHANNEL_RNDV else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_RTS_IOV_READ_REQUEST) { int found; /*printf("received rts iov_read.\n");fflush(stdout);*/ mpi_errno = MPIDI_CH3U_Handle_recv_rndv_pkt(recv_vc_ptr, &recv_vc_ptr->ch.recv_active->ch.pkt, &rreq, &found); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming rts(get) packet"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ for (i=0; i<recv_vc_ptr->ch.recv_active->dev.rdma_iov_count; i++) { rreq->dev.rdma_iov[i].MPID_IOV_BUF = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_BUF; rreq->dev.rdma_iov[i].MPID_IOV_LEN = recv_vc_ptr->ch.recv_active->dev.rdma_iov[i].MPID_IOV_LEN; } rreq->dev.rdma_iov_count = recv_vc_ptr->ch.recv_active->dev.rdma_iov_count; if (found) { mpi_errno = MPIDI_CH3U_Post_data_receive(TRUE, &rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_RNDV_REQ_TO_SEND"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code (mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|ctspkt", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ } rreq = recv_vc_ptr->ch.recv_active; /* free the request used to receive the rts packet and iov data */ MPIU_Object_set_ref(rreq, 0); MPIDI_CH3_Request_destroy(rreq); recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.shm_reading_pkt = TRUE; } else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_READ_REQUEST) { /*printf("received iov_read.\n");fflush(stdout);*/ rreq = recv_vc_ptr->ch.recv_active; mpi_errno = MPIDI_CH3_iStartRndvTransfer(recv_vc_ptr, rreq->ch.req); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", "shared memory read progress unable to handle incoming rts(get) iov"); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.shm_reading_pkt = TRUE; /* free the request used to receive the iov data */ MPIU_Object_set_ref(rreq, 0); MPIDI_CH3_Request_destroy(rreq); } else if (recv_vc_ptr->ch.recv_active->kind == MPIDI_CH3I_IOV_WRITE_REQUEST) { /*printf("received iov_write.\n");fflush(stdout);*/ mpi_errno = MPIDI_CH3I_SHM_rdma_writev(recv_vc_ptr, recv_vc_ptr->ch.recv_active->ch.req); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } /* --END ERROR HANDLING-- */ /* return from the wait */ MPID_Request_release(recv_vc_ptr->ch.recv_active); recv_vc_ptr->ch.recv_active = NULL; recv_vc_ptr->ch.shm_reading_pkt = TRUE; *num_bytes_ptr = 0; *vc_pptr = recv_vc_ptr; *shm_out = SHM_WAIT_WAKEUP; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; }#endif /* MPIDI_CH3_CHANNEL_RNDV */ else { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s %d", "invalid request type", recv_vc_ptr->ch.recv_active->kind); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return mpi_errno; } } } else { if ((unsigned int)num_bytes > recv_vc_ptr->ch.read.bufflen) { /* copy the received data */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(recv_vc_ptr->ch.read.buffer, mem_ptr, recv_vc_ptr->ch.read.bufflen); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); MPIU_DBG_PRINTF(("c:shm_read_progress: %d bytes read from packet %d offset %d\n", recv_vc_ptr->ch.read.bufflen, index, pkt_ptr->offset)); recv_vc_ptr->ch.read.total = recv_vc_ptr->ch.read.bufflen; /*shmi_buffer_unex_read(recv_vc_ptr, pkt_ptr, mem_ptr, recv_vc_ptr->ch.read.bufflen, num_bytes - recv_vc_ptr->ch.read.bufflen);*/ pkt_ptr->offset += recv_vc_ptr->ch.read.bufflen; pkt_ptr->num_bytes = num_bytes - recv_vc_ptr->ch.read.bufflen; recv_vc_ptr->ch.read.bufflen = 0; } else { /* copy the received data */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(recv_vc_ptr->ch.read.buffer, mem_ptr, num_bytes); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); MPIU_DBG_PRINTF(("d:shm_read_progress: %d bytes read from packet %d offset %d\n", num_bytes, index, pkt_ptr->offset)); recv_vc_ptr->ch.read.total += num_bytes; /* advance the user pointer */ recv_vc_ptr->ch.read.buffer = (char*)(recv_vc_ptr->ch.read.buffer) + num_bytes; recv_vc_ptr->ch.read.bufflen -= num_bytes; /* put the shm buffer back in the queue */ vc->ch.shm[i].packet[index].offset = 0; MPID_READ_WRITE_BARRIER(); /* the writing of the flag cannot occur before the reading of the last piece of data */ vc->ch.shm[i].packet[index].avail = MPIDI_CH3I_PKT_AVAILABLE; vc->ch.shm[i].head_index = (index + 1) % MPIDI_CH3I_NUM_PACKETS; } if (recv_vc_ptr->ch.read.bufflen == 0) { MPIU_Assert(recv_vc_ptr->ch.recv_active->kind < MPID_LAST_REQUEST_KIND); recv_vc_ptr->ch.shm_state &= ~SHM_READING_BIT; *num_bytes_ptr = recv_vc_ptr->ch.read.total; *vc_pptr = recv_vc_ptr; *shm_out = SHM_WAIT_READ; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; } } } if (millisecond_timeout == 0 && !working) { *shm_out = SHM_WAIT_TIMEOUT; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS; } } MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WAIT); return MPI_SUCCESS;}/* non-blocking functions */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_post_read#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_post_read(MPIDI_VC_t *vc, void *buf, int len, int (*rfn)(int, void*)){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_POST_READ); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_POST_READ); MPIDI_DBG_PRINTF((60, FCNAME, "posting a read of %d bytes", len)); vc->ch.read.total = 0; vc->ch.read.buffer = buf; vc->ch.read.bufflen = len; vc->ch.read.use_iov = FALSE; vc->ch.shm_state |= SHM_READING_BIT; vc->ch.shm_reading_pkt = FALSE;#ifdef USE_SHM_UNEX if (vc->ch.unex_list) shmi_read_unex(vc);#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_POST_READ); return SHM_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_post_readv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_SHM_post_readv(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int (*rfn)(int, void*)){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_POST_READV);#ifdef USE_SHM_IOV_COPY MPIDI_STATE_DECL(MPID_STATE_MEMCPY);#endif MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_POST_READV); /* FIXME */ /* Remove this stripping code after the updated segment code no longer produces iov's with empty buffers */ /* strip any trailing empty buffers */ /* while (n && iov[n-1].MPID_IOV_LEN == 0) n--; */ MPIU_Assert(iov[n-1].MPID_IOV_LEN > 0); vc->ch.read.total = 0;#ifdef USE_SHM_IOV_COPY /* This isn't necessary if we require the iov to be valid for the duration of the operation */ MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(vc->ch.read.iov, iov, sizeof(MPID_IOV) * n); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);#else vc->ch.read.iov = iov;#endif vc->ch.read.iovlen = n; vc->ch.read.index = 0; vc->ch.read.use_iov = TRUE; vc->ch.shm_state |= SHM_READING_BIT; vc->ch.shm_reading_pkt = FALSE;#ifdef USE_SHM_UNEX if (vc->ch.unex_list) shmi_readv_unex(vc);#endif#ifdef MPICH_DBG_OUTPUT { int i, total=0; for (i=0; i<n; i++) { total += iov[i].MPID_IOV_LEN; } MPIDI_DBG_PRINTF((60, FCNAME, "posting a read of %d bytes.\n", total)); }#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_POST_READV); return SHM_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -