vmpi_filesystem.cpp
来自「hl2 source code. Do not use it illegal.」· C++ 代码 · 共 1,653 行 · 第 1/4 页
CPP
1,653 行
void CMasterMulticastThread::OnChunkReceived( int fileID, int clientID, int iChunk )
{
if ( !m_Files.IsValidIndex( fileID ) )
{
Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
return;
}
CMulticastFile *pFile = m_Files[fileID];
CClientFileInfo *pClient = NULL;
FOR_EACH_LL( pFile->m_Clients, iClient )
{
if ( pFile->m_Clients[iClient]->m_ClientID == clientID )
{
pClient = pFile->m_Clients[iClient];
break;
}
}
if ( !pClient )
{
Warning( "CMasterMulticastThread::OnChunkReceived: invalid client ID (%d) for file %s\n", clientID, pFile->GetFilename() );
return;
}
if ( !pFile->m_Chunks.IsValidIndex( iChunk ) )
{
Warning( "CMasterMulticastThread::OnChunkReceived: invalid chunk index (%d) for file %s\n", iChunk, pFile->GetFilename() );
return;
}
// Mark that this client doesn't need this chunk anymore.
pClient->m_ChunksToSend[iChunk >> 3] &= ~(1 << (iChunk & 7));
pClient->m_nChunksLeft--;
if ( pClient->m_nChunksLeft == 0 && g_iVMPIVerboseLevel >= 2 )
Warning( "Client %d got file %s\n", clientID, pFile->GetFilename() );
EnterCriticalSection( &m_CS );
DecrementChunkRefCount( fileID, iChunk );
LeaveCriticalSection( &m_CS );
}
void CMasterMulticastThread::OnFileReceived( int fileID, int clientID )
{
if ( !m_Files.IsValidIndex( fileID ) )
{
Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
return;
}
CMulticastFile *pFile = m_Files[fileID];
for ( int i=0; i < pFile->m_Info.m_nChunks; i++ )
OnChunkReceived( fileID, clientID, i );
}
void CMasterMulticastThread::OnClientDisconnect( int clientID )
{
EnterCriticalSection( &m_CS );
// Remove all references from this client.
FOR_EACH_LL( m_Files, iFile )
{
CMulticastFile *pFile = m_Files[iFile];
FOR_EACH_LL( pFile->m_Clients, iClient )
{
CClientFileInfo *pClient = pFile->m_Clients[iClient];
if ( pClient->m_ClientID != clientID )
continue;
// Ok, this is our man. Decrement the refcount of any chunks this client wanted.
for ( int iChunk=0; iChunk < pFile->m_Info.m_nChunks; iChunk++ )
{
if ( pClient->NeedsChunk( iChunk ) )
{
DecrementChunkRefCount( iFile, iChunk );
}
}
delete pClient;
pFile->m_Clients.Remove( iClient );
break;
}
}
LeaveCriticalSection( &m_CS );
}
void CMasterMulticastThread::CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
{
int iFile = FindFile( pFilename );
if ( iFile != -1 )
Error( "CMasterMulticastThread::CreateVirtualFile( %s ) - file already exists!", pFilename );
CMulticastFile *pFile = new CMulticastFile;
pFile->m_UncompressedData.CopyArray( (const char*)pData, fileLength );
FinishFileSetup( pFile, pFilename );
}
DWORD WINAPI CMasterMulticastThread::StaticMulticastThread( LPVOID pParameter )
{
return ((CMasterMulticastThread*)pParameter)->MulticastThread();
}
DWORD CMasterMulticastThread::MulticastThread()
{
double nMicrosecondsPerByte = 1000000.0 / (double)MULTICAST_TRANSMIT_RATE;
unsigned long waitAccumulator = 0;
DWORD msToWait = 0;
while ( WaitForSingleObject( m_hTermEvent, msToWait ) != WAIT_OBJECT_0 )
{
EnterCriticalSection( &m_CS );
// If we have nothing to send then kick back for a while.
if ( m_nTotalActiveChunks == 0 )
{
LeaveCriticalSection( &m_CS );
msToWait = 50;
continue;
}
// We're going to time how long this chunk took to send.
CFastTimer timer;
timer.Start();
// Make sure we're on a valid chunk.
if ( m_iCurFile == -1 )
{
Assert( m_Files.Count() > 0 );
m_iCurFile = m_Files.Head();
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
}
int iStartFile = m_iCurFile;
while ( 1 )
{
if ( m_iCurActiveChunk == m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() ||
m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount == 0 )
{
// Finished with that file. Send the next one.
m_iCurFile = m_Files.Next( m_iCurFile );
if ( m_iCurFile == m_Files.InvalidIndex() )
m_iCurFile = m_Files.Head();
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
}
if ( m_iCurActiveChunk != m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() )
{
// Only break if we're on an active chunk.
if ( m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount != 0 )
{
break;
}
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
}
}
// Send the next chunk (file, size, time, chunk data).
CMulticastFile *pFile = m_Files[m_iCurFile];
int iStartByte = m_iCurActiveChunk * MULTICAST_CHUNK_PAYLOAD_SIZE;
int iEndByte = min( iStartByte + MULTICAST_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );
WSABUF bufs[4];
bufs[0].buf = (char*)&pFile->m_Info;
bufs[0].len = sizeof( pFile->m_Info );
bufs[1].buf = (char*)&m_iCurActiveChunk;
bufs[1].len = sizeof( m_iCurActiveChunk );
bufs[2].buf = (char*)pFile->GetFilename();
bufs[2].len = strlen( pFile->GetFilename() ) + 1;
bufs[3].buf = &pFile->m_Data[iStartByte];
bufs[3].len = iEndByte - iStartByte;
DWORD nBytesSent = 0;
int ret = WSASendTo(
m_Socket,
bufs,
ARRAYSIZE( bufs ),
&nBytesSent,
0,
(sockaddr*)&m_MulticastAddr,
sizeof( m_MulticastAddr ),
NULL,
NULL );
g_nMulticastBytesSent += (int)nBytesSent;
// Move to the next chunk.
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
LeaveCriticalSection( &m_CS );
// Measure how long it took to send this.
timer.End();
unsigned long timeTaken = timer.GetDuration().GetMicroseconds();
// Measure how long it should have taken.
unsigned long optimalTimeTaken = (unsigned long)( nMicrosecondsPerByte * nBytesSent );
// If we went faster than we should have, then wait for the difference in time.
if ( timeTaken < optimalTimeTaken )
{
waitAccumulator += optimalTimeTaken - timeTaken;
}
// Since we can only wait in milliseconds, it accumulates the wait time in microseconds until
// we've got a couple seconds of wait time.
unsigned long testWait = waitAccumulator / 1000;
if ( testWait >= MINIMUM_SLEEP_MS )
{
msToWait = testWait;
waitAccumulator -= testWait * 1000;
}
else
{
msToWait = 0;
}
}
return 0;
}
void CMasterMulticastThread::IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk )
{
CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];
if ( pChunk->m_RefCount == 0 )
{
++m_nTotalActiveChunks;
// Move the chunk to the head of the list since it is now active.
pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToHead( pChunk );
}
pChunk->m_RefCount++;
}
void CMasterMulticastThread::DecrementChunkRefCount( int iFile, int iChunk )
{
CMulticastFile *pFile = m_Files[iFile];
CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];
if ( pChunk->m_RefCount == 0 )
{
Error( "CMasterMulticastThread::DecrementChunkRefCount - refcount already zero!\n" );
}
pChunk->m_RefCount--;
if ( pChunk->m_RefCount == 0 )
{
--m_nTotalActiveChunks;
// If this is the current chunk the thread is reading on, seek up to the next chunk so
// the thread doesn't spin off into the next file and skip its current file's contents.
if ( iFile == m_iCurFile && pChunk->m_iActiveChunksIndex == m_iCurActiveChunk )
{
m_iCurActiveChunk = pFile->m_ActiveChunks.Next( pChunk->m_iActiveChunksIndex );
}
// Move the chunk to the end of the list since it is now inactive.
pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
}
}
int CMasterMulticastThread::FindFile( const char *pName )
{
FOR_EACH_LL( m_Files, i )
{
if ( stricmp( m_Files[i]->GetFilename(), pName ) == 0 )
return i;
}
return -1;
}
bool CMasterMulticastThread::FindWarningSuppression( const char *pFilename )
{
FOR_EACH_LL( m_WarningSuppressions, i )
{
if ( Q_stricmp( m_WarningSuppressions[i], pFilename ) == 0 )
return true;
}
return false;
}
void CMasterMulticastThread::AddWarningSuppression( const char *pFilename )
{
char *pBlah = new char[ strlen( pFilename ) + 1 ];
strcpy( pBlah, pFilename );
m_WarningSuppressions.AddToTail( pBlah );
}
int CMasterMulticastThread::FindOrAddFile( const char *pFilename )
{
// See if we've already opened this file.
int iFile = FindFile( pFilename );
if ( iFile != -1 )
return iFile;
// Read in the file's data.
FILE *fp = fopen( pFilename, "rb" );
if ( !fp )
{
// It tends to show this once per worker, so only let it show a warning once.
//if ( !FindWarningSuppression( pFilename ) )
//{
// AddWarningSuppression( pFilename );
// Warning( "CMasterMulticastThread::AddFileRequest( %s ): fopen() failed\n", pFilename );
//}
return -1;
}
CMulticastFile *pFile = new CMulticastFile;
fseek( fp, 0, SEEK_END );
pFile->m_UncompressedData.SetSize( ftell( fp ) );
fseek( fp, 0, SEEK_SET );
fread( pFile->m_UncompressedData.Base(), 1, pFile->m_UncompressedData.Count(), fp );
fclose( fp );
return FinishFileSetup( pFile, pFilename );
}
int CMasterMulticastThread::FinishFileSetup( CMulticastFile *pFile, const char *pFilename )
{
// Compress the file's contents.
if ( !ZLibCompress( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), pFile->m_Data ) )
{
delete pFile;
return -1;
}
pFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
strcpy( pFile->m_Filename.Base(), pFilename );
pFile->m_nCycles = 0;
pFile->m_Info.m_CompressedSize = pFile->m_Data.Count();
pFile->m_Info.m_UncompressedSize = pFile->m_UncompressedData.Count();
pFile->m_Info.m_nChunks = PAD_NUMBER( pFile->m_Info.m_CompressedSize, MULTICAST_CHUNK_PAYLOAD_SIZE ) / MULTICAST_CHUNK_PAYLOAD_SIZE;
// Initialize the chunks.
pFile->m_Chunks.SetSize( pFile->m_Info.m_nChunks );
for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
{
CChunkInfo *pChunk = &pFile->m_Chunks[i];
pChunk->m_iChunk = (unsigned short)i;
pChunk->m_RefCount = 0;
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
}
// Get this file in the queue.
EnterCriticalSection( &m_CS );
pFile->m_Info.m_FileID = m_Files.AddToTail( pFile );
LeaveCriticalSection( &m_CS );
return pFile->m_Info.m_FileID;
}
const CUtlVector<char>& CMasterMulticastThread::GetFileData( int iFile ) const
{
return m_Files[iFile]->m_UncompressedData;
}
// -------------------------------------------------------------------------------------------------------------- //
// The VMPI file system. The master runs a thread that multicasts the contents of all the
// files being used in the session. The workers read whatever files they need out of the stream.
// -------------------------------------------------------------------------------------------------------------- //
class CWorkerFile
{
public:
const char* GetFilename() { return m_Filename.Base(); }
bool IsReadyToRead() const { return m_nChunksToReceive == 0; }
public:
CFastTimer m_Timer; // To see how long it takes to download the file.
// This is false until we get any packets about the file. In the packets,
// we find out what the size is supposed to be.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?