vmpi_filesystem.cpp

来自「hl2 source code. Do not use it illegal.」· C++ 代码 · 共 1,653 行 · 第 1/4 页

CPP
1,653
字号

#include <winsock2.h>
#include "filesystem.h"
#include "vmpi_filesystem.h"
#include "tier0/dbg.h"
#include "vstdlib/strtools.h"
#include "vstdlib/random.h"
#include "messbuf.h"
#include "vmpi_dispatch.h"
#include "utllinkedlist.h"
#include <io.h>
#include <sys/stat.h>
#include "vmpi.h"
#include "utlvector.h"
#include "filesystem_tools.h"
#include "zlib.h"
#include "iphelpers.h"
#include "vmpi_tools_shared.h"


// Tuning constants for the multicast file transfer.
// NOTE(review): the code that consumes these is not in this part of the file; the
// descriptions below come from the names and values only - confirm against the users.
#define MULTICAST_CHUNK_PAYLOAD_SIZE	(1024*1)		// Presumably bytes of file data carried per chunk (1k).
#define MULTICAST_TRANSMIT_RATE			(1024*1024*6)	// Target send rate: 6 megs per second.
#define MINIMUM_SLEEP_MS				1				// Presumably the shortest sleep (in milliseconds) the sender will take.

#define NUM_BUFFERED_CHUNK_ACKS	512	// Presumably how many chunk acks are queued before a flush is forced.
#define ACK_FLUSH_INTERVAL		500	// In milliseconds: flush the ack queue twice per second.


#define VMPI_PACKETID_FILESYSTEM	0	// The file system reserves this packet ID.
										// All application traffic must set its first byte to something other
										// than this value.

// Sub packet IDs specific to the VMPI file system.
#define VMPI_FSPACKETID_FILE_REQUEST	1	// Sent by the worker to request a file.
#define VMPI_FSPACKETID_FILE_RESPONSE	2	// Master's response to a file request.
#define VMPI_FSPACKETID_CHUNK_RECEIVED	3	// Sent by workers to tell the master they received a chunk.
#define VMPI_FSPACKETID_FILE_RECEIVED	4	// Sent by workers to tell the master they received the whole file.


// VMPI_PACKETID_FILESYSTEM stored in a char variable (non-static, so visible to other files).
char g_cPacketID_FileSystem = VMPI_PACKETID_FILESYSTEM;


// Defined elsewhere: converts a CIPAddr into the sockaddr_in form the socket calls take.
extern void IPAddrToSockAddr( const CIPAddr *pIn, sockaddr_in *pOut );


// Starts out false. NOTE(review): presumably set to make file access fail on purpose - confirm at the use sites.
bool g_bDisableFileAccess = false;

// Helper interface used by this file; NULL until something assigns it (the assignment is not in this part of the file).
static IVMPIFileSystemHelpers *g_pHelpers = NULL;



// This does a fast zlib compression of the source data into the 'out' buffer.
bool ZLibCompress( const void *pData, int len, CUtlVector<char> &out )
{
	int outStartLen = len;
RETRY:;

	// Prepare the compression stream.
	z_stream zs;
	memset( &zs, 0, sizeof( zs ) );
	
	if ( deflateInit( &zs, 1 ) != Z_OK )
		return false;
	

	// Now compress it into the output buffer.
	out.SetSize( outStartLen );

	zs.next_in = (unsigned char*)pData;
	zs.avail_in = len;

	zs.next_out = (unsigned char*)out.Base();
	zs.avail_out = out.Count();

	int ret = deflate( &zs, Z_FINISH );
	deflateEnd( &zs );

	if ( ret == Z_STREAM_END )
	{
		// Get rid of whatever was left over.
		out.RemoveMultiple( zs.total_out, out.Count() - zs.total_out );
		return true;
	}
	else if ( ret == Z_OK )
	{
		// Need more space in the output buffer.
		outStartLen += 1024 * 128;
		goto RETRY;
	}
	else
	{
		return false;
	}
}


bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen )
{
	z_stream decompressStream;
	
	// Initialize the decompression stream.
	memset( &decompressStream, 0, sizeof( decompressStream ) );
	if ( inflateInit( &decompressStream ) != Z_OK )
		return false;

	// Decompress all this stuff and write it to the file.
	decompressStream.next_in = (unsigned char*)pInput;
	decompressStream.avail_in = inputLen;

	char *pOutChar = (char*)pOut;
	while ( decompressStream.avail_in )
	{
		decompressStream.total_out = 0;
		decompressStream.next_out = (unsigned char*)pOutChar;
		decompressStream.avail_out = outLen - (pOutChar - (char*)pOut);

		int ret = inflate( &decompressStream, Z_NO_FLUSH );
		if ( ret != Z_OK && ret != Z_STREAM_END )
			return false;


		pOutChar += decompressStream.total_out;

		if ( ret == Z_STREAM_END )
		{
			if ( (pOutChar - (char*)pOut) == outLen )
			{
				return true;
			}
			else
			{
				Assert( false );
				return false;
			}
		}
	}

	Assert( false ); // Should have gotten to Z_STREAM_END.
	return false;
}



// ----------------------------------------------------------------------------- //
// This is the thread that sits and multicasts any chunks of files that need to
// be sent out to clients.
// ----------------------------------------------------------------------------- //

// Fixed-size description of one multicast file: its sizes, ID and chunk count.
// Embedded in CMasterMulticastThread::CMulticastFile as m_Info.
// NOTE(review): the comment on that member says this is sent along with every packet, so the
// field order and sizes are presumably part of the wire format - confirm before changing them.
class CMulticastFileInfo
{
public:
	unsigned long m_CompressedSize;		// Size of the zlib-compressed data.
	unsigned long m_UncompressedSize;	// Size of the original data.
	unsigned short m_FileID;			// ID of the file these values describe.
	unsigned short m_nChunks;			// Number of chunks the file is split into.
};


// Owns the master's multicast socket, the worker thread that sends file chunks on it,
// and the list of files (with per-client bookkeeping) that the thread sends from.
class CMasterMulticastThread
{
public:

					CMasterMulticastThread();
					~CMasterMulticastThread();

	// This creates the socket and starts the thread (initially in an idle state since it doesn't
	// know of any files anyone wants). Returns false if the socket, the bind or the thread fails.
	bool			Init( unsigned short localPort, const CIPAddr *pAddr );

	// Stops the thread (if running), closes the socket and deletes all files.
	void			Term();

	// Returns -1 if there is an error.
	int				FindOrAddFile( const char *pFilename );	
	const CUtlVector<char>& GetFileData( int iFile ) const;
	
	// When a client requests a file, this is called to tell the thread to start
	// adding chunks from the specified file into the queue it's multicasting.
	//
	// Returns -1 if the file isn't there. Otherwise, it returns the file ID
	// that will be sent along with the file's chunks in the multicast packets.
	int				AddFileRequest( const char *pFilename, int clientID );
	
	// As each client receives multicasted chunks, they ack them so the master can
	// stop transmitting any chunks it knows nobody wants.
	void			OnChunkReceived( int fileID, int clientID, int iChunk );
	void			OnFileReceived( int fileID, int clientID );

	// Call this if a client disconnects so it can stop sending anything this client wants.
	void			OnClientDisconnect( int clientID );

	// NOTE(review): presumably registers an in-memory buffer as a file under pFilename - the
	// definition is not in this part of the file; confirm.
	void CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength );

private:

	// Per-chunk bookkeeping for one file.
	class CChunkInfo
	{
	public:
		unsigned short	m_iChunk;
		unsigned short	m_RefCount;				// How many clients want this chunk.
		unsigned short	m_iActiveChunksIndex;	// Index into m_ActiveChunks.
	};


	// This stores a client's reference to a file so it knows which pieces of the file the client needs.
	class CClientFileInfo
	{
	public:
		// Tests bit i of the m_ChunksToSend bitfield (byte i/8, bit i%8). No bounds check is done.
		bool NeedsChunk( int i ) const { return (m_ChunksToSend[i>>3] & (1 << (i&7))) != 0; }	
	
	public:
		int							m_ClientID;
		CUtlVector<unsigned char>	m_ChunksToSend;	// One bit for each chunk that this client still wants.
		int m_nChunksLeft;							// AddFileRequest starts this at the file's chunk count.
	};


	// Everything the master keeps for one file it can multicast.
	class CMulticastFile
	{
	public:
		// The file owns its CClientFileInfo objects.
		~CMulticastFile()
		{
			m_Clients.PurgeAndDeleteElements();
		}

		const char* GetFilename() { return m_Filename.Base(); }


	public:
		int m_nCycles; // How many times has the multicast thread visited this file?

		// This is sent along with every packet. If a client gets a chunk and doesn't have that file's
		// info, the client will receive that file too.
		CUtlVector<char> m_Filename;
		CMulticastFileInfo m_Info;

		// This is stored so the app can read out the uncompressed data.
		CUtlVector<char>				m_UncompressedData;

		// zlib-compressed file data
		CUtlVector<char>				m_Data;	

		// m_Chunks holds the chunks by index.
		// m_ActiveChunks holds them sorted by whether they're active or not.
		// 
		// Each chunk has a refcount. While the refcount is > 0, the chunk is in the first
		// half of m_ActiveChunks. When the refcount gets to 0, the chunk is moved to the end of 
		// m_ActiveChunks. That way, we can iterate through the chunks that need to be sent and 
		// stop iterating the first time we hit one with a refcount of 0.
		CUtlVector<CChunkInfo>			m_Chunks;
		CUtlLinkedList<CChunkInfo*,int>	m_ActiveChunks;

		// This tells which clients want pieces of this file.
		CUtlLinkedList<CClientFileInfo*,int>	m_Clients;
	};


private:

	// Thread entry point handed to CreateThread; Init passes 'this' as pParameter.
	static DWORD WINAPI StaticMulticastThread( LPVOID pParameter );
	DWORD MulticastThread();

	// Called after pFile->m_UncompressedData has been setup. This compresses the data, prepares the header,
	// copies the filename, and adds it into the queue for the multicast thread.
	int FinishFileSetup( CMulticastFile *pFile, const char *pFilename );

	// Adjust how many clients want a chunk. AddFileRequest calls the increment once per chunk
	// of the file for each new client request.
	void IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk );
	void DecrementChunkRefCount( int iFile, int iChunk );
	
	int FindFile( const char *pName );

	bool FindWarningSuppression( const char *pFilename );
	void AddWarningSuppression( const char *pFilename );

private:
	
	// All files known to the master. Term() deletes them.
	CUtlLinkedList<CMulticastFile*,int>		m_Files;

	// This tracks how many chunks we have that want to be sent.
	int m_nTotalActiveChunks;

	// Socket created and bound by Init; INVALID_SOCKET when not initialized.
	SOCKET m_Socket;
	// Destination address, filled in by Init from its pAddr argument.
	sockaddr_in m_MulticastAddr;
	
	// Handle of the multicast thread; NULL when no thread is running.
	HANDLE m_hThread;
	// AddFileRequest holds this while it adds a client to a file and bumps chunk refcounts.
	// NOTE(review): presumably the multicast thread takes it as well - confirm in MulticastThread.
	CRITICAL_SECTION m_CS;

	// Events used to communicate with our thread.
	// Term() signals this one before it waits for the thread to exit.
	HANDLE m_hTermEvent;

	// The thread walks through this as it spews chunks of data.
	// Both start out as -1 (set by the constructor).
	volatile int m_iCurFile;			// Index into m_Files.
	volatile int m_iCurActiveChunk;		// Current index into CMulticastFile::m_ActiveChunks.

	// NOTE(review): presumably filenames whose warnings have already been shown; ownership of
	// the char* entries is not visible in this part of the file - confirm.
	CUtlLinkedList<char*,int> m_WarningSuppressions;
};


// Puts the object into its idle state: no thread, no socket, no active chunks and no
// current file/chunk. Also creates the termination event and the critical section,
// which live for as long as the object does.
CMasterMulticastThread::CMasterMulticastThread()
	: m_nTotalActiveChunks( 0 ),
	  m_Socket( INVALID_SOCKET ),
	  m_hThread( NULL ),
	  // Unnamed, auto-reset event that starts out non-signaled.
	  m_hTermEvent( CreateEvent( NULL, FALSE, FALSE, NULL ) ),
	  m_iCurFile( -1 ),
	  m_iCurActiveChunk( -1 )
{
	InitializeCriticalSection( &m_CS );
}


// Shuts everything down and releases the event and critical section created by the constructor.
CMasterMulticastThread::~CMasterMulticastThread()
{
	// Must come first: Term() signals m_hTermEvent and waits for the thread to exit,
	// so the event has to stay valid until it returns.
	Term();
	
	CloseHandle( m_hTermEvent );

	// By now Term() has waited for the thread to exit (if one was running).
	DeleteCriticalSection( &m_CS );
}


// Creates the UDP socket, binds it to localPort on INADDR_ANY, remembers *pAddr as the
// destination to send to, and starts the multicast thread at lowest priority.
//
// Any previous session is shut down first via Term(), which also deletes all files that
// were registered with this object.
//
// Returns true on success. On failure a warning is printed, everything created so far is
// torn down again, and false is returned.
bool CMasterMulticastThread::Init( unsigned short localPort, const CIPAddr *pAddr )
{
	Term();

	// First, create our socket.
	m_Socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_IP );
	if ( m_Socket == INVALID_SOCKET )
	{
		Warning( "CMasterMulticastThread::Init - socket() failed\n" );
		return false;
	}

		
	// Bind to INADDR_ANY.
	CIPAddr localAddr( 0, 0, 0, 0, localPort );
	
	sockaddr_in addr;
	IPAddrToSockAddr( &localAddr, &addr );

	int status = bind( m_Socket, (sockaddr*)&addr, sizeof(addr) );
	if ( status != 0 )
	{
		Term();
		// Report the address we actually tried to bind (the local one), not the
		// multicast destination, which has nothing to do with this failure.
		Warning( "CMasterMulticastThread::Init - bind( %d.%d.%d.%d:%d ) failed\n", EXPAND_ADDR( localAddr ) );
		return false;
	}

	
	// Remember the address we want to send to.
	IPAddrToSockAddr( pAddr, &m_MulticastAddr );
	

	// Now create our thread.
	DWORD dwThreadID = 0;
	m_hThread = CreateThread( NULL, 0, &CMasterMulticastThread::StaticMulticastThread, this, 0, &dwThreadID );
	if ( !m_hThread )
	{
		Term();
		// The format string has no specifiers, so no extra arguments are passed.
		Warning( "CMasterMulticastThread::Init - CreateThread failed\n" );
		return false;
	}

	SetThreadPriority( m_hThread, THREAD_PRIORITY_LOWEST );
	return true;
}


void CMasterMulticastThread::Term()
{
	// Stop the thread if it is running.
	if ( m_hThread )
	{
		SetEvent( m_hTermEvent );
		WaitForSingleObject( m_hThread, INFINITE );
		CloseHandle( m_hThread );

		m_hThread = NULL;
	}

	// Close the socket.
	closesocket( m_Socket );
	m_Socket = INVALID_SOCKET;

	// Free up other data.
	m_Files.PurgeAndDeleteElements();
}


int CMasterMulticastThread::AddFileRequest( const char *pFilename, int clientID )
{
	// Firstly, do we already have this file?
	int iFile = FindOrAddFile( pFilename );
	if ( iFile == -1 )
		return -1;

	CMulticastFile *pFile = m_Files[iFile];

	// Now that we have a file setup, merge in this client's info.
	EnterCriticalSection( &m_CS );

		CClientFileInfo *pClient = new CClientFileInfo;
		pClient->m_ClientID = clientID;
		pClient->m_nChunksLeft = pFile->m_Info.m_nChunks;
		pClient->m_ChunksToSend.SetSize( PAD_NUMBER( pFile->m_Info.m_nChunks, 8 ) / 8 );
		memset( pClient->m_ChunksToSend.Base(), 0xFF, pClient->m_ChunksToSend.Count() );
		pFile->m_Clients.AddToTail( pClient );

		for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
		{
			IncrementChunkRefCount( pFile, i );
		}
	
	LeaveCriticalSection( &m_CS );

	return iFile;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?