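// vmpi_filesystem.cpp
//
// From "hl2 source code. Do not use it illegal." - C++ code, 1,653 lines in total, page 1 of 4.
//
// (Code-viewer page text: language tag "CPP", line count 1,653, font-size control.)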
		FOR_EACH_LL( m_WorkerFiles, i )
		{
			if ( stricmp( m_WorkerFiles[i]->GetFilename(), pFilename ) == 0 )
				return m_WorkerFiles[i];
		}
		return NULL;
	}

	// Looks up a worker file by its numeric file ID.
	// Returns the first entry in m_WorkerFiles whose m_FileID equals fileID,
	// or NULL when no entry matches.
	CWorkerFile* FindWorkerFile( int fileID ) 
	{
		FOR_EACH_LL( m_WorkerFiles, iEntry )
		{
			CWorkerFile *pCandidate = m_WorkerFiles[iEntry];
			if ( pCandidate->m_FileID != fileID )
				continue;

			return pCandidate;
		}

		// Walked the whole list without a match.
		return NULL;
	}


private:
	// NOTE(review): presumably the multicast address this listener receives on - its use is not visible in this part of the file.
	CIPAddr m_MulticastAddr;
	// The worker files tracked by this object; the lookup methods above search this list
	// by filename and by file ID.
	CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles;

	// How many files do we have open that we haven't finished receiving from the server yet?
	// We always keep waiting for data until this is zero.
	int m_nUnfinishedFiles;
};


// IBaseFileSystem implementation used by the VMPI tools.
//
// - When g_bUseMPI is false, every file is a plain stdio file.
// - On the MPI master, files opened for writing are stdio files; files opened for reading
//   are served from the in-memory copy held by m_MasterThread.
// - On an MPI worker, write access is refused and file contents are obtained through
//   m_Listener.
//
// Handles returned by Open() are IVMPIFile pointers cast to FileHandle_t.
class CVMPIFileSystem : public IBaseFileSystem
{
public:
	CMasterMulticastThread m_MasterThread;
	CWorkerMulticastListener m_Listener;

	CIPAddr m_MulticastIP;


	~CVMPIFileSystem()
	{
	}


	// Disconnect callback registered in Init(); forwards the disconnecting process ID
	// to the global file system's master thread. pReason is not used.
	static void OnClientDisconnect( int procID, const char *pReason )
	{
		extern CVMPIFileSystem g_VMPIFileSystem;
		g_VMPIFileSystem.m_MasterThread.OnClientDisconnect( procID );
	}
	

	// Sets up the multicast machinery.
	// Master: picks a random multicast address/port, starts the master thread, sends the
	// address to the clients and registers the disconnect handler.
	// Worker: receives the multicast address and starts the listener on it.
	// Without MPI there is nothing to set up.
	// Returns false if the master thread or the listener fails to initialize.
	bool Init()
	{
		if ( !g_bUseMPI )
			return true;

		if ( g_bMPIMaster )
		{
			CCycleCount cnt;
			cnt.Sample();
			RandomSeed( (int)cnt.GetMicroseconds() );

			int localPort = 23412; // This can be anything.

			// Pick a random IP in the multicast range (224.0.0.2 to 239.255.255.255);
			// the values used here stay inside 225.0.0.3 - 238.255.255.255.
			m_MulticastIP.port = RandomInt( 22000, 25000 ); // Pulled out of something else.
			m_MulticastIP.ip[0] = (unsigned char)RandomInt( 225, 238 );
			m_MulticastIP.ip[1] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[2] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[3] = (unsigned char)RandomInt( 3, 255 );

			if ( !m_MasterThread.Init( localPort, &m_MulticastIP ) )
				return false;

			// Send out the multicast addr to all the clients.
			SendMulticastIP( &m_MulticastIP );

			// Make sure we're notified when a client disconnects so we can unlink them from the 
			// multicast thread's structures.
			VMPI_AddDisconnectHandler( &CVMPIFileSystem::OnClientDisconnect );
			return true;
		}

		// Worker: get the multicast addr to listen on.
		CIPAddr mcAddr;
		RecvMulticastIP( &mcAddr );

		return m_Listener.Init( mcAddr );
	}

	// Shuts down both the master thread and the listener.
	void Term()
	{
		m_MasterThread.Term();
		m_Listener.Term();
	}


	// Master-side open.
	// Write modes get a real stdio file. Read modes get a memory stream over the data
	// held by the multicast thread for this file.
	// Returns FILESYSTEM_INVALID_HANDLE on failure.
	FileHandle_t Open_Master( const char *pFilename, const char *pOptions )
	{
		// Use a stdio file if they want to write to it.
		if ( WantsWriteAccess( pOptions ) )
			return OpenStdioFile( pFilename, pOptions );

		// Have our multicast thread load all the data so it's there when workers want it.
		int iFile = m_MasterThread.FindOrAddFile( pFilename );
		if ( iFile == -1 )
			return FILESYSTEM_INVALID_HANDLE;

		const CUtlVector<char> &data = m_MasterThread.GetFileData( iFile );

		CVMPIFile_Memory *pFile = new CVMPIFile_Memory;
		pFile->Init( data.Base(), data.Count() );
		return (FileHandle_t)pFile;
	}


	// Worker-side open.
	// Any write mode is refused. Otherwise the file's data is taken from the listener
	// (asking it to listen for the file if it is not ready yet) and wrapped in a memory stream.
	// Returns FILESYSTEM_INVALID_HANDLE on failure.
	FileHandle_t Open_Worker( const char *pFilename, const char *pOptions )
	{
		// Workers can't open anything for write access.
		if ( WantsWriteAccess( pOptions ) )
			return FILESYSTEM_INVALID_HANDLE;

		// Do we have this file's data already?
		CWorkerFile *pFile = m_Listener.FindWorkerFile( pFilename );
		if ( !pFile || !pFile->IsReadyToRead() )
		{
			// Ok, start listening to the multicast stream until we get the file we want.
			
			// NOTE: it might make sense here to have the client ask for a list of ALL the files that
			// the master currently has and wait to receive all of them (so we don't come back a bunch
			// of times and listen ...
			//
			// NOTE NOTE: really, the best way to do this is to have a thread on the workers that sits there
			// and listens to the multicast stream. Any time the master opens a new file up, it assumes
			// all the workers need the file, and it starts to send it on the multicast stream until
			// the worker threads respond that they all have it.
			//
			// (NOTE: this probably means that the clients would have to ack the chunks on a UDP socket that
			// the thread owns).
			//
			// This would simplify all the worries about a client missing half the stream and having to 
			// wait for another cycle through it.
			pFile = m_Listener.ListenFor( pFilename );

			if ( !pFile )
				return FILESYSTEM_INVALID_HANDLE;
		}

		// Ok! Got the file. now setup a memory stream they can read out of it with.
		CVMPIFile_Memory *pOut = new CVMPIFile_Memory;
		pOut->Init( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count() );
		return (FileHandle_t)pOut;
	}


	// Opens a file. pOptions is an fopen()-style mode string; pathID is ignored.
	// Reports through Error() if file access has been disabled.
	// Returns FILESYSTEM_INVALID_HANDLE if the file cannot be opened.
	virtual FileHandle_t	Open( const char *pFilename, const char *pOptions, const char *pathID = 0 )
	{
		// Warning( "(%d) Open( %s, %s )\n", (int)Plat_FloatTime(), pFilename, pOptions );

		if ( g_bDisableFileAccess )
			Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );

		// If not using VMPI, then use regular file IO.
		if ( !g_bUseMPI )
			return OpenStdioFile( pFilename, pOptions );

		// With MPI on, the master and the workers each have their own way of opening files.
		if ( g_bMPIMaster )
			return Open_Master( pFilename, pOptions );

		return Open_Worker( pFilename, pOptions );
	}

	// Closes a handle returned by Open(). A null handle is ignored.
	virtual void			Close( FileHandle_t file )
	{
		if ( file )
			((IVMPIFile*)file)->Close();
	}

	// Reads up to size bytes into pOutput. Returns the underlying file's result,
	// or 0 for a null handle.
	virtual int				Read( void* pOutput, int size, FileHandle_t file )
	{
		if ( !file )
			return 0;

		return ((IVMPIFile*)file)->Read( pOutput, size );
	}

	// Writes size bytes from pInput. Returns the underlying file's result,
	// or 0 for a null handle.
	virtual int				Write( void const* pInput, int size, FileHandle_t file )
	{
		if ( !file )
			return 0;

		return ((IVMPIFile*)file)->Write( pInput, size );
	}

	// Moves the file position. Does nothing for a null handle.
	virtual void			Seek( FileHandle_t file, int pos, FileSystemSeek_t seekType )
	{
		if ( !file )
			return;

		((IVMPIFile*)file)->Seek( pos, seekType );
	}

	// Returns the current file position, or 0 for a null handle.
	virtual unsigned int	Tell( FileHandle_t file )
	{
		if ( !file )
			return 0;

		return ((IVMPIFile*)file)->Tell();
	}

	// Returns the size of an open file, or 0 for a null handle.
	virtual unsigned int	Size( FileHandle_t file )
	{
		if ( !file )
			return 0;

		return ((IVMPIFile*)file)->Size();
	}

	// Returns the size of the named file by opening it with "rb", or 0 if it cannot
	// be opened. pathID is ignored.
	virtual unsigned int	Size( const char *pFilename, const char *pathID = 0 )
	{
		FileHandle_t hFile = Open( pFilename, "rb" );
		if ( hFile == FILESYSTEM_INVALID_HANDLE )
			return 0;

		unsigned int ret = Size( hFile );
		Close( hFile );
		return ret;
	}

	// Returns the st_mtime reported by _stat() for the file, or 0 if _stat() fails.
	// pathID is ignored.
	virtual long			GetFileTime( const char *pFileName, const char *pathID = 0 )
	{
		struct	_stat buf;
		int sr = _stat( pFileName, &buf );
		if ( sr == -1 )
		{
			return 0;
		}

		return buf.st_mtime;
	}

	// Flushes an open file. Does nothing for a null handle.
	virtual void			Flush( FileHandle_t file )
	{
		if ( !file )
			return;

		((IVMPIFile*)file)->Flush();
	}	

	// Precaching is not supported; always returns false.
	virtual bool			Precache( const char* pFileName, const char *pPathID )
	{
		return false;
	} 

	// Returns true if the file can be opened with "rb". pPathID is ignored.
	virtual bool			FileExists( const char *pFileName, const char *pPathID )
	{
		FileHandle_t hFile = Open( pFileName, "rb" );
		if ( hFile == FILESYSTEM_INVALID_HANDLE )
			return false;

		Close( hFile );
		return true;
	}

	// Returns true if _stat() succeeds and reports the _S_IWRITE mode bit for the file.
	// pPathID is ignored.
	virtual bool			IsFileWritable( const char *pFileName, const char *pPathID )
	{
		struct	_stat buf;
		int sr = _stat( pFileName, &buf );
		if ( sr == -1 )
		{
			return false;
		}

		if( buf.st_mode & _S_IWRITE )
		{
			return true;
		}

		return false;
	}


private:
	// Returns true if the fopen()-style mode string asks for any kind of write access:
	// "w" and "a" modes write, and a '+' makes any mode (e.g. "r+") read/write.
	static bool WantsWriteAccess( const char *pOptions )
	{
		return Q_stristr( pOptions, "w" ) != 0 ||
			Q_stristr( pOptions, "a" ) != 0 ||
			Q_stristr( pOptions, "+" ) != 0;
	}

	// Opens pFilename with fopen() and wraps the FILE in a CVMPIFile_Stdio.
	// Returns FILESYSTEM_INVALID_HANDLE if fopen() fails.
	static FileHandle_t OpenStdioFile( const char *pFilename, const char *pOptions )
	{
		FILE *fp = fopen( pFilename, pOptions );
		if ( !fp )
			return FILESYSTEM_INVALID_HANDLE;

		CVMPIFile_Stdio *pFile = new CVMPIFile_Stdio;
		pFile->Init( fp );
		return (FileHandle_t)pFile;
	}
};

// The single global instance of the VMPI file system, used by the free functions in this file.
CVMPIFileSystem g_VMPIFileSystem;
// NOTE(review): presumably publishes g_VMPIFileSystem as the IBaseFileSystem implementation for
// BASEFILESYSTEM_INTERFACE_VERSION - confirm against the macro's definition.
EXPOSE_SINGLE_INTERFACE_GLOBALVAR( CVMPIFileSystem, IBaseFileSystem, BASEFILESYSTEM_INTERFACE_VERSION, g_VMPIFileSystem );


// Stores pHelpers in g_pHelpers, initializes the global VMPI file system and returns it.
// A failed initialization is reported through Error().
IBaseFileSystem* VMPI_FileSystem_Init( IVMPIFileSystemHelpers *pHelpers )
{
	g_pHelpers = pHelpers;

	const bool bInitialized = g_VMPIFileSystem.Init();
	if ( bInitialized == false )
		Error( "Error in CVMPIFileSystem::Init()\n" );

	IBaseFileSystem *pFileSystem = &g_VMPIFileSystem;
	return pFileSystem;
}


// Shuts down the global VMPI file system and prints the multicast traffic total
// (bytes sent on the master, bytes received on a worker). Does nothing when MPI is off.
void VMPI_FileSystem_Term()
{
	if ( !g_bUseMPI )
		return;

	g_VMPIFileSystem.Term();

	if ( !g_bMPIMaster )
	{
		Msg( "Multicast recv: %dk\n", (g_nMulticastBytesReceived + 511) / 1024 );
		return;
	}

	Msg( "Multicast send: %dk\n", (g_nMulticastBytesSent + 511) / 1024 );
}


// Sets g_bDisableFileAccess. Once set, CVMPIFileSystem::Open() reports every open
// attempt through Error().
void VMPI_FileSystem_DisableFileAccess()
{
	g_bDisableFileAccess = true;
}


// Returns the interface factory obtained from Sys_GetFactoryThis().
CreateInterfaceFn VMPI_FileSystem_GetFactory()
{
	CreateInterfaceFn pfnFactory = Sys_GetFactoryThis();
	return pfnFactory;
}


// Forwards to the global file system's master thread, handing it a filename together with
// a data buffer of fileLength bytes.
// NOTE(review): presumably this registers an in-memory file that is then served like a file
// loaded from disk - confirm in CMasterMulticastThread::CreateVirtualFile.
void VMPI_FileSystem_CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
{
	g_VMPIFileSystem.m_MasterThread.CreateVirtualFile( pFilename, pData, fileLength );
}


// Handler for VMPI_PACKETID_FILESYSTEM packets.
//
// Byte 1 of the packet selects the sub-packet type and the payload starts at byte 2:
//   VMPI_FSPACKETID_FILE_REQUEST   - a NUL-terminated filename. The file request is passed to
//                                    the master thread and the resulting file ID is sent back
//                                    to iSource in a VMPI_FSPACKETID_FILE_RESPONSE packet.
//   VMPI_FSPACKETID_CHUNK_RECEIVED - consecutive (unsigned short fileID, unsigned short chunkID)
//                                    pairs, each passed to the master thread.
//   VMPI_FSPACKETID_FILE_RECEIVED  - a single unsigned short file ID.
//
// iSource is the sender of the packet. iPacketID is not used.
// Returns true if the packet was handled; false for an unknown sub-packet ID or a packet
// that is too short or malformed to parse.
bool FileSystemRecv( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	// We need at least the packet ID and sub-packet ID bytes before looking at data[1].
	const int packetLen = pBuf->getLen();
	if ( packetLen < 2 )
		return false;

	// Handle this packet.
	int subPacketID = pBuf->data[1];
	switch( subPacketID )
	{
		case VMPI_FSPACKETID_FILE_REQUEST:
		{
			const char *pFilename = (const char*)&pBuf->data[2];

			// The filename is used as a C string below, so its terminator must lie
			// inside the packet; otherwise we would read past the end of the buffer.
			bool bTerminated = false;
			for ( int iByte=2; iByte < packetLen; iByte++ )
			{
				if ( pBuf->data[iByte] == 0 )
				{
					bTerminated = true;
					break;
				}
			}
			if ( !bTerminated )
				return false;
			
			if ( g_iVMPIVerboseLevel >= 2 )
				Msg( "Client %d requested '%s'\n", iSource, pFilename );

			int fileID = g_VMPIFileSystem.m_MasterThread.AddFileRequest( pFilename, iSource );
			
			// Send back the file ID.
			unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_RESPONSE };
			void *pChunks[2] = { cPacket, &fileID };
			int chunkLen[2] = { sizeof( cPacket ), sizeof( fileID ) };

			VMPI_SendChunks( pChunks, chunkLen, ARRAYSIZE( pChunks ), iSource );
			return true;
		}

		case VMPI_FSPACKETID_CHUNK_RECEIVED:
		{
			// Each entry is 4 bytes: file ID followed by chunk ID. Any trailing partial
			// entry is ignored.
			const unsigned short *pIDs = (const unsigned short*)&pBuf->data[2];
			
			int nChunks = (packetLen - 2) / 4;
			for ( int i=0; i < nChunks; i++ )
			{
				g_VMPIFileSystem.m_MasterThread.OnChunkReceived( pIDs[i*2], iSource, pIDs[i*2 + 1] );
			}
			return true;
		}

		case VMPI_FSPACKETID_FILE_RECEIVED:
		{
			// The payload must hold a complete file ID.
			if ( packetLen < 2 + (int)sizeof( unsigned short ) )
				return false;

			const unsigned short *pFileID = (const unsigned short*)&pBuf->data[2];
			g_VMPIFileSystem.m_MasterThread.OnFileReceived( *pFileID, iSource );
			return true;
		}
		
		default:
		{
			return false;
		}
	}
}


// Register our packet ID.
// NOTE(review): presumably this routes incoming VMPI_PACKETID_FILESYSTEM packets to
// FileSystemRecv - confirm against CDispatchReg.
CDispatchReg g_DispatchReg_FileSystem( VMPI_PACKETID_FILESYSTEM, FileSystemRecv );

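// Keyboard shortcuts (code-viewer page text):
//
//   Copy code            Ctrl + C
//   Search code          Ctrl + F
//   Full-screen mode     F11
//   Increase font size   Ctrl + =
//   Decrease font size   Ctrl + -
//   Show shortcuts       ?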