📄 shm.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include <stdio.h>

#ifndef HAVE_WINDOWS_H
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#ifdef HAVE_SYS_PTRACE_H
#include <sys/ptrace.h>
#endif
#include <sys/wait.h>
#include <errno.h>
#ifdef USE__LLSEEK
#include <linux/unistd.h>
static inline _syscall5(int, _llseek, uint, fd, ulong, hi, ulong, lo, loff_t *, res, uint, wh);
#define OFF_T loff_t
#define OFF_T_CAST(a) ((loff_t)(unsigned)(a))
#else
#define OFF_T off_t
#define OFF_T_CAST(a) ((off_t)(a))
#endif
#endif

/* Build-time switches selecting which MPIDI_CH3I_SHM_writev variant is compiled. */
#undef USE_SHM_WRITE_FOR_SHM_WRITEV
/*#define USE_SHM_WRITE_FOR_SHM_WRITEV*/
/*#undef USE_IOV_LEN_2_SHORTCUT*/
#define USE_IOV_LEN_2_SHORTCUT

typedef int SHM_STATE;
#define SHM_READING_BIT 0x0008
#define SHM_WRITING_BIT 0x0010

#ifndef min
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* shm functions */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_write
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Copy up to len bytes from buf into consecutive packets of
 * vc->ch.write_shmq, starting at the queue's tail_index and placing at most
 * MPIDI_CH3I_PACKET_SIZE bytes in each packet.
 *
 * Copying stops early as soon as the next packet to be written is still
 * marked MPIDI_CH3I_PKT_USED.  The number of bytes actually copied is stored
 * in *num_bytes_ptr (0 if the tail packet was already in use on entry) and
 * tail_index is left at the next packet to write.
 *
 * Always returns MPI_SUCCESS; a short write is reported only through
 * *num_bytes_ptr.
 */
int MPIDI_CH3I_SHM_write(MPIDI_VC_t * vc, void *buf, int len, int *num_bytes_ptr)
{
    int total = 0;
    int length;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    index = vc->ch.write_shmq->tail_index;
    if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_USED)
    {
        /* no free packet at the tail: nothing can be written right now */
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
        return MPI_SUCCESS;
    }

    while (len)
    {
        length = min(len, MPIDI_CH3I_PACKET_SIZE);
        vc->ch.write_shmq->packet[index].num_bytes = length;
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data, buf, length);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIU_DBG_PRINTF(("shm_write: %d bytes in packet %d\n",
                         vc->ch.write_shmq->packet[index].num_bytes, index));
        /* the barrier is issued after the payload/num_bytes stores and
         * before the packet is flagged as used */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;
        buf = (char *) buf + length;
        total += length;
        len -= length;
        index = (index + 1) % MPIDI_CH3I_NUM_PACKETS;
        if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_USED)
        {
            /* the next packet is still in use: report the partial write */
            vc->ch.write_shmq->tail_index = index;
            *num_bytes_ptr = total;
            MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
            return MPI_SUCCESS;
        }
    }

    vc->ch.write_shmq->tail_index = index;
    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITE);
    return MPI_SUCCESS;
}

#ifdef USE_SHM_WRITE_FOR_SHM_WRITEV

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Variant of MPIDI_CH3I_SHM_writev built on MPIDI_CH3I_SHM_write: each of
 * the n iov entries is written in turn.  Writing stops at the first entry
 * that is only partially written, or at the first non-success return code
 * (which is passed back to the caller).  The total number of bytes written
 * is stored in *num_bytes_ptr on the MPI_SUCCESS paths.
 *
 * NOTE(review): num_bytes is an unsigned int but its address is passed to
 * MPIDI_CH3I_SHM_write, which takes an int *.  This variant is disabled by
 * the #undef above; confirm the types before enabling it.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    unsigned int total = 0;
    unsigned int num_bytes;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    for (i=0; i<n; i++)
    {
        mpi_errno = MPIDI_CH3I_SHM_write(vc, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN, &num_bytes);
        if (mpi_errno != MPI_SUCCESS)
        {
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return mpi_errno;
        }
        total += num_bytes;
        if (num_bytes < iov[i].MPID_IOV_LEN)
        {
            /* short write: the queue is full, stop here */
            *num_bytes_ptr = total;
            MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
            return MPI_SUCCESS;
        }
    }

    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#else

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_writev
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Gather the n buffers described by iov into the packets of
 * vc->ch.write_shmq, packing consecutive iov entries into the same packet
 * until it holds MPIDI_CH3I_PACKET_SIZE bytes.
 *
 * A packet is flagged MPIDI_CH3I_PKT_USED once it is full, or at the end of
 * the call if it holds any data.  Writing stops as soon as the next packet
 * needed is still flagged MPIDI_CH3I_PKT_USED.  The number of bytes copied is
 * stored in *num_bytes_ptr (0 if the tail packet was in use on entry).
 *
 * Always returns MPI_SUCCESS.
 */
int MPIDI_CH3I_SHM_writev(MPIDI_VC_t *vc, MPID_IOV *iov, int n, int *num_bytes_ptr)
{
    int i;
    unsigned int total = 0;
    unsigned int num_bytes;
    unsigned int cur_avail, dest_avail;
    unsigned char *cur_pos, *dest_pos;
    int index;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    index = vc->ch.write_shmq->tail_index;
    if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_USED)
    {
        *num_bytes_ptr = 0;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }

#ifdef USE_IOV_LEN_2_SHORTCUT
    /* fast path: exactly two buffers whose combined length is strictly less
     * than one packet are copied back to back into the tail packet */
    if (n == 2 && (iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN) < MPIDI_CH3I_PACKET_SIZE)
    {
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc->ch.write_shmq->packet[index].data,
               iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(&vc->ch.write_shmq->packet[index].data[iov[0].MPID_IOV_LEN],
               iov[1].MPID_IOV_BUF, iov[1].MPID_IOV_LEN);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        vc->ch.write_shmq->packet[index].num_bytes =
            iov[0].MPID_IOV_LEN + iov[1].MPID_IOV_LEN;
        total = vc->ch.write_shmq->packet[index].num_bytes;
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;
        vc->ch.write_shmq->tail_index =
            (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
        MPIU_DBG_PRINTF(("shm_writev - %d bytes in packet %d\n",
                         vc->ch.write_shmq->packet[index].num_bytes, index));
        *num_bytes_ptr = total;
        MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
        return MPI_SUCCESS;
    }
#endif

    /* dest_pos / dest_avail track the write cursor and the remaining room in
     * the packet currently being filled */
    dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
    dest_avail = MPIDI_CH3I_PACKET_SIZE;
    vc->ch.write_shmq->packet[index].num_bytes = 0;
    for (i=0; i<n; i++)
    {
        if (iov[i].MPID_IOV_LEN <= dest_avail)
        {
            /* the whole buffer fits in the current packet */
            total += iov[i].MPID_IOV_LEN;
            vc->ch.write_shmq->packet[index].num_bytes += iov[i].MPID_IOV_LEN;
            dest_avail -= iov[i].MPID_IOV_LEN;
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, iov[i].MPID_IOV_LEN);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                             iov[i].MPID_IOV_LEN,
                             vc->ch.write_shmq->packet[index].num_bytes, index));
            dest_pos += iov[i].MPID_IOV_LEN;
        }
        else
        {
            /* top up the current packet, publish it, then spill the rest of
             * this buffer into the following packets */
            total += dest_avail;
            vc->ch.write_shmq->packet[index].num_bytes = MPIDI_CH3I_PACKET_SIZE;
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(dest_pos, iov[i].MPID_IOV_BUF, dest_avail);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                             dest_avail,
                             vc->ch.write_shmq->packet[index].num_bytes, index));

            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;

            cur_pos = (unsigned char *)(iov[i].MPID_IOV_BUF) + dest_avail;
            cur_avail = iov[i].MPID_IOV_LEN - dest_avail;
            while (cur_avail)
            {
                index = vc->ch.write_shmq->tail_index =
                    (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
                if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_USED)
                {
                    *num_bytes_ptr = total;
                    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                    return MPI_SUCCESS;
                }
                num_bytes = min(cur_avail, MPIDI_CH3I_PACKET_SIZE);
                vc->ch.write_shmq->packet[index].num_bytes = num_bytes;
                MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
                memcpy(vc->ch.write_shmq->packet[index].data, cur_pos, num_bytes);
                MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
                MPIU_DBG_PRINTF(("shm_writev: +%d=%d bytes in packet %d\n",
                                 num_bytes,
                                 vc->ch.write_shmq->packet[index].num_bytes, index));
                total += num_bytes;
                cur_pos += num_bytes;
                cur_avail -= num_bytes;
                if (cur_avail)
                {
                    /* more data follows, so this packet is complete; the
                     * last packet of the buffer is left open for the next
                     * iov entry */
                    MPID_WRITE_BARRIER();
                    vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;
                }
            }
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data + num_bytes);
            dest_avail = MPIDI_CH3I_PACKET_SIZE - num_bytes;
        }
        if (dest_avail == 0)
        {
            /* the current packet is exactly full: publish it and move on */
            MPID_WRITE_BARRIER();
            vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;
            index = vc->ch.write_shmq->tail_index =
                (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
            if (vc->ch.write_shmq->packet[index].avail == MPIDI_CH3I_PKT_USED)
            {
                *num_bytes_ptr = total;
                MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
                return MPI_SUCCESS;
            }
            dest_pos = (unsigned char *)(vc->ch.write_shmq->packet[index].data);
            dest_avail = MPIDI_CH3I_PACKET_SIZE;
            vc->ch.write_shmq->packet[index].num_bytes = 0;
        }
    }
    if (dest_avail < MPIDI_CH3I_PACKET_SIZE)
    {
        /* publish the final, partially filled packet */
        MPID_WRITE_BARRIER();
        vc->ch.write_shmq->packet[index].avail = MPIDI_CH3I_PKT_USED;
        vc->ch.write_shmq->tail_index =
            (vc->ch.write_shmq->tail_index + 1) % MPIDI_CH3I_NUM_PACKETS;
    }

    *num_bytes_ptr = total;
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WRITEV);
    return MPI_SUCCESS;
}

#endif /* USE_SHM_WRITE_FOR_SHM_WRITEV */

#ifdef USE_SHM_UNEX

#undef FUNCNAME
#define FUNCNAME shmi_buffer_unex_read
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Record num_bytes of not-yet-consumed data, located at mem_ptr + offset and
 * belonging to packet pkt_ptr, by pushing a new node onto the front of
 * vc_ptr->ch.unex_list.  The data itself is not copied; only the pointer and
 * length are stored.
 *
 * Returns 0 on success, or -1 if the list node cannot be allocated (in which
 * case the list is left unchanged).
 */
static int shmi_buffer_unex_read(MPIDI_VC_t *vc_ptr, MPIDI_CH3I_SHM_Packet_t *pkt_ptr, void *mem_ptr, unsigned int offset, unsigned int num_bytes)
{
    MPIDI_CH3I_SHM_Unex_read_t *p;
    MPIDI_STATE_DECL(MPID_STATE_SHMI_BUFFER_UNEX_READ);

    MPIDI_FUNC_ENTER(MPID_STATE_SHMI_BUFFER_UNEX_READ);

    MPIDI_DBG_PRINTF((60, FCNAME, "%d bytes\n", num_bytes));

    p = (MPIDI_CH3I_SHM_Unex_read_t *)MPIU_Malloc(sizeof(MPIDI_CH3I_SHM_Unex_read_t));
    if (p == NULL)
    {
        /* allocation failed: report it instead of dereferencing NULL */
        MPIDI_FUNC_EXIT(MPID_STATE_SHMI_BUFFER_UNEX_READ);
        return -1;
    }
    p->pkt_ptr = pkt_ptr;
    p->buf = (unsigned char *)mem_ptr + offset;
    p->length = num_bytes;
    p->next = vc_ptr->ch.unex_list;
    vc_ptr->ch.unex_list = p;

    MPIDI_FUNC_EXIT(MPID_STATE_SHMI_BUFFER_UNEX_READ);
    return 0;
}

#undef FUNCNAME
#define FUNCNAME shmi_read_unex
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Drain the nodes of vc_ptr->ch.unex_list into the posted contiguous read
 * buffer (vc_ptr->ch.read.buffer / bufflen), updating read.total as bytes
 * are copied.
 *
 * A node that is only partially consumed is advanced in place.  A fully
 * consumed node has its packet's cur_pos reset to the start of its data and
 * its avail flag set to MPIDI_CH3I_PKT_AVAILABLE, and is then freed.  When
 * read.bufflen reaches 0, SHM_READING_BIT is cleared in shm_state and the VC
 * is pushed onto MPIDI_CH3I_Process.unex_finished_list.
 *
 * The list must be non-empty on entry (asserted).  Always returns
 * SHM_SUCCESS.
 */
static int shmi_read_unex(MPIDI_VC_t *vc_ptr)
{
    unsigned int len;
    MPIDI_CH3I_SHM_Unex_read_t *temp;
    MPIDI_STATE_DECL(MPID_STATE_SHMI_READ_UNEX);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_SHMI_READ_UNEX);

    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));
    MPIU_Assert(vc_ptr->ch.unex_list);

    /* copy the received data */
    while (vc_ptr->ch.unex_list)
    {
        len = min(vc_ptr->ch.unex_list->length, vc_ptr->ch.read.bufflen);
        MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
        memcpy(vc_ptr->ch.read.buffer, vc_ptr->ch.unex_list->buf, len);
        MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
        /* advance the user pointer */
        vc_ptr->ch.read.buffer = (char*)(vc_ptr->ch.read.buffer) + len;
        vc_ptr->ch.read.bufflen -= len;
        vc_ptr->ch.read.total += len;
        if (len != vc_ptr->ch.unex_list->length)
        {
            vc_ptr->ch.unex_list->length -= len;
            vc_ptr->ch.unex_list->buf += len;
        }
        else
        {
            /* put the receive packet back in the pool */
            MPIU_Assert(vc_ptr->ch.unex_list->pkt_ptr != NULL);
            vc_ptr->ch.unex_list->pkt_ptr->cur_pos = vc_ptr->ch.unex_list->pkt_ptr->data;
            vc_ptr->ch.unex_list->pkt_ptr->avail = MPIDI_CH3I_PKT_AVAILABLE;
            /* MPIU_Free the unexpected data node */
            temp = vc_ptr->ch.unex_list;
            vc_ptr->ch.unex_list = vc_ptr->ch.unex_list->next;
            MPIU_Free(temp);
        }
        /* check to see if the entire message was received */
        if (vc_ptr->ch.read.bufflen == 0)
        {
            /* place this vc_ptr in the finished list so it will be completed by shm_wait */
            vc_ptr->ch.shm_state &= ~SHM_READING_BIT;
            vc_ptr->ch.unex_finished_next = MPIDI_CH3I_Process.unex_finished_list;
            MPIDI_CH3I_Process.unex_finished_list = vc_ptr;
            MPIDI_FUNC_EXIT(MPID_STATE_SHMI_READ_UNEX);
            return SHM_SUCCESS;
        }
    }
    MPIDI_FUNC_EXIT(MPID_STATE_SHMI_READ_UNEX);
    return SHM_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME shmi_readv_unex
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int shmi_readv_unex(MPIDI_VC_t *vc_ptr)
{
    unsigned int num_bytes;
    MPIDI_CH3I_SHM_Unex_read_t *temp;
    MPIDI_STATE_DECL(MPID_STATE_SHMI_READV_UNEX);
    MPIDI_STATE_DECL(MPID_STATE_MEMCPY);

    MPIDI_FUNC_ENTER(MPID_STATE_SHMI_READV_UNEX);

    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    while (vc_ptr->ch.unex_list)
    {
        while (vc_ptr->ch.unex_list->length && vc_ptr->ch.read.iovlen)
        {
            num_bytes = min(vc_ptr->ch.unex_list->length,
                            vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_LEN);
            MPIDI_DBG_PRINTF((60, FCNAME, "copying %d bytes\n", num_bytes));
            /* copy the received data */
            MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY);
            memcpy(vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_BUF, vc_ptr->ch.unex_list->buf, num_bytes);
            MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY);
            vc_ptr->ch.read.total += num_bytes;
            vc_ptr->ch.unex_list->buf += num_bytes;
            vc_ptr->ch.unex_list->length -= num_bytes;
            /* update the iov */
            vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_LEN -= num_bytes;
            vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_BUF =
                (MPID_IOV_BUF_CAST)(
                    (char*)(vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_BUF) + num_bytes);
            if (vc_ptr->ch.read.iov[vc_ptr->ch.read.index].MPID_IOV_LEN == 0)
            {
                vc_ptr->ch.read.index++;
                vc_ptr->ch.read.iovlen--;
            }
        }
        /* NOTE(review): the visible source ends here, inside the outer loop of
         * shmi_readv_unex; the remainder of this function (and the #endif
         * matching USE_SHM_UNEX) is not shown. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -