📄 ch3i_shm_bootstrapq.c
字号:
#endif MPIDU_Process_unlock(&q_ptr->lock); MPIDU_Yield(); } while (blocking); *num_sent = 0; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_SEND); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_mqshm_receive#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_mqshm_receive(const int id, const int tag, void *buffer, const int maxlen, int *length, const int blocking){ int mpi_errno = MPI_SUCCESS; mqshm_t *q_ptr; int index, last_index = MQSHM_END; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); /*printf("[%d] recv: looking up id %d\n", MPIR_Process.comm_world->rank, id);fflush(stdout);*/ q_ptr = id_to_queue(id); /*printf("[%d] recv: id %d -> %p\n", MPIR_Process.comm_world->rank, id, q_ptr);fflush(stdout);*/ if (q_ptr == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**arg", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); return mpi_errno; } if (!blocking && q_ptr->first == MQSHM_END) { *length = 0; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); return MPI_SUCCESS; } do { MPIDU_Process_lock(&q_ptr->lock);#ifdef DBG_TEST_LOCKING if (q_ptr->inuse) { MPIU_Error_printf("Error, multiple processes acquired the lock.\n"); fflush(stdout); } q_ptr->inuse = 1;#endif index = q_ptr->first; while (index != MQSHM_END) { /*printf("[%d] recv: checking if msg[%d].tag %d == %d\n", MPIR_Process.comm_world->rank, index, q_ptr->msg[index].tag, tag);fflush(stdout);*/ if (q_ptr->msg[index].tag == tag) { /* validate the message */ if (maxlen < q_ptr->msg[index].length) {#ifdef DBG_TEST_LOCKING q_ptr->inuse = 0;#endif MPIDU_Process_unlock(&q_ptr->lock); mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**arg", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); return mpi_errno; } /* remove the node from the queue */ if (q_ptr->first == index) { MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST, "recv(%d): removing index %d from the head", tag, index)); q_ptr->first = q_ptr->msg[index].next; if (q_ptr->first == MQSHM_END) { /* If the queue becomes empty, reset the last index. */ q_ptr->last = MQSHM_END; } } else { MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST, "recv(%d): removing index %d", tag, index)); q_ptr->msg[last_index].next = q_ptr->msg[index].next; if (index == q_ptr->last) { q_ptr->last = last_index; } } /* copy the message */ memcpy(buffer, q_ptr->msg[index].data, q_ptr->msg[index].length); *length = q_ptr->msg[index].length; /* add the node to the free list */ q_ptr->msg[index].next = q_ptr->next_free; q_ptr->next_free = index; q_ptr->cur_num_messages--;#ifdef DBG_TEST_LOCKING q_ptr->inuse = 0;#endif MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE, (MPIU_DBG_FDEST,"msg_q: first = %d, last = %d, next_free = %d, num=%d", (q_ptr)->first, (q_ptr)->last, (q_ptr)->next_free, (q_ptr)->cur_num_messages)); MPIDU_Process_unlock(&q_ptr->lock); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); return mpi_errno; } last_index = index; index = q_ptr->msg[index].next; }#ifdef DBG_TEST_LOCKING q_ptr->inuse = 0;#endif MPIDU_Process_unlock(&q_ptr->lock); /*printf("<%d>", MPIR_Process.comm_world->rank);*/ MPIDU_Yield(); } while (blocking); *length = 0; /* zero length signals no message received? */ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_MQSHM_RECEIVE); return MPI_SUCCESS;}/* FIXME: What is this routine for? Why does this routine duplicate so much of the code in the MPIDI_CH3I_SHM_Get_mem function (but not exactly; e.g., is there a reason that this routine and the Get_mem version use slightly different arguments to the shm_open routine and take different action on failure? *//* Is this routine used only in this file? Should it be static */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_SHM_Get_mem_named#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)/*@ MPIDI_CH3I_SHM_Get_mem_named - allocate and get the address and size of a shared memory block Parameters:+ int size - size- MPIDI_CH3I_Shmem_block_request_result* pOutput - output@*/int MPIDI_CH3I_SHM_Get_mem_named(int size, MPIDI_CH3I_Shmem_block_request_result *pOutput){ int mpi_errno = MPI_SUCCESS;#if defined (USE_POSIX_SHM)#elif defined (USE_SYSV_SHM) int i; FILE *fout; int shmflag;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_GET_MEM_NAMED); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_GET_MEM_NAMED); if (size == 0 || size > MPIDU_MAX_SHM_BLOCK_SIZE ) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**arg", 0); MPIU_ERR_POP(mpi_errno); } /* Create the shared memory object */#ifdef USE_POSIX_SHM /*printf("[%d] creating a named shm object: '%s'\n", MPIR_Process.comm_world->rank, pOutput->name);*/ /* Mac OSX has a ridiculously short limit on the name (30 characters, based on experiments, as the value of SHM_NAME_MAX is not easily available. (it was nowhere defined on my Mac) */ pOutput->id = shm_open(pOutput->name, O_RDWR | O_CREAT, 0600);#ifdef ENAMETOOLONG /* Try again if there is a name too long error */ if (pOutput->id == -1 && errno == ENAMETOOLONG && strlen(pOutput->name) > 30) { pOutput->name[30] = 0; pOutput->id = shm_open(pOutput->name, O_RDWR | O_CREAT, 0600); }#endif if (pOutput->id == -1) { pOutput->error = errno; perror("shm_open msg" ); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shm_open", "**shm_open %s %d", pOutput->name, pOutput->error); MPIU_ERR_POP(mpi_errno); } /* Broken Mac OSX implementation only allows ftruncate on the creation of the shared memory */ if (ftruncate(pOutput->id, size) == -1) { pOutput->error = errno; perror( "ftrunctate" ); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ftruncate", "**ftruncate %s %d %d", pOutput->name, size, pOutput->error); MPIU_ERR_POP(mpi_errno); }#elif defined (USE_SYSV_SHM) /* Insert code here to convert the name into a key */ fout = fopen(pOutput->name, "a+"); pOutput->key = ftok(pOutput->name, 12345); fclose(fout); if (pOutput->key == -1) { pOutput->error = errno; mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ftok", "**ftok %s %d %d", pOutput->name, 12345, pOutput->error); MPIU_ERR_POP(mpi_errno); } shmflag = IPC_CREAT;#ifdef HAVE_SHM_RW shmflag |= SHM_R | SHM_W;#endif pOutput->id = shmget(pOutput->key, size, shmflag ); if (pOutput->id == -1) { pOutput->error = errno; mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmget", "**shmget %d", pOutput->error); MPIU_ERR_POP(mpi_errno); }#elif defined (USE_WINDOWS_SHM) pOutput->id = CreateFileMapping( INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, pOutput->name); if (pOutput->id == NULL) { pOutput->error = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**CreateFileMapping", "**CreateFileMapping %d", pOutput->error); /*"Error in CreateFileMapping, %d", pOutput->error);*/ MPIU_ERR_POP(mpi_errno); }#else#error No shared memory subsystem defined#endif /*printf("[%d] mmapping the shared memory object\n", MPIR_Process.comm_world->rank);fflush(stdout);*/ pOutput->addr = NULL;#ifdef USE_POSIX_SHM pOutput->addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED /* | MAP_NORESERVE*/, pOutput->id, 0); if (pOutput->addr == MAP_FAILED) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mmap", "**mmap %d", errno); pOutput->addr = NULL; MPIU_ERR_POP(mpi_errno); } #elif defined (USE_SYSV_SHM) pOutput->addr = shmat(pOutput->id, NULL, SHM_RND); if (pOutput->addr == (void*)-1) { pOutput->error = errno; pOutput->addr = NULL; mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shmat", "**shmat %d", pOutput->error); /*"Error from shmat %d", pOutput->error);*/ MPIU_ERR_POP(mpi_errno); }#elif defined(USE_WINDOWS_SHM) pOutput->addr = MapViewOfFileEx( pOutput->id, FILE_MAP_WRITE, 0, 0, size, NULL ); if (pOutput->addr == NULL) { pOutput->error = GetLastError(); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**MapViewOfFileEx", "**MapViewOfFileEx %d", pOutput->error); MPIU_ERR_POP(mpi_errno); }#else#error No shared memory subsystem defined#endif pOutput->size = size; pOutput->error = MPI_SUCCESS; fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_GET_MEM_NAMED); return mpi_errno;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -