//
// Datagrams.cpp
//
// Copyright (c) Shareaza Development Team, 2002-2004.
// This file is part of SHAREAZA (www.shareaza.com)
//
// Shareaza is free software; you can redistribute it
// and/or modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2 of
// the License, or (at your option) any later version.
//
// Shareaza is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Shareaza; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#include "StdAfx.h"
#include "Shareaza.h"
#include "Settings.h"
#include "Statistics.h"
#include "Network.h"
#include "Datagrams.h"
#include "Datagram.h"
#include "DatagramPart.h"
#include "Buffer.h"
#include "Handshakes.h"
#include "Neighbours.h"
#include "Neighbour.h"
#include "RouteCache.h"
#include "LocalSearch.h"
#include "SearchManager.h"
#include "QuerySearch.h"
#include "QueryHit.h"
#include "GProfile.h"
#include "CrawlSession.h"
#include "G2Neighbour.h"
#include "G2Packet.h"
#include "EDClients.h"
#include "EDPacket.h"
#include "Security.h"
#include "HostCache.h"
#include "QueryKeys.h"
#include "LibraryMaps.h"
// Debug builds: remember this file's name and route operator new through DEBUG_NEW
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
// Number of bucket pointers cleared in each of the input/output datagram hash tables
#define HASH_SIZE 32
// Mask applied to a computed hash to select a bucket (HASH_SIZE - 1)
#define HASH_MASK 31
// NOTE(review): not referenced in the visible part of this file; presumably the size
// of a temporary receive buffer -- confirm against the read path
#define TEMP_BUFFER 4096
// NOTE(review): not referenced in the visible part of this file -- confirm its use
#define METER_MINIMUM 100
// Number of history/time slots scanned in each bandwidth meter
#define METER_LENGTH 24
// Width in milliseconds (GetTickCount units) of the window summed by Measure()
#define METER_PERIOD 2000
// Width in milliseconds of the window TryWrite() sums to find recent output usage
#define METER_SECOND 1000
// The single global datagram manager instance
CDatagrams Datagrams;
//////////////////////////////////////////////////////////////////////
// CDatagrams construction
// Put the object into its idle state: no socket, sequence counter at zero,
// not yet known to be stable, and both bandwidth meters and all traffic
// counters cleared. Pools are only allocated later by Listen().
CDatagrams::CDatagrams()
{
    // No socket exists until Listen() creates one
    m_hSocket   = INVALID_SOCKET;
    m_bStable   = FALSE;
    m_nSequence = 0;

    // Wipe both bandwidth meters
    ZeroMemory( &m_mOutput, sizeof(m_mOutput) );
    ZeroMemory( &m_mInput, sizeof(m_mInput) );

    // Inbound counters
    m_nInPackets   = 0;
    m_nInFrags     = 0;
    m_nInBandwidth = 0;

    // Outbound counters
    m_nOutPackets   = 0;
    m_nOutFrags     = 0;
    m_nOutBandwidth = 0;
}
// Tear down on destruction. Disconnect() is a no-op when no socket is open,
// otherwise it closes the socket and frees the pools.
CDatagrams::~CDatagrams()
{
    this->Disconnect();
}
//////////////////////////////////////////////////////////////////////
// CDatagrams listen
// Create and bind the UDP socket, register it for read notifications, then
// allocate the buffer pool and the inbound/outbound datagram frame pools and
// reset the hash tables, time-ordered queues and counters.
//
// Returns FALSE when a socket is already open or a new one cannot be
// created, TRUE otherwise. A failed bind() does not change the return value;
// it only suppresses the "listening" message.
BOOL CDatagrams::Listen()
{
    if ( m_hSocket != INVALID_SOCKET ) return FALSE;

    m_hSocket = socket( PF_INET, SOCK_DGRAM, IPPROTO_UDP );
    if ( m_hSocket == INVALID_SOCKET ) return FALSE;

    // Choose the local address: configured inbound host first, then the
    // outbound host (with the inbound port), finally the network's own host.
    SOCKADDR_IN saHost;

    if ( Network.Resolve( Settings.Connection.InHost, Settings.Connection.InPort, &saHost ) )
    {
        // Inbound resolved
        if ( ! Settings.Connection.InBind ) saHost.sin_addr.S_un.S_addr = 0;
    }
    else if ( Network.Resolve( Settings.Connection.OutHost, Settings.Connection.InPort, &saHost ) )
    {
        // Outbound resolved
    }
    else
    {
        saHost = Network.m_pHost;
        if ( ! Settings.Connection.InBind ) saHost.sin_addr.S_un.S_addr = 0;
    }

    if ( bind( m_hSocket, (SOCKADDR*)&saHost, sizeof(saHost) ) == 0 )
    {
        theApp.Message( MSG_DEFAULT, IDS_NETWORK_LISTENING_UDP,
            (LPCTSTR)CString( inet_ntoa( saHost.sin_addr ) ), htons( saHost.sin_port ) );
    }

    WSAEventSelect( m_hSocket, Network.m_pWakeup, FD_READ );

    // Declared once for all three loops below: reusing a variable declared
    // in an earlier for-header is not valid in standard C++.
    DWORD nPos;

    // Buffer pool, chained into a singly linked free list through m_pNext.
    // A pool size of zero must yield an empty (NULL) free list, because a
    // pointer to a zero-length array cannot be used as a buffer.
    m_nBufferBuffer = Settings.Gnutella2.UdpBuffers; // 256;
    m_pBufferBuffer = new CBuffer[ m_nBufferBuffer ];
    m_pBufferFree   = m_nBufferBuffer ? m_pBufferBuffer : NULL;
    m_nBufferFree   = m_nBufferBuffer;

    CBuffer* pBuffer = m_pBufferBuffer;

    for ( nPos = m_nBufferBuffer ; nPos ; nPos--, pBuffer++ )
    {
        pBuffer->m_pNext = ( nPos == 1 ) ? NULL : ( pBuffer + 1 );
    }

    // Inbound frame pool, free list chained through m_pNextHash
    m_nInputBuffer = Settings.Gnutella2.UdpInFrames; // 128;
    m_pInputBuffer = new CDatagramIn[ m_nInputBuffer ];
    m_pInputFree   = m_nInputBuffer ? m_pInputBuffer : NULL;

    CDatagramIn* pDGI = m_pInputBuffer;

    for ( nPos = m_nInputBuffer ; nPos ; nPos--, pDGI++ )
    {
        pDGI->m_pNextHash = ( nPos == 1 ) ? NULL : ( pDGI + 1 );
    }

    // Outbound frame pool, free list chained through m_pNextHash
    m_nOutputBuffer = Settings.Gnutella2.UdpOutFrames; // 128;
    m_pOutputBuffer = new CDatagramOut[ m_nOutputBuffer ];
    m_pOutputFree   = m_nOutputBuffer ? m_pOutputBuffer : NULL;

    CDatagramOut* pDGO = m_pOutputBuffer;

    for ( nPos = m_nOutputBuffer ; nPos ; nPos--, pDGO++ )
    {
        pDGO->m_pNextHash = ( nPos == 1 ) ? NULL : ( pDGO + 1 );
    }

    // Empty hash tables and time-ordered queues
    ZeroMemory( m_pInputHash, sizeof(CDatagramIn*) * HASH_SIZE );
    ZeroMemory( m_pOutputHash, sizeof(CDatagramOut*) * HASH_SIZE );

    m_pInputFirst  = m_pInputLast  = NULL;
    m_pOutputFirst = m_pOutputLast = NULL;

    m_tLastWrite = 0;

    m_nInFrags  = m_nInPackets  = 0;
    m_nOutFrags = m_nOutPackets = 0;

    return TRUE;
}
//////////////////////////////////////////////////////////////////////
// CDatagrams disconnect
void CDatagrams::Disconnect()
{
if ( m_hSocket == INVALID_SOCKET ) return;
closesocket( m_hSocket );
m_hSocket = INVALID_SOCKET;
delete [] m_pOutputBuffer;
m_pOutputBuffer = NULL;
m_nOutputBuffer = 0;
m_pOutputFirst = m_pOutputLast = m_pOutputFree = NULL;
delete [] m_pInputBuffer;
m_pInputBuffer = NULL;
m_nInputBuffer = 0;
m_pInputFirst = m_pInputLast = m_pInputFree = NULL;
delete [] m_pBufferBuffer;
m_pBufferBuffer = NULL;
m_nBufferBuffer = 0;
m_nInBandwidth = m_nInFrags = m_nInPackets = 0;
m_nOutBandwidth = m_nOutFrags = m_nOutPackets = 0;
}
//////////////////////////////////////////////////////////////////////
// CDatagrams stable test
// Report whether UDP can be considered stable: requires an open socket and
// a listening network, and then either the stable flag being set or the
// connection being configured as not firewalled.
BOOL CDatagrams::IsStable()
{
    // Both preconditions must hold; the network is only queried when a socket exists
    const BOOL bReady = ( m_hSocket != INVALID_SOCKET ) && Network.IsListening();
    if ( ! bReady ) return FALSE;

    // Stable flag wins; otherwise rely on the firewall setting
    if ( m_bStable ) return TRUE;
    return ! Settings.Connection.Firewalled;
}
//////////////////////////////////////////////////////////////////////
// CDatagrams send
// Convenience overload: build a socket address from an IP address and a
// host-byte-order port, then forward to the SOCKADDR_IN overload.
//
// pAddress  destination IPv4 address (must not be NULL)
// nPort     destination port in host byte order
// The remaining arguments and the return value are passed through unchanged.
BOOL CDatagrams::Send(IN_ADDR* pAddress, WORD nPort, CPacket* pPacket, BOOL bRelease, LPVOID pToken, BOOL bAck)
{
    // Zero the whole structure first so the sin_zero padding is not left
    // holding stack garbage when the address is passed on
    SOCKADDR_IN pHost = { 0 };

    pHost.sin_family = PF_INET;
    pHost.sin_addr   = *pAddress;
    pHost.sin_port   = htons( nPort );

    return Send( &pHost, pPacket, bRelease, pToken, bAck );
}
// Send a packet to pHost over UDP.
//
// ED2K packets are serialised and written to the socket immediately. G2
// packets are placed in an outbound frame, linked into the time-ordered
// queue and the output hash table, and then TryWrite() is invoked. Any other
// protocol is rejected.
//
// pHost     destination address
// pPacket   packet to send
// bRelease  when TRUE the packet is released on every path, success or failure
// pToken    opaque value stored on the queued frame (matched by PurgeToken)
// bAck      passed to the frame's Create(); forced to FALSE while m_nInFrags < 1
//
// Returns TRUE when an ED2K packet was handled or a G2 packet was queued,
// FALSE when there is no socket, the address is denied, the protocol is
// unsupported, or no frame/buffer could be obtained.
BOOL CDatagrams::Send(SOCKADDR_IN* pHost, CPacket* pPacket, BOOL bRelease, LPVOID pToken, BOOL bAck)
{
    ASSERT( pHost != NULL && pPacket != NULL );

    // Need a socket, and the destination must not be on the deny list
    const BOOL bPermitted = ( m_hSocket != INVALID_SOCKET ) && ! Security.IsDenied( &pHost->sin_addr );

    if ( ! bPermitted )
    {
        if ( bRelease ) pPacket->Release();
        return FALSE;
    }

    // ED2K: no queueing, serialise and transmit straight away
    if ( pPacket->m_nProtocol == PROTOCOL_ED2K )
    {
        CBuffer pEncoded;
        ((CEDPacket*)pPacket)->ToBufferUDP( &pEncoded );

        pPacket->SmartDump( NULL, &pHost->sin_addr, TRUE );
        if ( bRelease ) pPacket->Release();

        if ( ntohs( pHost->sin_port ) != 4669 )	// Hack
        {
            sendto( m_hSocket, (LPSTR)pEncoded.m_pBuffer, pEncoded.m_nLength, 0,
                (SOCKADDR*)pHost, sizeof(SOCKADDR_IN) );
        }

        return TRUE;
    }

    // Only G2 is handled beyond this point
    if ( pPacket->m_nProtocol != PROTOCOL_G2 )
    {
        if ( bRelease ) pPacket->Release();
        return FALSE;
    }

    // Out of frames or buffers: evict the frame at the tail of the time
    // queue (the one queued longest ago), if there is one
    if ( m_pOutputFree == NULL || m_pBufferFree == NULL )
    {
        if ( m_pOutputLast == NULL )
        {
            if ( bRelease ) pPacket->Release();
            theApp.Message( MSG_DEBUG, _T("CDatagrams output frames exhausted.") );
            return FALSE;
        }

        Remove( m_pOutputLast );
    }

    // Eviction did not yield a buffer
    if ( m_pBufferFree == NULL )
    {
        if ( bRelease ) pPacket->Release();
        theApp.Message( MSG_DEBUG, _T("CDatagrams output frames really exhausted.") );
        return FALSE;
    }

    // Pop a frame off the free list
    CDatagramOut* pFrame = m_pOutputFree;
    m_pOutputFree = pFrame->m_pNextHash;

    if ( m_nInFrags < 1 ) bAck = FALSE;

    // Fill the frame, then pop the buffer it was given off the buffer free list
    pFrame->Create( pHost, (CG2Packet*)pPacket, m_nSequence++, m_pBufferFree, bAck );

    m_pBufferFree = m_pBufferFree->m_pNext;
    m_nBufferFree--;

    pFrame->m_pToken = pToken;

    // Insert at the head (m_pOutputFirst) of the time-ordered queue
    pFrame->m_pNextTime = NULL;
    pFrame->m_pPrevTime = m_pOutputFirst;

    if ( m_pOutputFirst != NULL )
    {
        m_pOutputFirst->m_pNextTime = pFrame;
    }
    else
    {
        m_pOutputLast = pFrame;
    }

    m_pOutputFirst = pFrame;

    // Bucket is derived from the address bytes, the port and the sequence number
    const BYTE nBucket = (BYTE)( pHost->sin_addr.S_un.S_un_b.s_b1
        + pHost->sin_addr.S_un.S_un_b.s_b2
        + pHost->sin_addr.S_un.S_un_b.s_b3
        + pHost->sin_addr.S_un.S_un_b.s_b4
        + pHost->sin_port
        + pFrame->m_nSequence );

    // Push onto the front of that bucket's chain
    CDatagramOut** ppSlot = &m_pOutputHash[ nBucket & HASH_MASK ];
    CDatagramOut* pHead = *ppSlot;

    if ( pHead != NULL ) pHead->m_pPrevHash = &pFrame->m_pNextHash;

    pFrame->m_pNextHash = pHead;
    pFrame->m_pPrevHash = ppSlot;
    *ppSlot = pFrame;

    m_nOutPackets++;

    pPacket->SmartDump( NULL, &pHost->sin_addr, TRUE );

#ifdef DEBUG_UDP
    pPacket->Debug( _T("UDP Out") );
    theApp.Message( MSG_DEBUG, _T("UDP: Queued (#%i) x%i for %s:%lu"),
        pFrame->m_nSequence, pFrame->m_nCount,
        (LPCTSTR)CString( inet_ntoa( pFrame->m_pHost.sin_addr ) ),
        htons( pFrame->m_pHost.sin_port ) );
#endif

    if ( bRelease ) pPacket->Release();

    TryWrite();

    return TRUE;
}
//////////////////////////////////////////////////////////////////////
// CDatagrams purge outbound fragments with a specified token
// Remove every queued outbound frame whose token equals pToken.
// The network section lock is attempted for up to 100 ms; if it cannot be
// taken the call returns without purging anything. A debug message reports
// the number of frames removed when that number is non-zero.
void CDatagrams::PurgeToken(LPVOID pToken)
{
    CSingleLock pLock( &Network.m_pSection );
    if ( ! pLock.Lock( 100 ) ) return;

    int nPurged = 0;
    CDatagramOut* pFollowing = NULL;

    // Walk from the tail of the time queue towards the head. The successor
    // is captured before a frame is removed, since removal unlinks it.
    for ( CDatagramOut* pFrame = m_pOutputLast ; pFrame != NULL ; pFrame = pFollowing )
    {
        pFollowing = pFrame->m_pNextTime;

        if ( pFrame->m_pToken != pToken ) continue;

        Remove( pFrame );
        nPurged++;
    }

    if ( nPurged != 0 )
    {
        theApp.Message( MSG_DEBUG, _T("CDatagrams::PurgeToken() = %i"), nPurged );
    }
}
//////////////////////////////////////////////////////////////////////
// CDatagrams run event handler
// Periodic service routine. With an open socket it flushes pending output,
// maintains the output queue, then alternates partial-datagram maintenance
// with reads until TryRead() reports nothing more, and finally refreshes
// the bandwidth measurements.
void CDatagrams::OnRun()
{
    if ( m_hSocket == INVALID_SOCKET ) return;

    TryWrite();
    ManageOutput();

    // ManagePartials() always runs once before the first read and again
    // after every successful read
    for ( ;; )
    {
        ManagePartials();
        if ( ! TryRead() ) break;
    }

    Measure();
}
//////////////////////////////////////////////////////////////////////
// CDatagrams measure
void CDatagrams::Measure()
{
DWORD tCutoff = GetTickCount() - METER_PERIOD;
DWORD* pInHistory = m_mInput.pHistory;
DWORD* pInTime = m_mInput.pTimes;
DWORD* pOutHistory = m_mOutput.pHistory;
DWORD* pOutTime = m_mOutput.pTimes;
DWORD nInput = 0;
DWORD nOutput = 0;
for ( int tNow = METER_LENGTH ; tNow ; tNow-- )
{
if ( *pInTime >= tCutoff ) nInput += *pInHistory;
if ( *pOutTime >= tCutoff ) nOutput += *pOutHistory;
pInHistory++, pInTime++;
pOutHistory++, pOutTime++;
}
m_nInBandwidth = m_mInput.nMeasure = nInput * 1000 / METER_PERIOD;
m_nOutBandwidth = m_mOutput.nMeasure = nOutput * 1000 / METER_PERIOD;
}
//////////////////////////////////////////////////////////////////////
// CDatagrams write datagrams
BOOL CDatagrams::TryWrite()
{
DWORD tNow = GetTickCount();
DWORD nLimit = 0xFFFFFFFF;
DWORD nTotal = 0;
if ( Settings.Live.BandwidthScale <= 100 )
{
DWORD tCutoff = tNow - METER_SECOND;
DWORD* pHistory = m_mOutput.pHistory;
DWORD* pTime = m_mOutput.pTimes;
DWORD nUsed = 0;
for ( int nSeek = METER_LENGTH ; nSeek ; nSeek--, pHistory++, pTime++ )
{
if ( *pTime >= tCutoff ) nUsed += *pHistory;
}
nLimit = Settings.Connection.OutSpeed * 128;
if ( Settings.Bandwidth.UdpOut != 0 ) nLimit = Settings.Bandwidth.UdpOut;
if ( Settings.Live.BandwidthScale < 100 )
{
nLimit = nLimit * Settings.Live.BandwidthScale / 100;
}
nLimit = ( nUsed >= nLimit ) ? 0 : ( nLimit - nUsed );
}
DWORD nLastHost = 0;
while ( nLimit > 0 )
{
for ( CDatagramOut* pDG = m_pOutputFirst ; pDG ; pDG = pDG->m_pPrevTime )
{
BYTE* pPacket;
DWORD nPacket;
if ( nLastHost == pDG->m_pHost.sin_addr.S_un.S_addr )
{
// Same host, skip it
}
else if ( pDG->GetPacket( tNow, &pPacket, &nPacket, m_nInFrags > 0 ) )
{
sendto( m_hSocket, (LPCSTR)pPacket, nPacket, 0,
(SOCKADDR*)&pDG->m_pHost, sizeof(SOCKADDR_IN) );
nLastHost = pDG->m_pHost.sin_addr.S_un.S_addr;
if ( nLimit >= nPacket )
nLimit -= nPacket;
else
nLimit = 0;
m_tLastWrite = GetTickCount();
nTotal += nPacket;
m_nOutFrags++;
// NOTE: this listing is truncated here. The remainder of CDatagrams::TryWrite()
// and the rest of the file are missing from this copy.