⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 forwarder.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
字号:
#include "GetStringOpt.h"#include "mpdimpl.h"#include <stdio.h>HANDLE g_hForwarderMutex = NULL;/*#define dbg_printf err_printf*/struct ForwarderEntry{    ForwarderEntry();    ~ForwarderEntry();    char pszFwdHost[MAX_HOST_LENGTH];    int nFwdPort;    int nPort;    SOCKET sockStop;    ForwarderEntry *pNext;};ForwarderEntry::ForwarderEntry(){    sockStop = INVALID_SOCKET;}ForwarderEntry::~ForwarderEntry(){    if (sockStop != INVALID_SOCKET)	easy_closesocket(sockStop);    sockStop = INVALID_SOCKET;}struct ForwardIOThreadArg{    ForwardIOThreadArg();    ~ForwardIOThreadArg();    SOCKET sockStop;    SOCKET sockListen;    SOCKET sockForward;    int nPort;};ForwardIOThreadArg::ForwardIOThreadArg(){    sockStop = INVALID_SOCKET;    sockListen = INVALID_SOCKET;    sockForward = INVALID_SOCKET;    nPort = 0;}ForwardIOThreadArg::~ForwardIOThreadArg(){    if (sockStop != INVALID_SOCKET)	easy_closesocket(sockStop);    sockStop = INVALID_SOCKET;    if (sockListen != INVALID_SOCKET)	easy_closesocket(sockListen);    sockListen = INVALID_SOCKET;    if (sockForward != INVALID_SOCKET)	easy_closesocket(sockForward);    sockForward = INVALID_SOCKET;}ForwarderEntry *g_pForwarderList = NULL;static void ForwarderToString(ForwarderEntry *p, char *pszStr, int length){    if (!snprintf_update(pszStr, length, "FORWARDER:\n"))	return;    if (!snprintf_update(pszStr, length, " inport: %d\n outhost: %s:%d\n stop socket: %d\n",	p->nPort, p->pszFwdHost, p->nFwdPort, p->sockStop))	return;}void statForwarders(char *pszOutput, int length){    ForwarderEntry *p;    *pszOutput = '\0';    length--; // leave room for the null character    if (g_pForwarderList == NULL)	return;    WaitForSingleObject(g_hForwarderMutex, INFINITE);    p = g_pForwarderList;    while (p)    {	ForwarderToString(p, pszOutput, length);	length = length - strlen(pszOutput);	pszOutput = &pszOutput[strlen(pszOutput)];	p = p->pNext;    }    ReleaseMutex(g_hForwarderMutex);}void ConcatenateForwardersToString(char *pszStr){    char pszLine[100];        WaitForSingleObject(g_hForwarderMutex, INFINITE);    ForwarderEntry *p = g_pForwarderList;    while (p)    {	_snprintf(pszLine, 100, "%s:%d -> %s:%d\n", g_pszHost, p->nPort, p->pszFwdHost, p->nFwdPort);	strncat(pszStr, pszLine, MAX_CMD_LENGTH - 1 - strlen(pszStr));	p = p->pNext;    }    ReleaseMutex(g_hForwarderMutex);}static void RemoveForwarder(int nPort){    WaitForSingleObject(g_hForwarderMutex, INFINITE);    ForwarderEntry *pEntry = g_pForwarderList;    if (pEntry != NULL)    {	if (pEntry->nPort == nPort)	{	    g_pForwarderList = g_pForwarderList->pNext;	    delete pEntry;	    ReleaseMutex(g_hForwarderMutex);	    return;	}	while (pEntry->pNext)	{	    if (pEntry->pNext->nPort == nPort)	    {		ForwarderEntry *pTemp = pEntry->pNext;		pEntry->pNext = pEntry->pNext->pNext;		delete pTemp;		ReleaseMutex(g_hForwarderMutex);		return;	    }	    pEntry = pEntry->pNext;	}    }    ReleaseMutex(g_hForwarderMutex);}static void MakeLoop(SOCKET *psockRead, SOCKET *psockWrite){    SOCKET sock;    char host[100];    int port;    // Create a listener    if (easy_create(&sock, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR)    {	*psockRead = INVALID_SOCKET;	*psockWrite = INVALID_SOCKET;	return;    }    listen(sock, 5);    easy_get_sock_info(sock, host, &port);        // Connect to myself    if (easy_create(psockWrite, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR)    {	easy_closesocket(sock);	*psockRead = INVALID_SOCKET;	*psockWrite = INVALID_SOCKET;	return;    }    if (easy_connect(*psockWrite, host, port) == SOCKET_ERROR)    {	easy_closesocket(*psockWrite);	easy_closesocket(sock);	*psockRead = INVALID_SOCKET;	*psockWrite = INVALID_SOCKET;	return;    }    // Accept the connection from myself    *psockRead = easy_accept(sock);    easy_closesocket(sock);}static int ReadWriteAlloc(SOCKET sock, SOCKET sockForward, int n){    int num_to_receive, num_received;    char *pBuffer;    pBuffer = new char[n + sizeof(int) + sizeof(char) + sizeof(int)];    *(int*)pBuffer = n;    num_to_receive = n + sizeof(int) + sizeof(char);        num_received = easy_receive(sock, &pBuffer[sizeof(int)], num_to_receive);    if (num_received == SOCKET_ERROR || num_received == 0)    {	delete pBuffer;	return SOCKET_ERROR;    }    if (easy_send(sockForward, pBuffer, num_received + sizeof(int)) == SOCKET_ERROR)    {	delete pBuffer;	return SOCKET_ERROR;    }    delete pBuffer;    return 0;}static int ReadWrite(SOCKET sock, SOCKET sockForward, int n){    int num_to_receive, num_received;    char pBuffer[1024+sizeof(int)+sizeof(char)+sizeof(int)];    if (n > 1024)	return ReadWriteAlloc(sock, sockForward, n);    *(int*)pBuffer = n;    num_to_receive = n + sizeof(int) + sizeof(char);        num_received = easy_receive(sock, &pBuffer[sizeof(int)], num_to_receive);    if (num_received == SOCKET_ERROR || num_received == 0)    {	return SOCKET_ERROR;    }    if (easy_send(sockForward, pBuffer, num_received + sizeof(int)) == SOCKET_ERROR)    {	return SOCKET_ERROR;    }    return 0;}void ForwardIOThread(ForwardIOThreadArg *pArg){    SOCKET client_sock, stop_sock, listen_sock, forward_sock;    int n, i;    DWORD num_read;    fd_set total_set, readset;    SOCKET sockActive[FD_SETSIZE];    int nActive = 0;    int nDatalen;    bool bDeleteOnEmpty = false;    int nPort;        listen_sock = pArg->sockListen;    pArg->sockListen = INVALID_SOCKET;    stop_sock = pArg->sockStop;    pArg->sockStop = INVALID_SOCKET;    forward_sock = pArg->sockForward;    pArg->sockForward = INVALID_SOCKET;    nPort = pArg->nPort;    delete pArg;    pArg = NULL;    FD_ZERO(&total_set);        FD_SET(listen_sock, &total_set);    FD_SET(stop_sock, &total_set);    FD_SET(forward_sock, &total_set);    while (true)    {	readset = total_set;	dbg_printf("ForwardIOThread: select, nActive %d\n", nActive);	n = select(0, &readset, NULL, NULL, NULL);	if (n == SOCKET_ERROR)	{	    err_printf("ForwardIOThread: select failed, error %d\n", WSAGetLastError());	    break;	}	if (n == 0)	{	    err_printf("ForwardIOThread: select returned zero sockets available\n");	    break;	}	else	{	    if (FD_ISSET(stop_sock, &readset))	    {		char c;		num_read = easy_receive(stop_sock, &c, 1);		if (num_read == SOCKET_ERROR || num_read == 0)		    break;		if (c == 0)		{		    if (nActive == 0)		    {			dbg_printf("ForwardIOThread: %d breaking\n", nPort);			break;		    }		    dbg_printf("ForwardIOThread: ------ %d signalled to exit on empty, %d sockets remaining\n", nPort, nActive);		    //if (total_set.fd_count == 3)			//err_printf("ForwardIOThread: ERROR: total_set is empty\n");		    bDeleteOnEmpty = true;		}		else		{		    dbg_printf("ForwardIOThread: aborting forwarder %d\n", nPort);		    break;		}		n--;	    }	    if (FD_ISSET(listen_sock, &readset))	    {		if ((nActive + 3) >= FD_SETSIZE)		{		    client_sock = easy_accept(listen_sock);		    easy_closesocket(client_sock);		    dbg_printf("ForwardIOThread: too many clients connecting to the forwarder, connect rejected: nActive = %d\n", nActive);		}		else		{		    client_sock = easy_accept(listen_sock);		    if (client_sock == INVALID_SOCKET)		    {			int error = WSAGetLastError();			err_printf("ForwardIOThread: easy_accept failed: %d\n", error);			break;		    }		    		    char cType;		    if (easy_receive(client_sock, &cType, sizeof(char)) == SOCKET_ERROR)		    {			int error = WSAGetLastError();			err_printf("ForwardIOThread: easy_receive failed, error %d\n", error);			break;		    }		    		    if (cType == 0)		    {			easy_closesocket(client_sock);			err_printf("ForwardIOThread: stdin redirection not handled by forwarder thread, socket closed.\n");		    }		    else		    {			sockActive[nActive] = client_sock;			FD_SET(client_sock, &total_set);			nActive++;			dbg_printf("ForwardIOThread: %d adding socket %d (+%d)\n", nPort, client_sock, nActive);		    }		}		n--;	    }	    if (FD_ISSET(forward_sock, &readset))	    {		easy_closesocket(forward_sock);		err_printf("ForwardIOThread: forward socket unexpectedly closed\n");		break;	    }	    if (n > 0)	    {		if (nActive < 1)		{		    err_printf("ForwardIOThread: Error, n=%d while nActive=%d\n", n, nActive);		    break;		}		else		{		    for (i=0; n > 0; i++)		    {			if (FD_ISSET(sockActive[i], &readset))			{			    num_read = easy_receive(sockActive[i], (char*)&nDatalen, sizeof(int));			    if (num_read == SOCKET_ERROR || num_read == 0)			    {				dbg_printf("ForwardIOThread: port %d, removing socket[%d]=%d (%d active)\n", nPort, i, sockActive[i], nActive);				FD_CLR(sockActive[i], &total_set);				easy_closesocket(sockActive[i]);				nActive--;				sockActive[i] = sockActive[nActive];				i--;			    }			    else			    {				if (ReadWrite(sockActive[i], forward_sock, nDatalen) == SOCKET_ERROR)				{				    dbg_printf("ForwardIOThread: port %d, abandoning socket[%d]=%d (%d active)\n", nPort, i, sockActive[i], nActive);				    FD_CLR(sockActive[i], &total_set);				    easy_closesocket(sockActive[i]);				    nActive--;				    sockActive[i] = sockActive[nActive];				    i--;				}			    }			    n--;			}		    }		}	    }	    if (nActive == 0 && bDeleteOnEmpty)	    {		dbg_printf("ForwardIOThread: %d breaking on empty\n", nPort);		break;	    }	}    }    easy_closesocket(forward_sock);    easy_closesocket(stop_sock);    for (i=0; i<nActive; i++)	easy_closesocket(sockActive[i]);    easy_closesocket(listen_sock);    RemoveForwarder(nPort);    dbg_printf("ForwardIOThread: %d exiting\n", nPort);    return;}int CreateIOForwarder(char *pszFwdHost, int nFwdPort){    int error;    char pszHost[100];    HANDLE hThread;    DWORD dwThreadId;    ForwarderEntry *pEntry;    ForwardIOThreadArg *pArg;    int nPort;    char ch = 1;    pArg = new ForwardIOThreadArg;    // Connect to the forwardee    if (easy_create(&pArg->sockForward, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR)    {	error = WSAGetLastError();	err_printf("CreateIOForwarder: easy_create failed: error %d\n", error);	delete pArg;	return INVALID_SOCKET;    }    if (easy_connect(pArg->sockForward, pszFwdHost, nFwdPort) == SOCKET_ERROR)    {	error = WSAGetLastError();	err_printf("CreateIOForwarder: easy_connect(%s:%d) failed: error %d\n", pszFwdHost, nFwdPort, error);	delete pArg;	return INVALID_SOCKET;    }    if (easy_send(pArg->sockForward, &ch, sizeof(char)) == SOCKET_ERROR)    {	error = WSAGetLastError();	err_printf("CreateIOForwarder: easy_send failed: error %d\n", error);	delete pArg;	return INVALID_SOCKET;    }    pEntry = new ForwarderEntry;    // Save the forwardee stuff.  Used only by the forwarders command.    strncpy(pEntry->pszFwdHost, pszFwdHost, MAX_HOST_LENGTH);    pEntry->nFwdPort = nFwdPort;    // Create a listener    if (easy_create(&pArg->sockListen, ADDR_ANY, INADDR_ANY) == SOCKET_ERROR)    {	error = WSAGetLastError();	err_printf("CreateIOForwarder: easy_create listen socket failed: error %d\n", error);	delete pEntry;	delete pArg;	return INVALID_SOCKET;    }    listen(pArg->sockListen, 10);    easy_get_sock_info(pArg->sockListen, pszHost, &pEntry->nPort);    nPort = pEntry->nPort;    dbg_printf("create forwarder %s:%d -> %s:%d\n", pszHost, pEntry->nPort, pszFwdHost, nFwdPort);    // Create a stop signal socket    MakeLoop(&pArg->sockStop, &pEntry->sockStop);    if (pArg->sockStop == INVALID_SOCKET || pEntry->sockStop == INVALID_SOCKET)    {	delete pEntry;	delete pArg;	return INVALID_SOCKET;    }    // Let the forward thread know what port it is connected to    pArg->nPort = nPort;    // Create the forwarder thread    hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ForwardIOThread, pArg, 0, &dwThreadId);    if (hThread == NULL)    {	error = GetLastError();	err_printf("CreateIOForwarder: CreateThread failed, error %d\n", error);	delete pEntry;	delete pArg;	return INVALID_SOCKET;    }    CloseHandle(hThread);        // Add the new entry to the list    WaitForSingleObject(g_hForwarderMutex, INFINITE);    pEntry->pNext = g_pForwarderList;    g_pForwarderList = pEntry;    ReleaseMutex(g_hForwarderMutex);    return nPort;}void StopIOForwarder(int nPort, bool bWaitForEmpty){    WaitForSingleObject(g_hForwarderMutex, INFINITE);    ForwarderEntry *pEntry = g_pForwarderList;    while (pEntry)    {	if (pEntry->nPort == nPort)	{	    if (bWaitForEmpty)	    {		char ch = 0;		easy_send(pEntry->sockStop, &ch, 1);		ReleaseMutex(g_hForwarderMutex);	    }	    else	    {		int nPort = pEntry->nPort;		easy_send(pEntry->sockStop, "x", 1);		easy_closesocket(pEntry->sockStop);		pEntry->sockStop = INVALID_SOCKET;		ReleaseMutex(g_hForwarderMutex);		RemoveForwarder(nPort);	    }	    return;	}	pEntry = pEntry->pNext;    }    ReleaseMutex(g_hForwarderMutex);    err_printf("StopIOForwarder: forwarder port %d not found\n", nPort);}void AbortAllForwarders(){    int nPort;        while (g_pForwarderList)    {	WaitForSingleObject(g_hForwarderMutex, INFINITE);	if (g_pForwarderList)	{	    nPort = g_pForwarderList->nPort;	    ReleaseMutex(g_hForwarderMutex);	    StopIOForwarder(g_pForwarderList->nPort, false);	}	else	    ReleaseMutex(g_hForwarderMutex);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -