⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 run.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
字号:
#include "mpdimpl.h"#include "database.h"#define EXIT_WORKER_KEY	-1int g_NumCommPortThreads = 4;HANDLE g_hCommPort;HANDLE g_hCommPortEvent = CreateEvent(NULL, TRUE, FALSE, NULL);BOOL RunRead(MPD_Context *p, int *pRunReturn);BOOL RunWrite(MPD_Context *p, int *pRunReturn);void ErrorExit(char *str, int exitcode = -1){    err_printf("****%s", str);    ContextFinalize();    dbs_finalize();    easy_socket_finalize();    err_printf("****EXITING\n");    ExitProcess(exitcode);}BOOL RunRead(MPD_Context *p, int *pRunReturn){    int error;        if (p->pszIn[0] == '\0')    {	err_printf("RunRead: %s(%d): Error, empty string read.\n", ContextTypeToString(p), p->sock);	p->bDeleteMe = true;	p->nState = MPD_INVALID;	return TRUE;    }    p->nState = MPD_READING;    if (ReadString(p->sock, &p->pszIn[1]))    {	//dbg_printf("RunRead: '%s'\n", p->pszIn);	p->nState = MPD_IDLE;	p->nCurPos = 0;	switch(p->nType)	{	case MPD_SOCKET:	    err_printf("RunRead: Error, MPD_SOCKET read a string '%s'.\n", p->pszIn);	    break;	case MPD_LEFT_SOCKET:	    //dbg_printf("__left socket read '%s'\n", p->pszIn);	    HandleLeftRead(p);	    break;	case MPD_RIGHT_SOCKET:	    //err_printf("RunRead: Error, MPD_RIGHT_SOCKET read '%s'\n", p->pszIn);	    HandleRightRead(p);	    break;	case MPD_CONSOLE_SOCKET:	    //dbg_printf("__console socket read '%s'\n", p->pszIn);	    HandleConsoleRead(p);	    break;	default:	    err_printf("string '%s' read on socket %d of unknown type %d\n", p->pszIn, p->sock, p->nType);	    break;	}    }    else    {	error = WSAGetLastError();	err_printf("RunRead: ReadString failed for %s(%d), error %d\n", ContextTypeToString(p), p->sock, error);	p->bDeleteMe = true;	p->nState = MPD_INVALID;    }    return TRUE;}void RunWorkerThread(){    DWORD dwKey, nBytes;    OVERLAPPED *p_Ovl;    int error;    MPD_Context *pContext;    int ret_val;        while (true)    {	if (GetQueuedCompletionStatus(g_hCommPort, &nBytes, &dwKey, &p_Ovl, INFINITE))	{	    //dbg_printf("RunWorkerThread::%d bytes\n", nBytes);	    if (dwKey == EXIT_WORKER_KEY)		ExitThread(0);	    pContext = (MPD_Context*)dwKey;	    if (nBytes)	    {		if (nBytes == 1)		{		    pContext->bReadPosted = false;		    if (!RunRead(pContext, &ret_val))			ErrorExit("RunRead returned FALSE", ret_val);		    if (pContext->bDeleteMe)		    {			CheckContext(pContext);			RemoveContext(pContext);			pContext = NULL;		    }		    else		    {			// post the next read			error = PostContextRead(pContext);			if (error)			{			    if (error == ERROR_NETNAME_DELETED || error == ERROR_IO_PENDING || error == WSAECONNABORTED)				dbg_printf("RunWorkerThread:Post read for %s(%d) failed, error %d\n", ContextTypeToString(pContext), pContext->sock, error);			    else				err_printf("RunWorkerThread:Post read for %s(%d) failed, error %d\n", ContextTypeToString(pContext), pContext->sock, error);			    CheckContext(pContext);			    RemoveContext(pContext);			    pContext = NULL;			}		    }		}		else		{		    dbg_printf("RunWorkerThread: nBytes = %d, *** unexpected ***\n", nBytes);		    error = PostContextRead(pContext);		    if (error)		    {			err_printf("RunWorkerThread:Post read for %s(%d) failed, error %d\n", ContextTypeToString(pContext), pContext->sock, error);			CheckContext(pContext);			RemoveContext(pContext);			pContext = NULL;		    }		}	    }	    else	    {		dbg_printf("RunWorkerThread::closing context %s(%d)\n", ContextTypeToString(pContext), pContext->sock);		CheckContext(pContext);		RemoveContext(pContext);		pContext = NULL;	    }	}	else	{	    error = GetLastError();	    if (error == ERROR_NETNAME_DELETED || error == ERROR_IO_PENDING || error == WSAECONNABORTED)	    {		dbg_printf("RunWorkerThread: GetQueuedCompletionStatus failed, error %d\n", error);	    }	    else	    {		err_printf("RunWorkerThread: GetQueuedCompletionStatus failed, error %d\n", error);	    }	    //return;	}    }}// Run ///////////////////////////////////////////////////////////////////////////int Run(){    SOCKET listen_socket;    HANDLE ahEvent[2];    int error = 0, num_handles=2;    SOCKET temp_socket;    DWORD ret_val;    int i;    BOOL opt;    DWORD dwThreadID;    char host[100];    int listen_port;    HANDLE *hWorkers;    easy_get_ip(&g_nIP);    easy_get_ip_string(g_pszIP);    if (ConnectToSelf() == false)	ErrorExit("Run: ConnectToSelf failed\n");    if (!g_bStartAlone)    {	if (stricmp(g_pszHost, g_pszInsertHost))	{	    if (InsertIntoRing(g_pszInsertHost, false) == false)	    {		if (stricmp(g_pszHost, g_pszInsertHost2))		    InsertIntoRing(g_pszInsertHost2, false);	    }	}    }    ahEvent[0] = g_hCommPortEvent;    // create a listening socket    if (error = easy_create(&listen_socket, g_nPort))	ErrorExit("Run: easy_create(listen socket) failed", error);    ahEvent[1] = WSACreateEvent();    // associate listen_socket_event with listen_socket    if (WSAEventSelect(listen_socket, ahEvent[1], FD_ACCEPT) == SOCKET_ERROR)	ErrorExit("Run: WSAEventSelect failed for listen_socket", 1);    if (listen(listen_socket, SOMAXCONN) == SOCKET_ERROR)	ErrorExit("Run: listen failed", WSAGetLastError());    // get the port and local hostname for the listening socket    if (error = easy_get_sock_info(listen_socket, host, &listen_port))	ErrorExit("Run: Unable to get host and port of listening socket", error);    dbg_printf("%s:%d\n", host, listen_port);    // Create the completion port    g_hCommPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_NumCommPortThreads);    if (g_hCommPort == NULL)	ErrorExit("Run: CreateIoCompletionPort failed", GetLastError());    // Start the completion port threads    hWorkers = new HANDLE[g_NumCommPortThreads];    for (i=0; i<g_NumCommPortThreads; i++)    {	hWorkers[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunWorkerThread, NULL, 0, &dwThreadID);	if (hWorkers[i] == NULL)	    ErrorExit("Run: CreateThread(RunWorkerThread) failed", GetLastError());    }    // associate the left and right contexts with the completion port    if (CreateIoCompletionPort((HANDLE)g_pRightContext->sock, g_hCommPort, (DWORD)g_pRightContext, g_NumCommPortThreads) == NULL)	ErrorExit("Run: Unable to associate completion port with socket", GetLastError());    if (CreateIoCompletionPort((HANDLE)g_pLeftContext->sock, g_hCommPort, (DWORD)g_pLeftContext, g_NumCommPortThreads) == NULL)	ErrorExit("Run: Unable to associate completion port with socket", GetLastError());    // post the first reads on the left and right contexts    error = PostContextRead(g_pRightContext);    if (error)	ErrorExit("Run:First posted read for g_pRightContext failed, error %d", error);    error = PostContextRead(g_pLeftContext);    if (error)	ErrorExit("Run:First posted read for g_pLeftContext failed, error %d", error);    // loop, accepting new connections, until g_hCommPortEvent is signalled    while (true)    {	ret_val = WaitForMultipleObjects(num_handles, ahEvent, FALSE, INFINITE);	if (ret_val != WAIT_OBJECT_0 && ret_val != WAIT_OBJECT_0+1)	{	    ErrorExit("Run: Wait failed, restarting mpd...", GetLastError());	    return RUN_RESTART;	}	// Event[0] is the event used by the ServiceStop thread to communicate with this thread	if (WaitForSingleObject(ahEvent[0], 0) == WAIT_OBJECT_0)	{	    dbg_printf("Run exiting\n");	    for (i=0; i<g_NumCommPortThreads; i++)		PostQueuedCompletionStatus(g_hCommPort, 0, EXIT_WORKER_KEY, NULL);	    for (i=0; i<g_NumCommPortThreads; i++)	    {		WaitForSingleObject(hWorkers[i], INFINITE);		CloseHandle(hWorkers[i]);	    }	    delete hWorkers;	    CloseHandle(g_hCommPortEvent); 	    CloseHandle(g_hCommPort);	    closesocket(listen_socket);	    WSACloseEvent(ahEvent[1]);	    // cleanup everything	    ShutdownAllProcesses();	    AbortAllForwarders();	    RemoveAllTmpFiles();	    RemoveAllCachedUsers();	    return RUN_EXIT;	}	// Event[1] is the listen socket event, which is signalled when other processes whish to establish a socket connection with this mpd	if (WaitForSingleObject(ahEvent[1], 0) == WAIT_OBJECT_0)	{	    i=0;	    opt = TRUE;	    // Something in my code is causing the listen_socket event to fail to be reset by the accept call.	    // For now I manually reset it here.	    WSAResetEvent(ahEvent[1]);	    temp_socket = accept(listen_socket, NULL, NULL);	    if (temp_socket == INVALID_SOCKET)		ErrorExit("Run: accept failed", WSAGetLastError());	    dbg_printf("socket accepted: %d\n", temp_socket);	    if (setsockopt(temp_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(BOOL)) == SOCKET_ERROR)		ErrorExit("Run: setsockopt failed", WSAGetLastError());	    MPD_Context *pContext = CreateContext();	    pContext->nLLState = MPD_AUTHENTICATE_WRITING_APPEND;	    pContext->sock = temp_socket;	    if (AuthenticateAcceptedConnection(&pContext))	    {		// Associate the socket with the completion port		if (CreateIoCompletionPort((HANDLE)temp_socket, g_hCommPort, (DWORD)pContext, g_NumCommPortThreads) == NULL)		    ErrorExit("Run: Unable to associate completion port with socket", GetLastError());				// Post the first read from the socket		error = PostContextRead(pContext);		if (error)		{		    ErrorExit("Run: First posted read failed, error %d", error);		}	    }	    else	    {		dbg_printf("Run: AuthenticateConnection failed.\n");	    }	}    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -