📄 ch3i_bootstrapq.c
字号:
return MPI_SUCCESS; } queue = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (queue == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } queue->next = NULL; g_queue_list = queue;#ifdef USE_MQSHM queue->id = -1; mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**notimpl", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno;#else key = MPICH_MSG_QUEUE_ID; id = msgget(key, IPC_CREAT | 0666); if (id == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgget", "**msgget %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } queue->id = id;#endif /* USE_MQSHM */ queue->pid = getpid(); MPIU_Snprintf(queue->name, MPIDI_BOOTSTRAP_NAME_LEN, "%d", getpid()); /* drain any stale messages in the queue */ nb = 0; while (nb != -1) { msg.mtype = queue->pid; nb = msgrcv(queue->id, &msg, BOOTSTRAP_MAX_MSG_SIZE, queue->pid, IPC_NOWAIT); } *queue_ptr = queue;#ifndef USE_MQSHM MPIU_DBG_PRINTF(("Created bootstrap queue, %d -> %d:%s\n", key, queue->id, queue->name));#else MPIU_DBG_PRINTF(("Created bootstrap queue, %d:%s\n", queue->id, queue->name));#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return MPI_SUCCESS;}#else#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_create#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_create(MPIDI_CH3I_BootstrapQ *queue_ptr){ int mpi_errno = MPI_SUCCESS; MPIDI_CH3I_BootstrapQ_struct *queue;#if defined(USE_POSIX_MQ) char key[100]; mode_t mode; struct mq_attr attr; mqd_t id;#elif defined(USE_SYSV_MQ) int id, key;#elif defined(HAVE_WINDOWS_H) DWORD dwThreadID;#else#error No bootstrap queue mechanism defined#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); /* allocate a queue structure and add it to the global list */ queue = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (queue == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } queue->next = g_queue_list; g_queue_list = queue;#ifdef USE_POSIX_MQ srand(getpid()); MPIU_Snprintf(key, 100, "%s%d", MPICH_MSG_QUEUE_NAME, rand()); mode = 0666; memset(&attr, 0, sizeof(attr)); attr.mq_maxmsg = BOOTSTRAP_MAX_NUM_MSGS; attr.mq_msgsize = BOOTSTRAP_MAX_MSG_SIZE; id = mq_open(key, O_CREAT | O_RDWR | O_NONBLOCK, mode, &attr); while (id == -1) { if (errno != EEXIST) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mq_open", "**mq_open %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } MPIU_Snprintf(key, 100, "%s%d", MPICH_MSG_QUEUE_NAME, rand()); id = mq_open(key, O_CREAT | O_RDWR | O_NONBLOCK, mode, &attr); } queue->id = id; MPIU_Strncpy(queue->name, key, MPIDI_BOOTSTRAP_NAME_LEN); MPIU_DBG_PRINTF(("created message queue: %s\n", queue->name));#elif defined(USE_SYSV_MQ) srand(getpid()); key = rand(); id = msgget(key, IPC_CREAT | IPC_EXCL | 0666); while (id == -1) { if (errno != EEXIST) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgget", "**msgget %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } key = rand(); id = msgget(key, IPC_CREAT | IPC_EXCL | 0666); } queue->id = id; MPIU_Snprintf(queue->name, MPIDI_BOOTSTRAP_NAME_LEN, "%d", key); MPIU_DBG_PRINTF(("created message queue: %s\n", queue->name));#elif defined(HAVE_WINDOWS_H) queue->hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); queue->hMsgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)MessageQueueThreadFn, queue, 0, &dwThreadID); if (queue->hMsgThread == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**CreateThread", "**CreateThread %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } if (WaitForSingleObject(queue->hReadyEvent, 60000) != WAIT_OBJECT_0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**winwait", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return mpi_errno; } CloseHandle(queue->hReadyEvent); queue->hReadyEvent = NULL;#else#error No bootstrap queue mechanism defined#endif *queue_ptr = queue; MPIU_DBG_PRINTF(("Created bootstrap queue: %s\n", queue->name)); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE); return MPI_SUCCESS;}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_tostring#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_tostring(MPIDI_CH3I_BootstrapQ queue, char *name, int length){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); if (queue == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); return mpi_errno; } /*printf("[%d] queue->name = %s\n", MPIR_Process.comm_world->rank, queue->name);fflush(stdout);*/#ifdef MPIDI_CH3_USES_SHM_NAME mpi_errno = MPIU_Snprintf(name, length, "%s:%s", queue->name, queue->shm_name); if (mpi_errno >= length) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); return mpi_errno; }#else mpi_errno = MPIU_Strncpy(name, queue->name, length); if (mpi_errno) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**arg", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); return mpi_errno; }#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_TOSTRING); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_unlink#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_unlink(MPIDI_CH3I_BootstrapQ queue){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_UNLINK); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_UNLINK);#ifdef USE_MQSHM mpi_errno = MPIDI_CH3I_mqshm_unlink(queue->id); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**boot_unlink", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_UNLINK); return mpi_errno; }#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_UNLINK); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_destroy#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_destroy(MPIDI_CH3I_BootstrapQ queue){ int mpi_errno = MPI_SUCCESS;#ifdef USE_SINGLE_MSG_QUEUE MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY);#ifdef USE_MQSHM mpi_errno = MPIDI_CH3I_mqshm_close(queue->id);#endif#elif defined(USE_POSIX_MQ) int result; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); result = mq_close(queue->id); if (result == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mq_close", "**mq_close %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); return mpi_errno; }#elif defined(USE_SYSV_MQ) int result; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); result = msgctl(queue->id, IPC_RMID, NULL); if (result == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgctl", "**msgctl %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); return mpi_errno; }#elif defined(HAVE_WINDOWS_H) MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); PostMessage(queue->hWnd, WM_DESTROY, 0, 0);#else#error No bootstrap queue mechanism defined#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DESTROY); return MPI_SUCCESS;}#ifdef USE_SINGLE_MSG_QUEUE/* FIXME: What does this routine do? */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_attach#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_attach(char *name_full, MPIDI_CH3I_BootstrapQ * queue_ptr){ int mpi_errno = MPI_SUCCESS;#ifdef USE_SYSV_MQ int id, key;#endif MPIDI_CH3I_BootstrapQ_struct *iter; char name[100];#ifdef MPIDI_CH3_USES_SHM_NAME char shm_name[MPIDI_MAX_SHM_NAME_LENGTH] = ""; char *token; MPIDI_CH3I_BootstrapQ_struct *matched_queue = NULL;#endif MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIU_Strncpy(name, name_full, MPIDI_MAX_SHM_NAME_LENGTH);#ifdef MPIDI_CH3_USES_SHM_NAME token = strtok(name, ":"); if (token != NULL) { token = strtok(NULL, ""); MPIU_Strncpy(shm_name, token, MPIDI_MAX_SHM_NAME_LENGTH); }#endif /* check if this queue has already been attached to and return it if found */ iter = g_queue_list; while (iter != NULL) { if (strcmp(iter->name, name) == 0) { *queue_ptr = iter; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS; } iter = iter->next; }#ifdef MPIDI_CH3_USES_SHM_NAME /* search for another node with the same shm_name */ iter = g_queue_list; while (iter != NULL) { if (strcmp(iter->shm_name, shm_name) == 0) { matched_queue = iter; break; } iter = iter->next; } if (matched_queue == NULL) { /* This is a queue this process hasn't seen before, so "create" it (attach to existing) */ mpi_errno = MPIDI_CH3I_BootstrapQ_create_named(&matched_queue, shm_name, 0); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } /* change the pid field to match the remote process */ MPIU_Strncpy(matched_queue->name, name, MPIDI_BOOTSTRAP_NAME_LEN); matched_queue->pid = atoi(name); *queue_ptr = matched_queue; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; }#endif /* FIXME: This memory is not freed */ iter = (MPIDI_CH3I_BootstrapQ_struct*)MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (iter == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } MPIU_Strncpy(iter->name, name, MPIDI_BOOTSTRAP_NAME_LEN); iter->pid = atoi(name);#ifdef MPIDI_CH3_USES_SHM_NAME MPIU_Strncpy(iter->shm_name, shm_name, MPIDI_BOOTSTRAP_NAME_LEN); iter->id = matched_queue->id;#else iter->id = g_queue_list->id;#endif iter->next = g_queue_list; g_queue_list = iter;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -