📄 ch3i_bootstrapq.c
字号:
*queue_ptr = iter; /*printf("[%d] attached to message queue: %s\n", MPIR_Process.comm_world->rank, name);fflush(stdout);*/ MPIU_DBG_PRINTF(("attached to message queue: %s\n", name)); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS;}#else#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_attach#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_attach(char *name, MPIDI_CH3I_BootstrapQ * queue_ptr){ int mpi_errno = MPI_SUCCESS;#ifdef USE_MQSHM MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); /*printf("[%d] attaching to %s, returning q_ptr %p\n", MPIR_Process.comm_world->rank, name, g_queue_list);fflush(stdout);*/ *queue_ptr = g_queue_list;#elif defined(USE_POSIX_MQ) int id; MPIDI_CH3I_BootstrapQ_struct *iter = g_queue_list; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); while (iter) { if (strcmp(iter->name, name) == 0) { *queue_ptr = iter; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS; } iter = iter->next; } iter = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (iter == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } MPIU_Strncpy(iter->name, name, MPIDI_BOOTSTRAP_NAME_LEN); id = mq_open(name, O_RDWR | O_NONBLOCK); if (id == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mq_open", "**mq_open %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } iter->id = id; iter->next = g_queue_list; /*iter->next = NULL;*/ *queue_ptr = iter; /*printf("[%d] attached to message queue: %s\n", MPIR_Process.comm_world->rank, name);fflush(stdout);*/#elif defined(USE_SYSV_MQ) int id, key; MPIDI_CH3I_BootstrapQ_struct *iter = g_queue_list; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); while (iter) { if (strcmp(iter->name, name) == 0) { *queue_ptr = iter; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS; } iter = iter->next; } iter = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (iter == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } MPIU_Strncpy(iter->name, name, MPIDI_BOOTSTRAP_NAME_LEN); key = atoi(name); id = msgget(key, IPC_CREAT | 0666); if (id == -1) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgget", "**msgget %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } iter->id = id; iter->next = g_queue_list; /*iter->next = NULL;*/ *queue_ptr = iter; /*printf("[%d] attached to message queue: %s\n", MPIR_Process.comm_world->rank, name);fflush(stdout);*/#elif defined(HAVE_WINDOWS_H) MPIDI_CH3I_BootstrapQ_struct *iter = g_queue_list; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); while (iter) { if (strcmp(iter->name, name) == 0) { *queue_ptr = iter; MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS; } iter = iter->next; } iter = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct)); if (iter == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } MPIU_Strncpy(iter->name, name, MPIDI_BOOTSTRAP_NAME_LEN); /*printf("looking for window %s\n", name);fflush(stdout);*/ iter->hWnd = FindWindowEx(HWND_MESSAGE, NULL, name, name); /*if (iter->hWnd != NULL) { printf("FindWindowEx found the window\n"); fflush(stdout); }*/ if (iter->hWnd == NULL) { iter->hWnd = FindWindow(name, name); /*if (iter->hWnd != NULL) { printf("FindWindow found the window\n"); fflush(stdout); }*/ } if (iter->hWnd == NULL) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**FindWindowEx", "**FindWindowEx %d", GetLastError()); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return mpi_errno; } iter->hMutex = CreateMutex(NULL, FALSE, NULL); iter->hReadyEvent = NULL; iter->msg_list = NULL; iter->hMsgThread = NULL; iter->hMessageArrivedEvent = NULL; iter->next = g_queue_list; /*iter->next = NULL;*/ *queue_ptr = iter;#else#error No bootstrap queue mechanism defined#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_ATTACH); return MPI_SUCCESS;}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_detach#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_detach(MPIDI_CH3I_BootstrapQ queue){ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DETACH); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DETACH); /* remove the queue from the global list? */ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_DETACH); return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_send_msg#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_send_msg(MPIDI_CH3I_BootstrapQ queue, void *buffer, int length){ int mpi_errno = MPI_SUCCESS;#ifdef USE_MQSHM int num_sent = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); /*printf("[%d] calling mqshm_send from BootstrapQ_send_msg\n", MPIR_Process.comm_world->rank);fflush(stdout);*/ mpi_errno = MPIDI_CH3I_mqshm_send(queue->id, buffer, length, queue->pid, &num_sent, 1); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqshm_send", 0); }#elif defined(USE_POSIX_MQ) || defined(USE_SYSV_MQ) || defined(USE_SINGLE_MSG_QUEUE) struct msgbuf { long mtype; char data[BOOTSTRAP_MAX_MSG_SIZE]; } msg; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG);#ifdef MPICH_DBG_OUTPUT /*MPIU_Assert(length <= BOOTSTRAP_MAX_MSG_SIZE);*/ if (length > BOOTSTRAP_MAX_MSG_SIZE) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**bootqmsg", "bootqmsg %d %d", length, BOOTSTRAP_MAX_MSG_SIZE); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); return mpi_errno; }#endif#ifdef USE_SINGLE_MSG_QUEUE msg.mtype = queue->pid;#else msg.mtype = 100;#endif MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(msg.data, buffer, length); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); MPIU_DBG_PRINTF(("sending message %d on queue %d\n", msg.mtype, queue->id));#ifdef USE_POSIX_MQ if (mq_send(queue->id, &msg, length, 0)) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mq_send", "**mq_send %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); return mpi_errno; }#elif defined (USE_SYSV_MQ) if (msgsnd(queue->id, &msg, length, 0) == -1) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgsnd", "**msgsnd %d", errno); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); return mpi_errno; }#else#error No bootstrap queue mechanism defined#endif MPIU_DBG_PRINTF(("message sent: %d bytes\n", length));#elif defined(HAVE_WINDOWS_H) COPYDATASTRUCT data; LRESULT rc; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); data.dwData = 0; /* immediate data field */ data.cbData = length; data.lpData = buffer; rc = SendMessage(queue->hWnd, WM_COPYDATA, 0, (LPARAM)&data); /*printf("SendMessage returned %d\n", rc);fflush(stdout);*/#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_SEND_MSG); return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_BootstrapQ_recv_msg#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_BootstrapQ_recv_msg(MPIDI_CH3I_BootstrapQ queue, void *buffer, int length, int *num_bytes_ptr, BOOL blocking){ int mpi_errno = MPI_SUCCESS;#ifdef USE_MQSHM MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); /*printf("[%d] calling mqshm_receive from BootstrapQ_recv_msg\n", MPIR_Process.comm_world->rank);fflush(stdout);*/ mpi_errno = MPIDI_CH3I_mqshm_receive(queue->id, queue->pid, buffer, length, num_bytes_ptr, blocking); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqshm_receive", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return mpi_errno; }#elif defined(USE_POSIX_MQ) || defined(USE_SYSV_MQ) || defined(USE_SINGLE_MSG_QUEUE) int nb; struct msgbuf { long mtype; char data[BOOTSTRAP_MAX_MSG_SIZE]; } msg; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG);#ifdef MPICH_DBG_OUTPUT /*MPIU_Assert(length <= BOOTSTRAP_MAX_MSG_SIZE);*/ if (length > BOOTSTRAP_MAX_MSG_SIZE) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**bootqmsg", "bootqmsg %d %d", length, BOOTSTRAP_MAX_MSG_SIZE); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return mpi_errno; }#endif#ifdef USE_SINGLE_MSG_QUEUE msg.mtype = queue->pid; nb = msgrcv(queue->id, &msg, BOOTSTRAP_MAX_MSG_SIZE, queue->pid, blocking ? 0 : IPC_NOWAIT);#else#ifdef USE_POSIX_MQ nb = mq_receive(queue->id, &msg, BOOTSTRAP_MAX_MSG_SIZE, NULL);#elif defined(USE_SYSV_MQ) msg.mtype = 100; nb = msgrcv(queue->id, &msg, BOOTSTRAP_MAX_MSG_SIZE, 0, blocking ? 0 : IPC_NOWAIT);#else#error No bootstrap queue mechanism defined#endif#endif if (nb == -1) { *num_bytes_ptr = 0; if (errno == EAGAIN || errno == ENOMSG) { MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return MPI_SUCCESS; }#ifdef USE_POSIX_MQ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mq_receive", "**mq_receive %d", errno);#else mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**msgrcv", "**msgrcv %d", errno);#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return mpi_errno; } MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(buffer, msg.data, nb); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); *num_bytes_ptr = nb; MPIU_DBG_PRINTF(("message %d received: %d bytes\n", msg.mtype, nb)); #elif defined(HAVE_WINDOWS_H) bootstrap_msg * msg; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); MPIDI_STATE_DECL(MPID_STATE_MEMCPY); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); mpi_errno = GetNextBootstrapMsg(queue, &msg, blocking); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nextbootmsg", 0); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return mpi_errno; } if (msg) { MPIDI_FUNC_ENTER(MPID_STATE_MEMCPY); memcpy(buffer, msg->buffer, min(length, msg->length)); MPIDI_FUNC_EXIT(MPID_STATE_MEMCPY); *num_bytes_ptr = min(length, msg->length); MPIU_Free(msg); } else { *num_bytes_ptr = 0; }#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_RECV_MSG); return mpi_errno;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -