vmpi_filesystem.cpp

来自「hl2 source code. Do not use it illegal.」· C++ 代码 · 共 1,653 行 · 第 1/4 页

CPP
1,653
字号

// Handles a client's acknowledgement that it has received chunk iChunk of file fileID.
//
// fileID   - index of the file in m_Files.
// clientID - ID of the acknowledging client; it must be registered in the file's m_Clients list.
// iChunk   - index of the acknowledged chunk in the file's m_Chunks array.
//
// An invalid file, client, or chunk index is reported with Warning() and the call is ignored.
// An acknowledgement for a chunk the client is no longer marked as needing (a duplicate ack,
// or OnFileReceived() arriving after some per-chunk acks) is silently ignored, so the client's
// m_nChunksLeft and the chunk's refcount are only ever decremented once per chunk.
void CMasterMulticastThread::OnChunkReceived( int fileID, int clientID, int iChunk )
{
	if ( !m_Files.IsValidIndex( fileID ) )
	{
		Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
		return;
	}

	// Find this client's bookkeeping entry for the file.
	CMulticastFile *pFile = m_Files[fileID];
	CClientFileInfo *pClient = NULL;
	FOR_EACH_LL( pFile->m_Clients, iClient )
	{
		if ( pFile->m_Clients[iClient]->m_ClientID == clientID )
		{
			pClient = pFile->m_Clients[iClient];
			break;
		}
	}
	if ( !pClient )
	{
		Warning( "CMasterMulticastThread::OnChunkReceived: invalid client ID (%d) for file %s\n", clientID, pFile->GetFilename() );
		return;
	}

	if ( !pFile->m_Chunks.IsValidIndex( iChunk ) )
	{
		Warning( "CMasterMulticastThread::OnChunkReceived: invalid chunk index (%d) for file %s\n", iChunk, pFile->GetFilename() );
		return;
	}

	// m_ChunksToSend holds one bit per chunk, eight chunks per element.
	const int iByte = iChunk >> 3;
	const int bitMask = 1 << (iChunk & 7);

	// If the bit is already clear, this client no longer holds a reference on the chunk.
	// Decrementing again would corrupt m_nChunksLeft and the chunk's refcount.
	if ( !( pClient->m_ChunksToSend[iByte] & bitMask ) )
		return;

	// Mark that this client doesn't need this chunk anymore.
	pClient->m_ChunksToSend[iByte] &= ~bitMask;
	pClient->m_nChunksLeft--;

	if ( pClient->m_nChunksLeft == 0 && g_iVMPIVerboseLevel >= 2 )
		Warning( "Client %d got file %s\n", clientID, pFile->GetFilename() );
	
	// The chunk refcounts and the send cursor are updated under m_CS.
	EnterCriticalSection( &m_CS );
		DecrementChunkRefCount( fileID, iChunk );
	LeaveCriticalSection( &m_CS );
}


// Handles a client's acknowledgement that it has received an entire file.
//
// fileID   - index of the file in m_Files.
// clientID - ID of the acknowledging client.
//
// Implemented by acknowledging every chunk of the file through OnChunkReceived(), which
// performs its own validation of the client and chunk on each call. An invalid fileID is
// reported with Warning() and the call is ignored.
void CMasterMulticastThread::OnFileReceived( int fileID, int clientID )
{
	if ( !m_Files.IsValidIndex( fileID ) )
	{
		// Report under this function's own name so the log points at the right message type.
		Warning( "CMasterMulticastThread::OnFileReceived: invalid file (%d) from client %d\n", fileID, clientID );
		return;
	}

	CMulticastFile *pFile = m_Files[fileID];
	for ( int i=0; i < pFile->m_Info.m_nChunks; i++ )
		OnChunkReceived( fileID, clientID, i );
}


void CMasterMulticastThread::OnClientDisconnect( int clientID )
{
	EnterCriticalSection( &m_CS );

	// Remove all references from this client.
	FOR_EACH_LL( m_Files, iFile )
	{
		CMulticastFile *pFile = m_Files[iFile];

		FOR_EACH_LL( pFile->m_Clients, iClient )
		{
			CClientFileInfo *pClient = pFile->m_Clients[iClient];
			
			if ( pClient->m_ClientID != clientID )
				continue;

			// Ok, this is our man. Decrement the refcount of any chunks this client wanted.
			for ( int iChunk=0; iChunk < pFile->m_Info.m_nChunks; iChunk++ )
			{
				if ( pClient->NeedsChunk( iChunk ) )
				{
					DecrementChunkRefCount( iFile, iChunk );
				}
			}

			delete pClient;
			pFile->m_Clients.Remove( iClient );

			break;
		}
	}

	LeaveCriticalSection( &m_CS );
}


void CMasterMulticastThread::CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
{
	int iFile = FindFile( pFilename );
	if ( iFile != -1 )
		Error( "CMasterMulticastThread::CreateVirtualFile( %s ) - file already exists!", pFilename );

	CMulticastFile *pFile = new CMulticastFile;
	pFile->m_UncompressedData.CopyArray( (const char*)pData, fileLength );

	FinishFileSetup( pFile, pFilename );
}


// Static thread entry point: pParameter is the CMasterMulticastThread whose MulticastThread()
// should run; its return value is returned unchanged.
DWORD WINAPI CMasterMulticastThread::StaticMulticastThread( LPVOID pParameter )
{
	CMasterMulticastThread *pSelf = static_cast<CMasterMulticastThread*>( pParameter );
	return pSelf->MulticastThread();
}


// Main loop of the multicast sender.
//
// Until m_hTermEvent is signaled, each iteration picks the next chunk with a nonzero refcount
// (cycling through m_Files), sends it to m_MulticastAddr with WSASendTo, and then works out how
// long to wait before the next send so the average rate stays at MULTICAST_TRANSMIT_RATE.
// Chunk selection and the send itself happen while holding m_CS.
//
// Always returns 0.
DWORD CMasterMulticastThread::MulticastThread()
{
	// Time budget per byte sent, derived from the configured transmit rate.
	double nMicrosecondsPerByte = 1000000.0 / (double)MULTICAST_TRANSMIT_RATE;
	// Throttling delay (in microseconds) that is owed but has not been waited for yet.
	unsigned long waitAccumulator = 0;

	// The wait on the termination event doubles as the throttling sleep.
	DWORD msToWait = 0;
	while ( WaitForSingleObject( m_hTermEvent, msToWait ) != WAIT_OBJECT_0 )
	{
		EnterCriticalSection( &m_CS );
			
			// If we have nothing to send then kick back for a while.
			if ( m_nTotalActiveChunks == 0 )
			{
				LeaveCriticalSection( &m_CS );
				msToWait = 50;
				continue;
			}

			
			// We're going to time how long this chunk took to send.
			CFastTimer timer;
			timer.Start();

			// Make sure we're on a valid chunk.
			if ( m_iCurFile == -1 )
			{
				Assert( m_Files.Count() > 0 );
				m_iCurFile = m_Files.Head();
				m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
			}

			// NOTE(review): iStartFile is assigned here but never read in this function.
			int iStartFile = m_iCurFile;
			// Advance the (file, chunk) cursor until it rests on a chunk with a nonzero refcount.
			// There is no other exit from this loop; m_nTotalActiveChunks was nonzero above.
			while ( 1 )
			{
				// IncrementChunkRefCount/DecrementChunkRefCount put active chunks at the head of the list
				// and inactive ones at the tail, so a zero refcount is treated as the end of this file's work.
				if ( m_iCurActiveChunk == m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() ||
					m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount == 0 )
				{
					// Finished with that file. Send the next one.
					m_iCurFile = m_Files.Next( m_iCurFile );
					if ( m_iCurFile == m_Files.InvalidIndex() )
						m_iCurFile = m_Files.Head();

					m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
				}

				if ( m_iCurActiveChunk != m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() )
				{
					// Only break if we're on an active chunk.
					if ( m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount != 0 )
					{
						break;
					}

					m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
				}
			}

			// Send the next chunk (file, size, time, chunk data).
			CMulticastFile *pFile = m_Files[m_iCurFile];
			
			// NOTE(review): m_iCurActiveChunk is an index into the m_ActiveChunks list, yet it is used here
			// (and sent in bufs[1]) as the chunk number. This assumes a chunk's list index always equals
			// its position in m_Chunks -- confirm against the list's index-reuse behavior.
			int iStartByte = m_iCurActiveChunk * MULTICAST_CHUNK_PAYLOAD_SIZE;
			// The last chunk of a file may be shorter than a full payload.
			int iEndByte = min( iStartByte + MULTICAST_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );

			// Packet layout: file info, chunk index, null-terminated filename, chunk payload.
			WSABUF bufs[4];
			bufs[0].buf = (char*)&pFile->m_Info;
			bufs[0].len = sizeof( pFile->m_Info );

			bufs[1].buf = (char*)&m_iCurActiveChunk;
			bufs[1].len = sizeof( m_iCurActiveChunk );

			bufs[2].buf = (char*)pFile->GetFilename();
			bufs[2].len = strlen( pFile->GetFilename() ) + 1;

			bufs[3].buf = &pFile->m_Data[iStartByte];
			bufs[3].len = iEndByte - iStartByte;

			// NOTE(review): ret is never examined, so a failed send is not detected here;
			// nBytesSent keeps its initial 0 unless WSASendTo writes to it.
			DWORD nBytesSent = 0;
			int ret = WSASendTo( 
				m_Socket, 
				bufs, 
				ARRAYSIZE( bufs ), 
				&nBytesSent, 
				0, 
				(sockaddr*)&m_MulticastAddr, 
				sizeof( m_MulticastAddr ),
				NULL,
				NULL );			

			g_nMulticastBytesSent += (int)nBytesSent;

			// Move to the next chunk.
			m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );

		LeaveCriticalSection( &m_CS );


		// Measure how long it took to send this.
		timer.End();
		unsigned long timeTaken = timer.GetDuration().GetMicroseconds();

		
		// Measure how long it should have taken.
		unsigned long optimalTimeTaken = (unsigned long)( nMicrosecondsPerByte * nBytesSent );

		
		// If we went faster than we should have, then wait for the difference in time.
		if ( timeTaken < optimalTimeTaken )
		{
			waitAccumulator += optimalTimeTaken - timeTaken;
		}

		// Since we can only wait in milliseconds, the owed wait time accumulates in microseconds
		// until it amounts to at least MINIMUM_SLEEP_MS milliseconds. Only whole milliseconds are
		// consumed; the sub-millisecond remainder stays in the accumulator.
		unsigned long testWait = waitAccumulator / 1000;
		if ( testWait >= MINIMUM_SLEEP_MS )
		{
			msToWait = testWait;
			waitAccumulator -= testWait * 1000;
		}
		else
		{
			msToWait = 0;
		}
	}

	return 0;
}


// Adds one reference to chunk iChunk of pFile.
//
// When the chunk goes from zero references to one, it is counted in m_nTotalActiveChunks and
// relinked at the head of the file's m_ActiveChunks list (its stored list index is refreshed).
// This function does not take m_CS itself.
void CMasterMulticastThread::IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk )
{
	CChunkInfo &chunk = pFile->m_Chunks[iChunk];

	const bool bBecomingActive = ( chunk.m_RefCount == 0 );
	if ( bBecomingActive )
	{
		++m_nTotalActiveChunks;

		// Active chunks live at the front of the list.
		pFile->m_ActiveChunks.Remove( chunk.m_iActiveChunksIndex );
		chunk.m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToHead( &chunk );
	}

	chunk.m_RefCount++;
}


void CMasterMulticastThread::DecrementChunkRefCount( int iFile, int iChunk )
{
	CMulticastFile *pFile = m_Files[iFile];
	CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];

	if ( pChunk->m_RefCount == 0 )
	{
		Error( "CMasterMulticastThread::DecrementChunkRefCount - refcount already zero!\n" );
	}

	pChunk->m_RefCount--;
	if ( pChunk->m_RefCount == 0 )
	{
		--m_nTotalActiveChunks;
		
		// If this is the current chunk the thread is reading on, seek up to the next chunk so
		// the thread doesn't spin off into the next file and skip its current file's contents.
		if ( iFile == m_iCurFile && pChunk->m_iActiveChunksIndex == m_iCurActiveChunk )
		{
			m_iCurActiveChunk = pFile->m_ActiveChunks.Next( pChunk->m_iActiveChunksIndex );
		}

		// Move the chunk to the end of the list since it is now inactive.
		pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
		pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
	}
}


int CMasterMulticastThread::FindFile( const char *pName )
{
	FOR_EACH_LL( m_Files, i )
	{
		if ( stricmp( m_Files[i]->GetFilename(), pName ) == 0 )
			return i;
	}
	return -1;
}


bool CMasterMulticastThread::FindWarningSuppression( const char *pFilename )
{
	FOR_EACH_LL( m_WarningSuppressions, i )
	{
		if ( Q_stricmp( m_WarningSuppressions[i], pFilename ) == 0 )
			return true;
	}
	return false;
}


// Appends a heap-allocated copy of pFilename to m_WarningSuppressions.
// The copy is not freed by this function.
void CMasterMulticastThread::AddWarningSuppression( const char *pFilename )
{
	// Room for the name plus its terminating null.
	const size_t nBytes = strlen( pFilename ) + 1;

	char *pNameCopy = new char[ nBytes ];
	strcpy( pNameCopy, pFilename );

	m_WarningSuppressions.AddToTail( pNameCopy );
}


int CMasterMulticastThread::FindOrAddFile( const char *pFilename )
{
	// See if we've already opened this file.
	int iFile = FindFile( pFilename );
	if ( iFile != -1 )
		return iFile;

	// Read in the file's data.
	FILE *fp = fopen( pFilename, "rb" );
	if ( !fp )
	{
		// It tends to show this once per worker, so only let it show a warning once.
		//if ( !FindWarningSuppression( pFilename ) )
		//{
		//	AddWarningSuppression( pFilename );
		//	Warning( "CMasterMulticastThread::AddFileRequest( %s ): fopen() failed\n", pFilename );
		//}

		return -1;
	}

	CMulticastFile *pFile = new CMulticastFile;

	fseek( fp, 0, SEEK_END );
	pFile->m_UncompressedData.SetSize( ftell( fp ) );
	fseek( fp, 0, SEEK_SET );
	fread( pFile->m_UncompressedData.Base(), 1, pFile->m_UncompressedData.Count(), fp );

	fclose( fp );

	return FinishFileSetup( pFile, pFilename );	
}


// Completes the setup of a file whose m_UncompressedData has already been filled in, and
// adds it to m_Files.
//
// pFile     - the file object to finish; it is deleted here if compression fails.
// pFilename - name stored in the file object.
//
// Returns the value stored in pFile->m_Info.m_FileID (the index returned by
// m_Files.AddToTail), or -1 if ZLibCompress fails.
int CMasterMulticastThread::FinishFileSetup( CMulticastFile *pFile, const char *pFilename )
{
	// The compressed form in m_Data is what gets split into chunks.
	const bool bCompressedOk = ZLibCompress( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), pFile->m_Data );
	if ( !bCompressedOk )
	{
		delete pFile;
		return -1;
	}

	pFile->m_nCycles = 0;

	// Store the name, including its terminating null.
	pFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
	strcpy( pFile->m_Filename.Base(), pFilename );

	// Record both sizes, then the number of payload-sized chunks needed for the compressed data
	// (rounded up to a whole chunk).
	pFile->m_Info.m_UncompressedSize = pFile->m_UncompressedData.Count();
	pFile->m_Info.m_CompressedSize = pFile->m_Data.Count();
	pFile->m_Info.m_nChunks = PAD_NUMBER( pFile->m_Info.m_CompressedSize, MULTICAST_CHUNK_PAYLOAD_SIZE ) / MULTICAST_CHUNK_PAYLOAD_SIZE;

	// Every chunk starts with no references and is linked into the active-chunks list in order.
	// The array is sized first because the list stores pointers to its elements.
	pFile->m_Chunks.SetSize( pFile->m_Info.m_nChunks );
	for ( int iChunk = 0; iChunk < pFile->m_Chunks.Count(); iChunk++ )
	{
		CChunkInfo &chunk = pFile->m_Chunks[iChunk];

		chunk.m_iChunk = (unsigned short)iChunk;
		chunk.m_RefCount = 0;
		chunk.m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( &chunk );
	}

	// Publish the file; m_Files is modified under m_CS.
	EnterCriticalSection( &m_CS );
		pFile->m_Info.m_FileID = m_Files.AddToTail( pFile );
	LeaveCriticalSection( &m_CS );

	return pFile->m_Info.m_FileID;
}


// Returns a reference to the uncompressed contents of the file at index iFile in m_Files.
// The index is not validated here.
const CUtlVector<char>& CMasterMulticastThread::GetFileData( int iFile ) const
{
	const CMulticastFile *pEntry = m_Files[iFile];
	return pEntry->m_UncompressedData;
}



// -------------------------------------------------------------------------------------------------------------- //
// The VMPI file system. The master runs a thread that multicasts the contents of all the 
// files being used in the session. The workers read whatever files they need out of the stream.
// -------------------------------------------------------------------------------------------------------------- //

class CWorkerFile
{
public:
	const char* GetFilename() { return m_Filename.Base(); }
	bool IsReadyToRead() const { return m_nChunksToReceive == 0; }


public:
	CFastTimer m_Timer; // To see how long it takes to download the file.

	// This is false until we get any packets about the file. In the packets,
	// we find out what the size is supposed to be.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?