vmpi_filesystem.cpp
来自「hl2 source code. Do not use it illegal.」· C++ 代码 · 共 1,653 行 · 第 1/4 页
CPP
1,653 行
FOR_EACH_LL( m_WorkerFiles, i )
{
if ( stricmp( m_WorkerFiles[i]->GetFilename(), pFilename ) == 0 )
return m_WorkerFiles[i];
}
return NULL;
}
// Looks up a worker file by its numeric file ID.
// Walks m_WorkerFiles and returns the first entry whose m_FileID equals fileID,
// or NULL when no entry matches.
CWorkerFile* FindWorkerFile( int fileID )
{
	FOR_EACH_LL( m_WorkerFiles, iFile )
	{
		CWorkerFile *pCandidate = m_WorkerFiles[iFile];
		if ( pCandidate->m_FileID == fileID )
		{
			return pCandidate;
		}
	}

	// Nothing in the list carries this ID.
	return NULL;
}
private:
CIPAddr m_MulticastAddr;
CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles;
// How many files do we have open that we haven't finished receiving from the server yet?
// We always keep waiting for data until this is zero.
int m_nUnfinishedFiles;
};
// File system used by the VMPI tools.
//
// - With VMPI off (g_bUseMPI false), files are opened with plain stdio.
// - On the VMPI master, files opened for reading are loaded through m_MasterThread and handed back
//   as memory files (CVMPIFile_Memory); files opened for writing use stdio (CVMPIFile_Stdio).
// - On a VMPI worker, files are read-only and their data comes from m_Listener.
//
// Every FileHandle_t handed out by this class is really an IVMPIFile*, which is why the
// per-handle methods below simply cast the handle and forward the call.
class CVMPIFileSystem : public IBaseFileSystem
{
public:
	CMasterMulticastThread		m_MasterThread;	// Initialized in Init() when this process is the VMPI master.
	CWorkerMulticastListener	m_Listener;		// Initialized in Init() when this process is a VMPI worker.
	CIPAddr						m_MulticastIP;	// Multicast address/port picked by the master in Init().

	~CVMPIFileSystem()
	{
	}

	// Disconnect callback registered by the master in Init().
	// Forwards the disconnecting process ID to the master thread; pReason is not used.
	static void OnClientDisconnect( int procID, const char *pReason )
	{
		extern CVMPIFileSystem g_VMPIFileSystem;
		g_VMPIFileSystem.m_MasterThread.OnClientDisconnect( procID );
	}

	// Sets up the master thread or the worker listener, depending on g_bMPIMaster.
	// Returns true immediately when VMPI is not in use.
	// Returns false if the master thread or the worker listener fails to initialize.
	bool Init()
	{
		if ( !g_bUseMPI )
			return true;

		if ( g_bMPIMaster )
		{
			// Pick a random IP in the multicast range (224.0.0.2 to 239.255.255.255).
			// The values below stay inside 225.0.0.3 - 238.255.255.255.
			CCycleCount cnt;
			cnt.Sample();
			RandomSeed( (int)cnt.GetMicroseconds() );

			int localPort = 23412; // This can be anything.

			m_MulticastIP.port = RandomInt( 22000, 25000 ); // Pulled out of something else.
			m_MulticastIP.ip[0] = (unsigned char)RandomInt( 225, 238 );
			m_MulticastIP.ip[1] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[2] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[3] = (unsigned char)RandomInt( 3, 255 );

			if ( !m_MasterThread.Init( localPort, &m_MulticastIP ) )
				return false;

			// Send out the multicast addr to all the clients.
			SendMulticastIP( &m_MulticastIP );

			// Make sure we're notified when a client disconnects so we can unlink them from the
			// multicast thread's structures.
			VMPI_AddDisconnectHandler( &CVMPIFileSystem::OnClientDisconnect );
			return true;
		}
		else
		{
			// Get the multicast addr to listen on.
			CIPAddr mcAddr;
			RecvMulticastIP( &mcAddr );
			return m_Listener.Init( mcAddr );
		}
	}

	// Calls Term() on both the master thread and the worker listener.
	void Term()
	{
		m_MasterThread.Term();
		m_Listener.Term();
	}

	// Master-side open.
	// Write-style modes get a stdio-backed file. Anything else is loaded by the multicast thread
	// and returned as a memory file over that data.
	// Returns FILESYSTEM_INVALID_HANDLE if fopen fails or the multicast thread can't provide the file.
	FileHandle_t Open_Master( const char *pFilename, const char *pOptions )
	{
		// Use a stdio file if they want to write to it.
		if ( WantsWriteAccess( pOptions ) )
			return OpenStdio( pFilename, pOptions );

		// Have our multicast thread load all the data so it's there when workers want it.
		int iFile = m_MasterThread.FindOrAddFile( pFilename );
		if ( iFile == -1 )
			return FILESYSTEM_INVALID_HANDLE;

		const CUtlVector<char> &data = m_MasterThread.GetFileData( iFile );

		CVMPIFile_Memory *pFile = new CVMPIFile_Memory;
		pFile->Init( data.Base(), data.Count() );
		return (FileHandle_t)pFile;
	}

	// Worker-side open.
	// Write-style modes are rejected. For reads, the file is taken from the listener if it is
	// already there and ready; otherwise ListenFor() is called to obtain it.
	// Returns FILESYSTEM_INVALID_HANDLE for write modes or if ListenFor() returns NULL.
	FileHandle_t Open_Worker( const char *pFilename, const char *pOptions )
	{
		// Workers can't open anything for write access.
		if ( WantsWriteAccess( pOptions ) )
			return FILESYSTEM_INVALID_HANDLE;

		// Do we have this file's data already?
		CWorkerFile *pFile = m_Listener.FindWorkerFile( pFilename );
		if ( !pFile || !pFile->IsReadyToRead() )
		{
			// Ok, start listening to the multicast stream until we get the file we want.

			// NOTE: it might make sense here to have the client ask for a list of ALL the files that
			// the master currently has and wait to receive all of them (so we don't come back a bunch
			// of times and listen

			// NOTE NOTE: really, the best way to do this is to have a thread on the workers that sits there
			// and listens to the multicast stream. Any time the master opens a new file up, it assumes
			// all the workers need the file, and it starts to send it on the multicast stream until
			// the worker threads respond that they all have it.
			//
			// (NOTE: this probably means that the clients would have to ack the chunks on a UDP socket that
			// the thread owns).
			//
			// This would simplify all the worries about a client missing half the stream and having to
			// wait for another cycle through it.
			pFile = m_Listener.ListenFor( pFilename );
			if ( !pFile )
			{
				return FILESYSTEM_INVALID_HANDLE;
			}
		}

		// Ok! Got the file. now setup a memory stream they can read out of it with.
		CVMPIFile_Memory *pOut = new CVMPIFile_Memory;
		pOut->Init( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count() );
		return (FileHandle_t)pOut;
	}

	// Opens pFilename with an fopen-style mode string. pathID is not used.
	// Calls Error() if file access has been disabled via g_bDisableFileAccess.
	// Returns FILESYSTEM_INVALID_HANDLE on failure.
	virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID = 0 )
	{
		// Warning( "(%d) Open( %s, %s )\n", (int)Plat_FloatTime(), pFilename, pOptions );

		if ( g_bDisableFileAccess )
			Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );

		// If not using VMPI, then use regular file IO.
		if ( !g_bUseMPI )
			return OpenStdio( pFilename, pOptions );

		// With MPI on, the master and the workers each have their own way of opening files.
		if ( g_bMPIMaster )
		{
			return Open_Master( pFilename, pOptions );
		}
		else
		{
			return Open_Worker( pFilename, pOptions );
		}
	}

	// Closes a handle returned by Open(). A null handle is ignored.
	// NOTE(review): the handle objects are allocated with new in the open paths; presumably
	// IVMPIFile::Close() is what releases them - confirm.
	virtual void Close( FileHandle_t file )
	{
		if ( file )
			((IVMPIFile*)file)->Close();
	}

	// Forwards to IVMPIFile::Read and returns its result. The handle is not checked for null.
	virtual int Read( void* pOutput, int size, FileHandle_t file )
	{
		return ((IVMPIFile*)file)->Read( pOutput, size );
	}

	// Forwards to IVMPIFile::Write and returns its result. The handle is not checked for null.
	virtual int Write( void const* pInput, int size, FileHandle_t file )
	{
		return ((IVMPIFile*)file)->Write( pInput, size );
	}

	// Forwards to IVMPIFile::Seek. The handle is not checked for null.
	virtual void Seek( FileHandle_t file, int pos, FileSystemSeek_t seekType )
	{
		((IVMPIFile*)file)->Seek( pos, seekType );
	}

	// Forwards to IVMPIFile::Tell and returns its result. The handle is not checked for null.
	virtual unsigned int Tell( FileHandle_t file )
	{
		return ((IVMPIFile*)file)->Tell();
	}

	// Forwards to IVMPIFile::Size and returns its result. The handle is not checked for null.
	virtual unsigned int Size( FileHandle_t file )
	{
		return ((IVMPIFile*)file)->Size();
	}

	// Returns the size of the named file by opening it with "rb", asking the handle for its size,
	// and closing it again. Returns 0 if the file can't be opened. pathID is not used.
	virtual unsigned int Size( const char *pFilename, const char *pathID = 0 )
	{
		FileHandle_t hFile = Open( pFilename, "rb" );
		if ( hFile == FILESYSTEM_INVALID_HANDLE )
		{
			return 0;
		}
		else
		{
			unsigned int ret = Size( hFile );
			Close( hFile );
			return ret;
		}
	}

	// Returns st_mtime from _stat() on the given path, or 0 if _stat() fails. pathID is not used.
	// Note that this goes straight to _stat() regardless of whether VMPI is in use.
	virtual long GetFileTime( const char *pFileName, const char *pathID = 0 )
	{
		struct _stat buf;
		int sr = _stat( pFileName, &buf );
		if ( sr == -1 )
		{
			return 0;
		}

		return buf.st_mtime;
	}

	// Forwards to IVMPIFile::Flush. The handle is not checked for null.
	virtual void Flush( FileHandle_t file )
	{
		((IVMPIFile*)file)->Flush();
	}

	// Precaching is not implemented here; always returns false.
	virtual bool Precache( const char* pFileName, const char *pPathID )
	{
		return false;
	}

	// Returns true if the file can be opened with "rb" through Open(). pPathID is not used.
	// The handle is compared against FILESYSTEM_INVALID_HANDLE, the same way Size() does it.
	virtual bool FileExists( const char *pFileName, const char *pPathID )
	{
		FileHandle_t hFile = Open( pFileName, "rb" );
		if ( hFile == FILESYSTEM_INVALID_HANDLE )
		{
			return false;
		}

		Close( hFile );
		return true;
	}

	// Returns true if _stat() succeeds on the path and the _S_IWRITE bit is set in st_mode.
	// pPathID is not used.
	virtual bool IsFileWritable( const char *pFileName, const char *pPathID )
	{
		struct _stat buf;
		int sr = _stat( pFileName, &buf );
		if ( sr == -1 )
		{
			return false;
		}

		if( buf.st_mode & _S_IWRITE )
		{
			return true;
		}

		return false;
	}

private:
	// Returns true if the fopen-style mode string asks for any kind of write access.
	// "w" (write), "a" (append) and "+" (update, as in "r+") all allow writing, so all three
	// are treated as write access; checking only for "w" would let "a" and "r+" through as reads.
	static bool WantsWriteAccess( const char *pOptions )
	{
		return Q_stristr( pOptions, "w" ) != 0 ||
			Q_stristr( pOptions, "a" ) != 0 ||
			Q_stristr( pOptions, "+" ) != 0;
	}

	// Opens the file with fopen() and wraps the FILE* in a CVMPIFile_Stdio.
	// Returns FILESYSTEM_INVALID_HANDLE if fopen() fails.
	static FileHandle_t OpenStdio( const char *pFilename, const char *pOptions )
	{
		FILE *fp = fopen( pFilename, pOptions );
		if ( !fp )
			return FILESYSTEM_INVALID_HANDLE;

		CVMPIFile_Stdio *pFile = new CVMPIFile_Stdio;
		pFile->Init( fp );
		return (FileHandle_t)pFile;
	}
};
// The one global instance of the VMPI file system; the VMPI_FileSystem_* functions below operate on it.
CVMPIFileSystem g_VMPIFileSystem;
// Exposes g_VMPIFileSystem as an IBaseFileSystem under BASEFILESYSTEM_INTERFACE_VERSION.
// NOTE(review): presumably this is what makes it reachable through the factory returned by
// VMPI_FileSystem_GetFactory() - confirm against the macro's definition.
EXPOSE_SINGLE_INTERFACE_GLOBALVAR( CVMPIFileSystem, IBaseFileSystem, BASEFILESYSTEM_INTERFACE_VERSION, g_VMPIFileSystem );
// Stores the helper callbacks in g_pHelpers and initializes the global VMPI file system.
// Calls Error() if CVMPIFileSystem::Init() fails.
// Returns the global file system as an IBaseFileSystem.
IBaseFileSystem* VMPI_FileSystem_Init( IVMPIFileSystemHelpers *pHelpers )
{
	g_pHelpers = pHelpers;

	const bool bInitialized = g_VMPIFileSystem.Init();
	if ( bInitialized )
		return &g_VMPIFileSystem;

	Error( "Error in CVMPIFileSystem::Init()\n" );
	return &g_VMPIFileSystem;
}
// Shuts down the global VMPI file system and prints the multicast traffic total.
// Does nothing when VMPI is not in use.
void VMPI_FileSystem_Term()
{
	if ( !g_bUseMPI )
		return;

	g_VMPIFileSystem.Term();

	// The master reports what it sent; a worker reports what it received.
	if ( g_bMPIMaster )
	{
		Msg( "Multicast send: %dk\n", (g_nMulticastBytesSent + 511) / 1024 );
	}
	else
	{
		Msg( "Multicast recv: %dk\n", (g_nMulticastBytesReceived + 511) / 1024 );
	}
}
// Sets g_bDisableFileAccess. Once it is set, CVMPIFileSystem::Open() calls Error() whenever
// something tries to open a file.
void VMPI_FileSystem_DisableFileAccess()
{
g_bDisableFileAccess = true;
}
// Returns the interface factory obtained from Sys_GetFactoryThis().
// NOTE(review): presumably that is the factory for this module, through which the
// IBaseFileSystem interface exposed above can be requested - confirm.
CreateInterfaceFn VMPI_FileSystem_GetFactory()
{
return Sys_GetFactoryThis();
}
// Forwards to the master thread's CreateVirtualFile() on the global file system.
//   pFilename  - name to give the virtual file.
//   pData      - the file's contents.
//   fileLength - length passed along with pData (presumably its size in bytes - confirm).
void VMPI_FileSystem_CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
{
g_VMPIFileSystem.m_MasterThread.CreateVirtualFile( pFilename, pData, fileLength );
}
// Packet handler for our packet ID (registered via g_DispatchReg_FileSystem below).
// Handles an incoming filesystem packet from process iSource.
//
// pBuf->data[1] holds the filesystem sub-packet ID and the payload starts at pBuf->data[2]:
//   VMPI_FSPACKETID_FILE_REQUEST   - a filename string. The file ID returned by AddFileRequest()
//                                    is sent back to iSource in a VMPI_FSPACKETID_FILE_RESPONSE packet.
//   VMPI_FSPACKETID_CHUNK_RECEIVED - a run of (unsigned short fileID, unsigned short chunkID) pairs,
//                                    each passed to OnChunkReceived().
//   VMPI_FSPACKETID_FILE_RECEIVED  - a single unsigned short fileID, passed to OnFileReceived().
//
// iPacketID is not used.
//
// Returns true when the packet was handled. Returns false for an unknown sub-packet ID, and also
// for a packet that is too short to hold what its sub-packet ID requires (so it is reported the
// same way as an unknown packet instead of being read past its end).
bool FileSystemRecv( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	const int packetLen = pBuf->getLen();

	// Need at least the packet ID byte and the sub-packet ID byte before data[1] can be read.
	if ( packetLen < 2 )
		return false;

	// Handle this packet.
	int subPacketID = pBuf->data[1];
	switch( subPacketID )
	{
		case VMPI_FSPACKETID_FILE_REQUEST:
		{
			const char *pFilename = (const char*)&pBuf->data[2];

			// The filename is used as a C string below, so make sure it is terminated
			// somewhere inside the packet.
			bool bTerminated = false;
			for ( int iByte=2; iByte < packetLen; iByte++ )
			{
				if ( pBuf->data[iByte] == 0 )
				{
					bTerminated = true;
					break;
				}
			}
			if ( !bTerminated )
				return false;

			if ( g_iVMPIVerboseLevel >= 2 )
				Msg( "Client %d requested '%s'\n", iSource, pFilename );

			int fileID = g_VMPIFileSystem.m_MasterThread.AddFileRequest( pFilename, iSource );

			// Send back the file ID.
			unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_RESPONSE };
			void *pChunks[2] = { cPacket, &fileID };
			int chunkLen[2] = { sizeof( cPacket ), sizeof( fileID ) };

			VMPI_SendChunks( pChunks, chunkLen, ARRAYSIZE( pChunks ), iSource );
			return true;
		}

		case VMPI_FSPACKETID_CHUNK_RECEIVED:
		{
			// Each entry is 4 bytes: a file ID followed by a chunk ID. Trailing bytes that
			// don't make up a whole entry are ignored.
			const unsigned short *pIDs = (const unsigned short*)&pBuf->data[2];
			int nChunks = (packetLen - 2) / 4;

			for ( int i=0; i < nChunks; i++ )
			{
				g_VMPIFileSystem.m_MasterThread.OnChunkReceived( pIDs[i*2], iSource, pIDs[i*2 + 1] );
			}
			return true;
		}

		case VMPI_FSPACKETID_FILE_RECEIVED:
		{
			// The payload must hold the file ID.
			if ( packetLen < 2 + (int)sizeof( unsigned short ) )
				return false;

			unsigned short *pFileID = (unsigned short*)&pBuf->data[2];
			g_VMPIFileSystem.m_MasterThread.OnFileReceived( *pFileID, iSource );
			return true;
		}

		default:
		{
			return false;
		}
	}
}
// Associates FileSystemRecv with VMPI_PACKETID_FILESYSTEM.
// NOTE(review): presumably CDispatchReg's constructor performs the registration with the VMPI
// dispatcher when this global is constructed - confirm.
CDispatchReg g_DispatchReg_FileSystem( VMPI_PACKETID_FILESYSTEM, FileSystemRecv );
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?