📄 ch3i_bootstrapq.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"

#ifdef USE_SINGLE_MSG_QUEUE
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/msg.h>
#endif

#ifdef HAVE_WINDOWS_H
#include <winsock2.h>
#include <windows.h>
#endif

/* STATES:NO WARNINGS */

/* FIXME: Are these needed here (these were defined within an unrelated
   ifdef in mpidimpl.h) */
#ifdef HAVE_TIME_H
#include <time.h>
#endif
#ifdef HAVE_UUID_UUID_H
#include <uuid/uuid.h>
#endif

/* FIXME: These routines allocate memory but rarely free it.  They need
   a finalize handler to remove the allocated memory.  A clear description
   of the purpose of these routines and their scope of use would make that
   easier. */

/* FIXME: These routines should *first* decide on the method of shared
   memory and queue support, then implement the Bootstrap interface in
   terms of those routines, if necessary using large (but single) ifdef
   regions rather than the constant ifdef/endif code used here.
   In addition, common service routines should be defined so that the
   overall code can simply */

/* FIXME: These routines need a description and explanation.
   For example, where is the bootstrap q pointer exported?  Is there more
   than one bootstrap queue?
*/

#ifdef HAVE_WINDOWS_H
/* One message received through WM_COPYDATA, kept in a singly linked FIFO. */
typedef struct bootstrap_msg
{
    int length;                 /* number of valid bytes in buffer */
    char buffer[1024];          /* message payload (fixed capacity) */
    struct bootstrap_msg *next; /* next (newer) message, or NULL */
} bootstrap_msg;
#endif /* HAVE_WINDOWS_H */

/* State of one bootstrap queue.  The fields depend on which queue
   mechanism was selected at configure time. */
typedef struct MPIDI_CH3I_BootstrapQ_struct
{
    char name[MPIDI_BOOTSTRAP_NAME_LEN];
#ifdef MPIDI_CH3_USES_SHM_NAME
    char shm_name[MPIDI_BOOTSTRAP_NAME_LEN];
#endif
#ifdef USE_SINGLE_MSG_QUEUE
    int pid;
    int id;
#elif defined(USE_SYSV_MQ) || defined(USE_POSIX_MQ)
    int id;
#elif defined (HAVE_WINDOWS_H)
    bootstrap_msg *msg_list;     /* FIFO of received messages, protected by hMutex */
    HWND hWnd;                   /* message-only window that receives WM_COPYDATA */
    HANDLE hMsgThread;
    HANDLE hReadyEvent;          /* set by MessageQueueThreadFn once setup succeeded or failed */
    HANDLE hMutex;               /* guards msg_list */
    HANDLE hMessageArrivedEvent; /* manual-reset event: set while msg_list is non-empty */
    int error;                   /* GetLastError() value recorded when setup fails */
#else
#error *** No bootstrapping queue capability specified ***
#endif
    struct MPIDI_CH3I_BootstrapQ_struct *next;
} MPIDI_CH3I_BootstrapQ_struct;

/* Head of the list of queues created by this file. */
static MPIDI_CH3I_BootstrapQ_struct * g_queue_list = NULL;

#ifdef HAVE_WINDOWS_H

#undef FUNCNAME
#define FUNCNAME GetNextBootstrapMsg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Remove the oldest message from the queue's message list.
 *
 * queue    - queue to read from
 * msg_ptr  - receives the dequeued message, or NULL when no message was
 *            obtained (nothing pending in non-blocking mode, or the wait
 *            on the arrival event did not return WAIT_OBJECT_0)
 * blocking - TRUE to wait indefinitely for a message, FALSE to poll
 *
 * Always returns MPI_SUCCESS; callers must test *msg_ptr.
 */
int GetNextBootstrapMsg(MPIDI_CH3I_BootstrapQ queue, bootstrap_msg ** msg_ptr, BOOL blocking)
{
    const DWORD timeout = blocking ? INFINITE : 0;
    MPIDI_STATE_DECL(MPID_STATE_GET_NEXT_BOOTSTRAP_MSG);

    MPIDI_FUNC_ENTER(MPID_STATE_GET_NEXT_BOOTSTRAP_MSG);

    while (WaitForSingleObject(queue->hMessageArrivedEvent, timeout) == WAIT_OBJECT_0)
    {
        WaitForSingleObject(queue->hMutex, INFINITE);
        if (queue->msg_list)
        {
            *msg_ptr = queue->msg_list;
            queue->msg_list = queue->msg_list->next;
            /* The event is manual-reset: clear it once the list drains. */
            if (queue->msg_list == NULL)
                ResetEvent(queue->hMessageArrivedEvent);
            ReleaseMutex(queue->hMutex);
            MPIDI_FUNC_EXIT(MPID_STATE_GET_NEXT_BOOTSTRAP_MSG);
            return MPI_SUCCESS;
        }
        /* Event signalled but nothing queued: clear the stale signal so
           that this loop waits (or gives up) instead of spinning. */
        ResetEvent(queue->hMessageArrivedEvent);
        ReleaseMutex(queue->hMutex);
    }

    *msg_ptr = NULL;
    MPIDI_FUNC_EXIT(MPID_STATE_GET_NEXT_BOOTSTRAP_MSG);
    return MPI_SUCCESS;
}

/* The single queue served by BootstrapQWndProc; set by MessageQueueThreadFn. */
static MPIDI_CH3I_BootstrapQ s_queue = NULL;

#undef FUNCNAME
#define FUNCNAME BootstrapQWndProc
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Window procedure of the hidden bootstrap window.
 *
 * WM_COPYDATA: copies the payload into a newly allocated bootstrap_msg,
 *              appends it to s_queue->msg_list and signals
 *              hMessageArrivedEvent.  Returns 101 whether or not the
 *              message could be queued (failures are only logged).
 * WM_DESTROY:  posts WM_QUIT and returns 0.
 * otherwise:   forwarded to DefWindowProc.
 */
LRESULT CALLBACK BootstrapQWndProc(
    HWND hwnd,        /* handle to window */
    UINT uMsg,        /* message identifier */
    WPARAM wParam,    /* first message parameter */
    LPARAM lParam)    /* second message parameter */
{
    PCOPYDATASTRUCT p;
    bootstrap_msg *bsmsg_ptr, *msg_iter;

    switch (uMsg)
    {
    case WM_DESTROY:
        PostQuitMessage(0);
        break;

    case WM_COPYDATA:
        p = (PCOPYDATASTRUCT) lParam;

        /* The payload size is chosen by the sending process; refuse
           anything that does not fit the fixed-size message buffer
           instead of overrunning it.  (sizeof does not evaluate its
           operand, so bsmsg_ptr need not be set yet.) */
        if (p->cbData > sizeof(bsmsg_ptr->buffer))
        {
            MPIU_Error_printf("bootstrap message of %lu bytes exceeds the %lu byte buffer, message dropped\n",
                              (unsigned long) p->cbData,
                              (unsigned long) sizeof(bsmsg_ptr->buffer));
            return 101;
        }

        if (WaitForSingleObject(s_queue->hMutex, INFINITE) == WAIT_OBJECT_0)
        {
            bsmsg_ptr = MPIU_Malloc(sizeof(bootstrap_msg));
            if (bsmsg_ptr != NULL)
            {
                if (p->cbData > 0)
                    memcpy(bsmsg_ptr->buffer, p->lpData, p->cbData);
                bsmsg_ptr->length = p->cbData;
                bsmsg_ptr->next = NULL;

                /* Append at the tail so messages are delivered in
                   arrival order. */
                msg_iter = s_queue->msg_list;
                if (msg_iter == NULL)
                {
                    s_queue->msg_list = bsmsg_ptr;
                }
                else
                {
                    while (msg_iter->next != NULL)
                        msg_iter = msg_iter->next;
                    msg_iter->next = bsmsg_ptr;
                }
                SetEvent(s_queue->hMessageArrivedEvent);
            }
            else
            {
                MPIU_Error_printf("MPIU_Malloc failed\n");
            }
            ReleaseMutex(s_queue->hMutex);
        }
        else
        {
            MPIU_Error_printf("Error waiting for s_queue mutex, error %d\n", (int) GetLastError());
        }
        return 101;

    default:
        return DefWindowProc(hwnd, uMsg, wParam, lParam);
    }

    return 0;
}

/*
 * Thread routine that owns the bootstrap window and runs its message loop.
 *
 * Generates a UUID-based name into queue->name, registers a window class
 * of that name, creates a message-only window and the queue's
 * synchronization objects, publishes the queue in s_queue, and signals
 * queue->hReadyEvent.  If any setup step fails, queue->error is set and
 * hReadyEvent is still signalled before returning.  After setup the
 * routine dispatches window messages until GetMessage reports WM_QUIT
 * (0) or an error (-1).
 */
void MessageQueueThreadFn(MPIDI_CH3I_BootstrapQ_struct *queue)
{
    MSG msg;
    BOOL bRet;
    WNDCLASS wc;
    UUID guid;

    /* We can only handle one message queue.  Check before acquiring any
       resources, and report the failure through the same error/ready
       protocol as the other setup failures. */
    if (s_queue != NULL)
    {
        MPIU_Error_printf("Error: more than one message queue created\n");
        queue->error = ERROR_ALREADY_EXISTS;
        SetEvent(queue->hReadyEvent);
        return;
    }

    UuidCreate(&guid);
    MPIU_Snprintf(queue->name, MPIDI_BOOTSTRAP_NAME_LEN,
                  "%08lX-%04X-%04x-%02X%02X-%02X%02X%02X%02X%02X%02X",
                  guid.Data1, guid.Data2, guid.Data3,
                  guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3],
                  guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]);

    wc.style = 0;
    wc.lpfnWndProc = (WNDPROC) BootstrapQWndProc;
    wc.cbClsExtra = 0;
    wc.cbWndExtra = 0;
    wc.hInstance = NULL;
    wc.hIcon = LoadIcon((HINSTANCE) NULL, IDI_APPLICATION);
    wc.hCursor = LoadCursor((HINSTANCE) NULL, IDC_ARROW);
    wc.hbrBackground = GetStockObject(WHITE_BRUSH);
    wc.lpszMenuName = "MainMenu";
    wc.lpszClassName = queue->name;

    if (!RegisterClass(&wc))
    {
        queue->error = GetLastError();
        SetEvent(queue->hReadyEvent);
        return;
    }

    /* Create the hidden window. */
    queue->hWnd = CreateWindow(
        queue->name,            /* window class */
        queue->name,            /* window name */
        WS_OVERLAPPEDWINDOW,
        CW_USEDEFAULT,
        CW_USEDEFAULT,
        CW_USEDEFAULT,
        CW_USEDEFAULT,
        (HWND) HWND_MESSAGE,
        (HMENU) NULL,
        0,
        (LPVOID) NULL);
    if (!queue->hWnd)
    {
        queue->error = GetLastError();
        SetEvent(queue->hReadyEvent);
        return;
    }
    ShowWindow(queue->hWnd, SW_HIDE);

    /* initialize the synchronization objects */
    queue->msg_list = NULL;
    queue->hMessageArrivedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (queue->hMessageArrivedEvent == NULL)
    {
        queue->error = GetLastError();
        DestroyWindow(queue->hWnd);
        queue->hWnd = NULL;
        SetEvent(queue->hReadyEvent);
        return;
    }
    queue->hMutex = CreateMutex(NULL, FALSE, NULL);
    if (queue->hMutex == NULL)
    {
        queue->error = GetLastError();
        CloseHandle(queue->hMessageArrivedEvent);
        queue->hMessageArrivedEvent = NULL;
        DestroyWindow(queue->hWnd);
        queue->hWnd = NULL;
        SetEvent(queue->hReadyEvent);
        return;
    }

    /* Publish the queue for BootstrapQWndProc before any message can be
       dispatched, then signal that the queue is ready. */
    s_queue = queue;
    SetEvent(queue->hReadyEvent);

    /* Start handling messages.  GetMessage returns 0 for WM_QUIT and -1
       on failure. */
    while ((bRet = GetMessage(&msg, NULL, 0, 0)) != 0)
    {
        if (bRet == -1)
        {
            MPIU_Error_printf("MessageQueueThreadFn: GetMessage failed, error %d, exiting...\n",
                              (int) GetLastError());
            return;
        }
        TranslateMessage(&msg);
        DispatchMessage(&msg);
    }
}

#endif /* HAVE_WINDOWS_H */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_BootstrapQ_create_unique_name
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Write a freshly generated name into a caller-supplied buffer.
 *
 * name   - destination buffer
 * length - size of the destination buffer in bytes; must be at least 40
 *
 * Returns MPI_SUCCESS on success, or -1 (without touching name) when the
 * buffer is too small.  The name is derived from a UUID where one is
 * available, otherwise from rand() (and time(), if available).
 *
 * NOTE(review): nothing in this file seeds rand(), so the fallback names
 * may repeat across processes - confirm whether a caller seeds it.
 */
int MPIDI_CH3I_BootstrapQ_create_unique_name(char *name, int length)
{
    enum { MIN_NAME_LENGTH = 40 };
#ifdef HAVE_WINDOWS_H
    UUID guid;
#elif defined(HAVE_UUID_GENERATE)
    uuid_t guid;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_UNIQUE_NAME);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_UNIQUE_NAME);

    /* Every variant below needs the same minimum buffer size; enforce it
       once so that no variant can write past the caller's buffer. */
    if (length < MIN_NAME_LENGTH)
    {
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_UNIQUE_NAME);
        return -1;
    }

#ifdef HAVE_WINDOWS_H
    UuidCreate(&guid);
    MPIU_Snprintf(name, length,
                  "%08lX-%04X-%04x-%02X%02X-%02X%02X%02X%02X%02X%02X",
                  guid.Data1, guid.Data2, guid.Data3,
                  guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3],
                  guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]);
#elif defined(HAVE_UUID_GENERATE)
    uuid_generate(guid);
    uuid_unparse(guid, name);
#elif defined(HAVE_TIME)
    /* rand is part of stdlib and returns an int, not an unsigned long */
    MPIU_Snprintf(name, length, "%08X%08X%08X%08lX",
                  (unsigned) rand(), (unsigned) rand(), (unsigned) rand(),
                  (unsigned long) time(NULL));
#else
    MPIU_Snprintf(name, length, "%08X%08X%08X%08X",
                  (unsigned) rand(), (unsigned) rand(),
                  (unsigned) rand(), (unsigned) rand());
#endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_UNIQUE_NAME);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_BootstrapQ_create_named
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Create a bootstrap queue on top of a named mqshm queue.
 *
 * queue_ptr  - receives the new queue on success; left untouched on failure
 * name       - name passed to MPIDI_CH3I_mqshm_create
 * initialize - passed through to MPIDI_CH3I_mqshm_create
 *
 * Returns MPI_SUCCESS or an MPI error code.  Without USE_MQSHM the
 * function always fails with "**notimpl".
 *
 * The queue's own name field is set to the decimal process id; the
 * requested name is kept in shm_name when MPIDI_CH3_USES_SHM_NAME is
 * defined.
 */
int MPIDI_CH3I_BootstrapQ_create_named(MPIDI_CH3I_BootstrapQ *queue_ptr, const char *name, const int initialize)
{
    int mpi_errno = MPI_SUCCESS;
#ifdef USE_MQSHM
    MPIDI_CH3I_BootstrapQ_struct *queue;
    int id;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_NAMED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_NAMED);

#ifdef USE_MQSHM
    queue = (MPIDI_CH3I_BootstrapQ_struct*) MPIU_Malloc(sizeof(MPIDI_CH3I_BootstrapQ_struct));
    if (queue == NULL)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_NAMED);
        return mpi_errno;
    }

    mpi_errno = MPIDI_CH3I_mqshm_create(name, initialize, &id);
    if (mpi_errno != MPI_SUCCESS || id == -1)
    {
        /* The queue has not been published anywhere yet, so release it
           rather than leaving a half-initialized entry behind. */
        MPIU_Free(queue);
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqshm_create", 0);
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_NAMED);
        return mpi_errno;
    }

    queue->id = id;
    queue->pid = getpid();
    MPIU_Snprintf(queue->name, MPIDI_BOOTSTRAP_NAME_LEN, "%d", queue->pid);
#ifdef MPIDI_CH3_USES_SHM_NAME
    MPIU_Strncpy(queue->shm_name, name, MPIDI_BOOTSTRAP_NAME_LEN);
#endif

    /* Link the queue into the global list only once it is fully set up. */
    queue->next = g_queue_list;
    g_queue_list = queue;

    *queue_ptr = queue;
    MPIU_DBG_PRINTF(("Created bootstrap queue, %d:%s\n", queue->id, queue->name));
#else
    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**notimpl", 0);
#endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE_NAMED);
    return mpi_errno;
}

#ifdef USE_SINGLE_MSG_QUEUE

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_BootstrapQ_create
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* NOTE(review): only the opening of this function is visible here; the
   visible part is reproduced unchanged. */
int MPIDI_CH3I_BootstrapQ_create(MPIDI_CH3I_BootstrapQ *queue_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3I_BootstrapQ_struct *queue;
#ifndef USE_MQSHM
    int key;
    int id;
#endif
    int nb;
    struct msgbuf {
        long mtype;
        char data[BOOTSTRAP_MAX_MSG_SIZE];
    } msg;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE);

    if (g_queue_list)
    {
        *queue_ptr = g_queue_list;
        MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_BOOTSTRAPQ_CREATE);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -