📄 nt_ipvishm_priv.cpp
字号:
else if (g_bUseDatabase) { /////////////////////////////////////////////////////////////////////// // Use InDone and PassThroughDone keys to create a barrier here. // Then use the ThroughDone key to guarantee no more database accesses. /////////////////////////////////////////////////////////////////////// char pValue[100]; int length = 100; // Everyone puts an InDone message into the database g_Database.Put("InDone", (void*)"yes", 4, false); if (g_nIproc == 0) { // Process zero consumes all the InDone messages ... for (int i=0; i<g_nNproc; i++) { length = 100; g_Database.Get("InDone", pValue, &length); } // ... and then put a PassThroughDone message g_Database.Put("PassThroughDone", (void*)"yes", 4); } // Everyone waits for the PassThroughDone message from process zero ... length = 100; g_Database.Get("PassThroughDone", pValue, &length); // ... and then puts a ThroughDone message g_bViClosing = true; g_Database.Put("ThroughDone", (void*)"yes", 4, false); if (g_nIproc == 0) { // Process zero consumes all the ThroughDone messages for (int i=0; i<g_nNproc; i++) { length = 100; g_Database.Get("ThroughDone", pValue, &length); } // When all the ThroughDone messages have been consumed, we can // guarantee that there will be no more use of the database. // So, it is safe for process zero to delete the branch in the // database corresponding to this job. g_Database.Delete(); } } else { // Signal that the current process is in End SendInDoneMsg(); WaitForSingleObject(g_hOkToPassThroughDone, INFINITE); CloseHandle(g_hOkToPassThroughDone); if (g_nIproc == 0) { // Wait for everyone else to arrive here WaitForSingleObject(g_hAllInDoneEvent, INFINITE); CloseHandle(g_hAllInDoneEvent); } // Signal the control loop thread to stop SetEvent(g_hStopControlLoopEvent); if (g_hControlLoopThread != NULL) { WaitForSingleObject(g_hControlLoopThread, MPICH_SHORT_TIMEOUT); CloseHandle(g_hControlLoopThread); g_hControlLoopThread = NULL; } } if (g_hCommPortThread != NULL) { // Signal the communication thread to stop DPRINTF(("process %d: MPID_NT_ipvishm_End signalling CommPortThread to exit.\n", g_nIproc)); g_nCommPortCommand = NT_COMM_CMD_EXIT; SetEvent(g_hCommPortEvent); // Assuming there aren't any blocking calls pending // the CommThread should exit soon after signalling g_hCommEvent if (WaitForSingleObject(g_hCommPortThread, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT) { //nt_error("wait for CommThread to exit timed out", 1); LogMsg(TEXT("wait for CommPortThread to exit in End timed out")); TerminateThread(g_hCommPortThread, 0); } // Close all the communication sockets for (int i=0; i<g_nNproc; i++) { if (g_pProcTable[i].sock_event != NULL) { // Close the socket NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event); g_pProcTable[i].sock = INVALID_SOCKET; g_pProcTable[i].sock_event = NULL; CloseHandle(g_pProcTable[i].msg.ovl.hEvent); } CloseHandle(g_pProcTable[i].hValidDataEvent); } } } // Clean up the shared memory stuff EndSMP(); // Clean up the VIA stuff EndVI(); // Clean up the BNR interface if (g_bUseBNR) BNR_Finalize(); // Free up allocated memory if (g_pProcTable != NULL) { delete g_pProcTable; g_pProcTable = NULL; } if (g_bMPDFinalize) easy_socket_finalize(); WSACleanup();}int MPID_NT_ipvishm_exitall(char *msg, int code){ nt_error(msg, code); return 0;}int MPID_NT_ipvishm_is_shm( int rank ){ return g_pProcTable[rank].shm;}// Function name : nt_tcp_shm_proc_info// Description : fills hostname and exename with information for // the i'th process and returns the process id of // that process// Return type : int // Argument : int i// Argument : char **hostname// Argument : char **exenameint nt_ipvishm_proc_info(int i, char **hostname, char **exename){ //DPRINTF(("nt_ipvishm_proc_info called for process %d.\n", i)); // Check bounds if ((i < 0) || (i >= g_nNproc)) return -1; // Check to see whether the information needs to be retrieved if (g_pProcTable[i].pid == 0) { if (g_bUseBNR) { char pszKey[100]; char pszTemp[100]; sprintf(pszKey, "ListenHost%d", i); BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[i].host); sprintf(pszKey, "Executable%d", i); BNR_Get(g_myBNRgroup, pszKey, g_pProcTable[i].exename); sprintf(pszKey, "pid%d", i); BNR_Get(g_myBNRgroup, pszKey, pszTemp); g_pProcTable[i].pid = atoi(pszTemp); } else if (g_bUseDatabase) { char pszKey[100]; char pszTemp[100]; int length = NT_HOSTNAME_LEN; sprintf(pszKey, "ListenHost%d", i); g_Database.Get(pszKey, g_pProcTable[i].host, &length); length = NT_EXENAME_LEN; sprintf(pszKey, "Executable%d", i); g_Database.Get(pszKey, g_pProcTable[i].exename, &length); length = 100; sprintf(pszKey, "pid%d", i); g_Database.Get(pszKey, pszTemp, &length); g_pProcTable[i].pid = atoi(pszTemp); //length = sizeof(int); //g_Database.Get(pszKey, &g_pProcTable[i].pid, &length); } else GetProcessInfo(i); } // Return a pointer to the information in the proc table // I assume the buffers will not be modified only read *hostname = g_pProcTable[i].host; *exename = g_pProcTable[i].exename; return g_pProcTable[i].pid;}// Function name : nt_error// Description : Prints an error message and exits// Return type : void // Argument : char *string// Argument : int valuevoid nt_error(char *string, int value){ printf("Error %d, process %d:\n %s\n", value, g_nIproc, string);fflush(stdout); // Signal the threads to stop and close their socket connections DPRINTF(("process %d: nt_error signalling CommunicationThread to exit.\n", g_nIproc);fflush(stdout)); g_nCommPortCommand = NT_COMM_CMD_EXIT; SetEvent(g_hCommPortEvent); // Close all the communication sockets if (g_pProcTable != NULL) { for (int i=0; i<g_nNproc; i++) { if (g_pProcTable[i].sock_event != NULL) { NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event); g_pProcTable[i].sock = INVALID_SOCKET; g_pProcTable[i].sock_event = NULL; } } } if (g_bUseBNR) BNR_Finalize(); WSACleanup(); ExitProcess(value);}void PrintWinSockError(int error){ HLOCAL str; int num_bytes; num_bytes = FormatMessage( FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, 0, error, MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ), (LPTSTR) &str, 0,0); if (strlen((const char*)str)) printf("%s", str); else printf("\n"); LocalFree(str);}// Function name : nt_error_socket// Description : Prints an error message and exits// Return type : void // Argument : char *string// Argument : int valuevoid nt_error_socket(char *string, int value){ printf("Error %d, process %d:\n %s\n ", value, g_nIproc, string); PrintWinSockError(value); fflush(stdout); // Signal the threads to stop and close their socket connections DPRINTF(("process %d: nt_error signalling CommunicationThread to exit.\n", g_nIproc);fflush(stdout)); g_nCommPortCommand = NT_COMM_CMD_EXIT; SetEvent(g_hCommPortEvent); // Close all the communication sockets if (g_pProcTable != NULL) { for (int i=0; i<g_nNproc; i++) { if (g_pProcTable[i].sock_event != NULL) { NT_Tcp_closesocket(g_pProcTable[i].sock, g_pProcTable[i].sock_event); g_pProcTable[i].sock = INVALID_SOCKET; g_pProcTable[i].sock_event = NULL; } } } if (g_bUseBNR) BNR_Finalize(); WSACleanup(); ExitProcess(value);}// Function name : NT_PIbsend// Description : Sends the buffer to process to, establishing a connection if necessary// Return type : int // Argument : int type// Argument : void *buffer// Argument : int length// Argument : int to// Argument : int datatypeint NT_PIbsend(int type, void *buffer, int length, int to, int datatype){ DPRINTF(("NT_PIbsend called: %d to %d, tag: %d, length: %d\n", g_nIproc, to, type, length)); // Handle the special case of sending to oneself if (to == g_nIproc) { MessageQueue::MsgQueueElement *pElement; void *pBuf = g_MsgQueue.GetBufferToFill(type, length, g_nIproc, &pElement); if (pBuf == NULL) nt_error("NT_PIbsend: MessageQueue.GetBuffer failed.", 1); memcpy(pBuf, buffer, length); if (!g_MsgQueue.SetElementEvent(pElement)) nt_error("NT_PIbsend: MessageQueue.SetElementEvent failed", 1); return 0; } // Check bounds if (to < 0 || to >= g_nNproc) MakeErrMsg(1, "Send out of range: %d is not between 0 and %d", to, g_nNproc); if (g_pProcTable[to].shm) { NT_ShmSend(type, buffer, length, to); } else { if (g_pProcTable[to].via) { NT_ViSend(type, buffer, length, to); } else { if (g_pProcTable[to].sock == INVALID_SOCKET) { DPRINTF(("making a connection to %d\n", to)); if (!ConnectTo(to)) MakeErrMsg(1, "NT_PIbsend: Unable to connect to process %d", to); } /* if (SendBlocking(g_pProcTable[to].sock, (char*)&type, sizeof(int), 0) == SOCKET_ERROR) nt_error_socket("NT_PIbsend: send type failed.", WSAGetLastError()); if (SendBlocking(g_pProcTable[to].sock, (char*)&length, sizeof(int), 0) == SOCKET_ERROR) nt_error_socket("NT_PIbsend: send length failed", WSAGetLastError()); if (SendBlocking(g_pProcTable[to].sock, (char*)buffer, length, 0) == SOCKET_ERROR) nt_error_socket("NT_PIbsend: send buffer failed", WSAGetLastError()); */ if (SendStreamBlocking(g_pProcTable[to].sock, (char*)buffer, length, type) == SOCKET_ERROR) nt_error_socket("NT_PIbsend: send msg failed.", WSAGetLastError()); } } DPRINTF(("type: %d, length: %d sent to %d\n", type, length, to)); return 0;}// Function name : NT_PInsend// Description : // Return type : int // Argument : int type// Argument : void *buffer// Argument : int length// Argument : int to// Argument : int datatype// Argument : int *pIdint NT_PInsend(int type, void *buffer, int length, int to, int datatype, int *pId){ // Do a blocking send NT_PIbsend(type, buffer, length, to, datatype); // Set the handle to be finished pId[0] = 0; return 0;}// Function name : NT_PIbrecv// Description : // Return type : int // Argument : int type// Argument : void *buffer// Argument : int length// Argument : int datatypeint NT_PIbrecv(int type, void *buffer, int length, int datatype){ /* DPRINTF(("NT_PIbrecv called: %d type: %d, length: %d\n", g_nIproc, type, length)); if (!g_MsgQueue.FillThisBuffer(type, buffer, &length, &g_nLastRecvFrom)) { if (length == -1) return MPI_ERR_COUNT; else nt_error("Recv:FillBuffer failed.\n", 1); } DPRINTF(("type: %d len: %d received from %d\n", type, length, g_nLastRecvFrom)); return 0; /*/ int pId[10]; g_MsgQueue.PostBufferForFilling(type, buffer, length, pId); g_MsgQueue.Wait(pId); g_nLastRecvFrom = pId[3]; return 0; //*/}// Function name : NT_PInrecv// Description : // Return type : int // Argument : int type// Argument : void *buffer// Argument : int length// Argument : int datatype// Argument : int *pIdint NT_PInrecv(int type, void *buffer, int length, int datatype, int *pId){ DPRINTF(("NT_PInrecv called: %d type: %d, length: %d\n", g_nIproc, type, length)); return (g_MsgQueue.PostBufferForFilling(type, buffer, length, pId)) ? 0 : 1;}// Function name : NT_PIwait// Description : // Return type : int // Argument : int *pIdint NT_PIwait(int *pId){ if (pId == NULL) nt_error("wait called on invalid object", 1); if (pId[0] == 0) return 1; return (g_MsgQueue.Wait(pId)) ? 1 : 0;}// Function name : NT_PInstatus// Description : // Return type : int // Argument : int *pIdint NT_PInstatus(int *pId){ if (pId[0] == 0) return 1; //return (g_MsgQueue.Test(pId)) ? 1 : 0; if (g_MsgQueue.Test(pId)) return 1; Sleep(0); return 0;}// Function name : NT_PInprobe// Description : Returns true if a message is available with the tag 'type'// Return type : int // Argument : int typeint NT_PInprobe(int type){ //DPRINTF(("NT_PInprobe called.\n")); if (g_MsgQueue.Available(type, g_nLastRecvFrom)) { return 1; } Sleep(0); return 0;}// Function name : MPID_Wtime// Description : // Return type : void // Argument : double *tvoid MPID_Wtime(double *t){ LARGE_INTEGER nLargeInt; QueryPerformanceCounter(&nLargeInt); *t = double(nLargeInt.QuadPart) / (double)g_nPerfFrequency.QuadPart;}// Function name : MPID_Wtick// Description : // Return type : void // Argument : double *tvoid MPID_Wtick(double *t){ *t = 1.0 / (double)g_nPerfFrequency.QuadPart;}// Function name : NT_PIgimax// Description : Does a global max operation. Used for setting up // heterogeneous environments.// Needed when MPID_HAS_HETERO is defined.// What it is supposed to do, I have no idea.// Return type : int // Argument : void *val// Argument : int n// Argument : int work// Argument : int procset#ifdef MPID_HAS_HETEROint NT_PIgimax(void *val, int n, int work, int procset){ DPRINTF(("NT_PIgimax called.\n")); return -1;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -