vmpi_filesystem.cpp
来自「hl2 source code. Do not use it illegal.」· C++ 代码 · 共 1,653 行 · 第 1/4 页
CPP
1,653 行
#include <winsock2.h>
#include "filesystem.h"
#include "vmpi_filesystem.h"
#include "tier0/dbg.h"
#include "vstdlib/strtools.h"
#include "vstdlib/random.h"
#include "messbuf.h"
#include "vmpi_dispatch.h"
#include "utllinkedlist.h"
#include <io.h>
#include <sys/stat.h>
#include "vmpi.h"
#include "utlvector.h"
#include "filesystem_tools.h"
#include "zlib.h"
#include "iphelpers.h"
#include "vmpi_tools_shared.h"
#define MULTICAST_CHUNK_PAYLOAD_SIZE (1024*1)
#define MULTICAST_TRANSMIT_RATE (1024*1024*6) // N megs per second
#define MINIMUM_SLEEP_MS 1
#define NUM_BUFFERED_CHUNK_ACKS 512
#define ACK_FLUSH_INTERVAL 500 // Flush the ack queue twice per second.
#define VMPI_PACKETID_FILESYSTEM 0 // The file system reserves this packet ID.
// All application traffic must set its first byte to something other
// than this value.
// Sub packet IDs specific to the VMPI file system.
#define VMPI_FSPACKETID_FILE_REQUEST 1 // Sent by the worker to request a file.
#define VMPI_FSPACKETID_FILE_RESPONSE 2 // Master's response to a file request.
#define VMPI_FSPACKETID_CHUNK_RECEIVED 3 // Sent by workers to tell the master they received a chunk.
#define VMPI_FSPACKETID_FILE_RECEIVED 4 // Sent by workers to tell the master they received the whole file.
char g_cPacketID_FileSystem = VMPI_PACKETID_FILESYSTEM;
extern void IPAddrToSockAddr( const CIPAddr *pIn, sockaddr_in *pOut );
bool g_bDisableFileAccess = false;
static IVMPIFileSystemHelpers *g_pHelpers = NULL;
// This does a fast zlib compression of the source data into the 'out' buffer.
bool ZLibCompress( const void *pData, int len, CUtlVector<char> &out )
{
int outStartLen = len;
RETRY:;
// Prepare the compression stream.
z_stream zs;
memset( &zs, 0, sizeof( zs ) );
if ( deflateInit( &zs, 1 ) != Z_OK )
return false;
// Now compress it into the output buffer.
out.SetSize( outStartLen );
zs.next_in = (unsigned char*)pData;
zs.avail_in = len;
zs.next_out = (unsigned char*)out.Base();
zs.avail_out = out.Count();
int ret = deflate( &zs, Z_FINISH );
deflateEnd( &zs );
if ( ret == Z_STREAM_END )
{
// Get rid of whatever was left over.
out.RemoveMultiple( zs.total_out, out.Count() - zs.total_out );
return true;
}
else if ( ret == Z_OK )
{
// Need more space in the output buffer.
outStartLen += 1024 * 128;
goto RETRY;
}
else
{
return false;
}
}
bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen )
{
z_stream decompressStream;
// Initialize the decompression stream.
memset( &decompressStream, 0, sizeof( decompressStream ) );
if ( inflateInit( &decompressStream ) != Z_OK )
return false;
// Decompress all this stuff and write it to the file.
decompressStream.next_in = (unsigned char*)pInput;
decompressStream.avail_in = inputLen;
char *pOutChar = (char*)pOut;
while ( decompressStream.avail_in )
{
decompressStream.total_out = 0;
decompressStream.next_out = (unsigned char*)pOutChar;
decompressStream.avail_out = outLen - (pOutChar - (char*)pOut);
int ret = inflate( &decompressStream, Z_NO_FLUSH );
if ( ret != Z_OK && ret != Z_STREAM_END )
return false;
pOutChar += decompressStream.total_out;
if ( ret == Z_STREAM_END )
{
if ( (pOutChar - (char*)pOut) == outLen )
{
return true;
}
else
{
Assert( false );
return false;
}
}
}
Assert( false ); // Should have gotten to Z_STREAM_END.
return false;
}
// ----------------------------------------------------------------------------- //
// This is the thread that sits and multicasts any chunks of files that need to
// be sent out to clients.
// ----------------------------------------------------------------------------- //
class CMulticastFileInfo
{
public:
unsigned long m_CompressedSize;
unsigned long m_UncompressedSize;
unsigned short m_FileID;
unsigned short m_nChunks;
};
class CMasterMulticastThread
{
public:
CMasterMulticastThread();
~CMasterMulticastThread();
// This creates the socket and starts the thread (initially in an idle state since it doesn't
// know of any files anyone wants).
bool Init( unsigned short localPort, const CIPAddr *pAddr );
void Term();
// Returns -1 if there is an error.
int FindOrAddFile( const char *pFilename );
const CUtlVector<char>& GetFileData( int iFile ) const;
// When a client requests a files, this is called to tell the thread to start
// adding chunks from the specified file into the queue it's multicasting.
//
// Returns -1 if the file isn't there. Otherwise, it returns the file ID
// that will be sent along with the file's chunks in the multicast packets.
int AddFileRequest( const char *pFilename, int clientID );
// As each client receives multicasted chunks, they ack them so the master can
// stop transmitting any chunks it knows nobody wants.
void OnChunkReceived( int fileID, int clientID, int iChunk );
void OnFileReceived( int fileID, int clientID );
// Call this if a client disconnects so it can stop sending anything this client wants.
void OnClientDisconnect( int clientID );
void CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength );
private:
class CChunkInfo
{
public:
unsigned short m_iChunk;
unsigned short m_RefCount; // How many clients want this chunk.
unsigned short m_iActiveChunksIndex; // Index into m_ActiveChunks.
};
// This stores a client's reference to a file so it knows which pieces of the file the client needs.
class CClientFileInfo
{
public:
bool NeedsChunk( int i ) const { return (m_ChunksToSend[i>>3] & (1 << (i&7))) != 0; }
public:
int m_ClientID;
CUtlVector<unsigned char> m_ChunksToSend; // One bit for each chunk that this client still wants.
int m_nChunksLeft;
};
class CMulticastFile
{
public:
~CMulticastFile()
{
m_Clients.PurgeAndDeleteElements();
}
const char* GetFilename() { return m_Filename.Base(); }
public:
int m_nCycles; // How many times has the multicast thread visited this file?
// This is sent along with every packet. If a client gets a chunk and doesn't have that file's
// info, the client will receive that file too.
CUtlVector<char> m_Filename;
CMulticastFileInfo m_Info;
// This is stored so the app can read out the uncompressed data.
CUtlVector<char> m_UncompressedData;
// zlib-compressed file data
CUtlVector<char> m_Data;
// m_Chunks holds the chunks by index.
// m_ActiveChunks holds them sorted by whether they're active or not.
//
// Each chunk has a refcount. While the refcount is > 0, the chunk is in the first
// half of m_ActiveChunks. When the refcount gets to 0, the chunk is moved to the end of
// m_ActiveChunks. That way, we can iterate through the chunks that need to be sent and
// stop iterating the first time we hit one with a refcount of 0.
CUtlVector<CChunkInfo> m_Chunks;
CUtlLinkedList<CChunkInfo*,int> m_ActiveChunks;
// This tells which clients want pieces of this file.
CUtlLinkedList<CClientFileInfo*,int> m_Clients;
};
private:
static DWORD WINAPI StaticMulticastThread( LPVOID pParameter );
DWORD MulticastThread();
// Called after pFile->m_UncompressedData has been setup. This compresses the data, prepares the header,
// copies the filename, and adds it into the queue for the multicast thread.
int FinishFileSetup( CMulticastFile *pFile, const char *pFilename );
void IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk );
void DecrementChunkRefCount( int iFile, int iChunk );
int FindFile( const char *pName );
bool FindWarningSuppression( const char *pFilename );
void AddWarningSuppression( const char *pFilename );
private:
CUtlLinkedList<CMulticastFile*,int> m_Files;
// This tracks how many chunks we have that want to be sent.
int m_nTotalActiveChunks;
SOCKET m_Socket;
sockaddr_in m_MulticastAddr;
HANDLE m_hThread;
CRITICAL_SECTION m_CS;
// Events used to communicate with our thread.
HANDLE m_hTermEvent;
// The thread walks through this as it spews chunks of data.
volatile int m_iCurFile; // Index into m_Files.
volatile int m_iCurActiveChunk; // Current index into CMulticastFile::m_ActiveChunks.
CUtlLinkedList<char*,int> m_WarningSuppressions;
};
CMasterMulticastThread::CMasterMulticastThread()
{
m_hThread = NULL;
m_Socket = INVALID_SOCKET;
m_nTotalActiveChunks = 0;
m_iCurFile = m_iCurActiveChunk = -1;
m_hTermEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
InitializeCriticalSection( &m_CS );
}
CMasterMulticastThread::~CMasterMulticastThread()
{
Term();
CloseHandle( m_hTermEvent );
DeleteCriticalSection( &m_CS );
}
bool CMasterMulticastThread::Init( unsigned short localPort, const CIPAddr *pAddr )
{
Term();
// First, create our socket.
m_Socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_IP );
if ( m_Socket == INVALID_SOCKET )
{
Warning( "CMasterMulticastThread::Init - socket() failed\n" );
return false;
}
// Bind to INADDR_ANY.
CIPAddr localAddr( 0, 0, 0, 0, localPort );
sockaddr_in addr;
IPAddrToSockAddr( &localAddr, &addr );
int status = bind( m_Socket, (sockaddr*)&addr, sizeof(addr) );
if ( status != 0 )
{
Term();
Warning( "CMasterMulticastThread::Init - bind( %d.%d.%d.%d:%d ) failed\n", EXPAND_ADDR( *pAddr ) );
return false;
}
// Remember the address we want to send to.
IPAddrToSockAddr( pAddr, &m_MulticastAddr );
// Now create our thread.
DWORD dwThreadID = 0;
m_hThread = CreateThread( NULL, 0, &CMasterMulticastThread::StaticMulticastThread, this, 0, &dwThreadID );
if ( !m_hThread )
{
Term();
Warning( "CMasterMulticastThread::Init - CreateThread failed\n", EXPAND_ADDR( *pAddr ) );
return false;
}
SetThreadPriority( m_hThread, THREAD_PRIORITY_LOWEST );
return true;
}
void CMasterMulticastThread::Term()
{
// Stop the thread if it is running.
if ( m_hThread )
{
SetEvent( m_hTermEvent );
WaitForSingleObject( m_hThread, INFINITE );
CloseHandle( m_hThread );
m_hThread = NULL;
}
// Close the socket.
closesocket( m_Socket );
m_Socket = INVALID_SOCKET;
// Free up other data.
m_Files.PurgeAndDeleteElements();
}
int CMasterMulticastThread::AddFileRequest( const char *pFilename, int clientID )
{
// Firstly, do we already have this file?
int iFile = FindOrAddFile( pFilename );
if ( iFile == -1 )
return -1;
CMulticastFile *pFile = m_Files[iFile];
// Now that we have a file setup, merge in this client's info.
EnterCriticalSection( &m_CS );
CClientFileInfo *pClient = new CClientFileInfo;
pClient->m_ClientID = clientID;
pClient->m_nChunksLeft = pFile->m_Info.m_nChunks;
pClient->m_ChunksToSend.SetSize( PAD_NUMBER( pFile->m_Info.m_nChunks, 8 ) / 8 );
memset( pClient->m_ChunksToSend.Base(), 0xFF, pClient->m_ChunksToSend.Count() );
pFile->m_Clients.AddToTail( pClient );
for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
{
IncrementChunkRefCount( pFile, i );
}
LeaveCriticalSection( &m_CS );
return iFile;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?