⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peerlink.cpp

📁 这是一个嵌入式linux系统下的BT下载工具包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/*************************************************************************** *   Copyright (C) 2005-2006 Gao Xianchao                                  * *                 2007 Gao Xianchao gnap_an linux_lyb ahlongxp            * *                                                                         * *   This program is free software; you can redistribute it and/or modify  * *   it under the terms of the GNU General Public License as published by  * *   the Free Software Foundation; either version 2 of the License, or     * *   (at your option) any later version.                                   * *                                                                         * *   This program is distributed in the hope that it will be useful,       * *   but WITHOUT ANY WARRANTY; without even the implied warranty of        * *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         * *   GNU General Public License for more details.                          * *                                                                         * *   You should have received a copy of the GNU General Public License     * *   along with this program; if not, write to the                         * *   Free Software Foundation, Inc.,                                       * *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             * ***************************************************************************//* * Author:	gxc * Create data:	2005-10-14 20:09 */ #include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <errno.h>#include "PeerLink.h"#include "log.h"#include "utils.h"#define REQUEST_BLOCK_SIZE 16*1024#define RECF_BUFFER_SIZE 8*1024CPeerLink::CPeerLink(): _state(PS_INIT), _accepted(false), _bitSetRecved(false), _canRead(false), _canWrite(true){}CPeerLink::~CPeerLink(){}void CPeerLink::connect(const char* ip, unsigned short port){	CSocket::createTCPSocket();	CSocket::setReactor(_manager->getBTTask()->getSocketReactor());	_ip = ip;	_port = port;		//LOG_DEBUG("CPeerLink::connect, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port);		maskWrite(true);	CSocket::connect(ip, port);	_state = PS_CONNECTING;		_connectTimeoutTimer = getReactor()->addTimer(this, 10000, true);}void CPeerLink::attach(int handle, const char* ip, unsigned short port, IPeerManager* manager){	_ip = ip;	_port = port;		_accepted = true;	_state = PS_ESTABLISHED;	_connectTimeoutTimer = 0;	CSocket::attach(handle);	setPeerManager(manager);	CSocket::setReactor(_manager->getBTTask()->getSocketReactor());	onConnect();}bool CPeerLink::isAccepted(){	return _accepted;}void CPeerLink::closeLink(){	handleClose();}void CPeerLink::onConnect(){	//LOG_DEBUG("CPeerLink::onConnect, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port);		maskRead(true);	_sendBuffer.clear();	_recvBuffer.clear();	_handShaked = false;		_pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0);		 _amChoking = true;	_amInterested = false;	_peerChoking = true;;	_peerInterested = false;		_downloadCount = 0;	_uploadCount = 0;	_lastCountSpeedTime = GetTickCount();	_lastDownloadCount = 0;	_lastUploadCount = 0;	_uploadSpeed = 0;	_downloadSpeed = 0;		_canRead = false;	_canWrite = true;		LOG_DEBUG("+++++ Add RateMeasure Client, ip="<<_ip);	_manager->getBTTask()->getRateMeasure()->addClient(this);			sendHandshake();}void CPeerLink::onConnectFailed(){	//LOG_DEBUG("CPeerLink::onConnectFailed, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port);}void CPeerLink::onConnectClosed(){	//LOG_DEBUG("CPeerLink::onConnectClose, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port);	LOG_DEBUG("+++++ remove RateMeasure Client, ip="<<_ip);	_manager->getBTTask()->getRateMeasure()->removeClient(this);		_manager->getBTTask()->getStorage()->abandonPieceTask(_pieceRequest.getPieceIndex());	_pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0);}void CPeerLink::onSendComplete(){	doPieceSend();}int CPeerLink::handleRead(){	LOG_DEBUG("CPeerLink::handleRead, ip = "<<_ip<<", _canRead="<<_canRead);	_canRead = true;		return 0;}int CPeerLink::handleWrite(){	if(_state == PS_CONNECTING)	{		_state = PS_ESTABLISHED;		maskWrite(false);		onConnect();				return 0;	}		LOG_DEBUG("CPeerLink::handleWrite, ip = "<<_ip<<", _canWrite="<<_canWrite);		_canWrite = true;				return 0;}void CPeerLink::handleClose(){	//LOG_DEBUG("CPeerLink::handleClose, fd = "<<getHandle());		if(_state == PS_CONNECTING)	{		_state = PS_CONNECTFAILED;		onConnectFailed();	}		if(_state == PS_ESTABLISHED)	{		_state = PS_CLOSED;		onConnectClosed();			}		if(_connectTimeoutTimer != 0)	{	 	getReactor()->removeTimer(_connectTimeoutTimer);	 	_connectTimeoutTimer = 0;	}			_canRead = false;	_canWrite = false;	setReactor(NULL);	CSocket::close();	}void CPeerLink::blockWrite(bool block){	if(!block)	{		if((_sendBuffer.size() > 0)			&& !_canWrite)		{			if(!maskWrite())			{				maskWrite(true);			}			}		return;	}		if(block		&& maskWrite())	{		maskWrite(false);	}	}void CPeerLink::blockRead(bool block){	if(block		&& maskRead())	{		maskRead(false);		return;	}		if(!block		&& !maskRead())	{		maskRead(true);	}}void CPeerLink::setWritePriority(unsigned int Priority){	_writePriority = Priority;}unsigned int CPeerLink::getWritePriority(){	return _writePriority;}void CPeerLink::setReadPriority(unsigned int Priority){	_readPriority = Priority;}unsigned int CPeerLink::getReadPriority(){	return _readPriority;}bool CPeerLink::canWrite(){	return _canWrite && (_sendBuffer.size() > 0);}bool CPeerLink::canRead(){	return _canRead;}int CPeerLink::doWrite(unsigned int count){		LOG_DEBUG("+++ doWrite, count = "<<count<<" ip="<<_ip<<" _sendBuffer.size()="<<_sendBuffer.size());	unsigned int sendCount = 0;		if(_state == PS_ESTABLISHED)	{		for(;(_sendBuffer.size()>0) && (sendCount<count);)		{				size_t sendSize = _sendBuffer.size();			if(sendSize > count)			{				sendSize = count;			}			int ret = send(getHandle(), _sendBuffer.data(), sendSize, 0);			if(ret == -1)			{				if(errno == EAGAIN					|| errno == EINTR)				{					//LOG_DEBUG("+++ _canWrite = false ip="<<_ip);					_canWrite = false;							}				else				{					//error					closeLink();					return sendCount;				}								break;			}						sendCount += ret;			_uploadCount += ret;			_manager->getBTTask()->incUploadCount(ret);						_sendBuffer.erase(0, ret);						if(_sendBuffer.size() == 0)			{				onSendComplete();				break;			}		}	}		//LOG_DEBUG("_canWrite="<<_canWrite<<", maskWrite()="<<maskWrite());		if(_canWrite)	{		if(maskWrite())		{			//LOG_DEBUG("++++ maskwrite(false), ip="<<_ip<<" sendCount="<<sendCount);			maskWrite(false);		}	}	else	{		if(!maskWrite())		{			//LOG_DEBUG("++++ maskwrite(true), ip="<<_ip<<" sendCount="<<sendCount);			maskWrite(true);		}	}		return 	sendCount;}int CPeerLink::doRead(unsigned int count){	LOG_DEBUG("+++ doRead, count = "<<count<<" ip="<<_ip);	unsigned int readCount = 0;		char* buf = new char[RECF_BUFFER_SIZE];		for(;readCount<count;)	{		size_t readSize = RECF_BUFFER_SIZE;		if(readSize > count - readCount)		{			readSize = count - readCount;		}				int ret = recv(getHandle(), buf, readSize, 0);		if(ret == 0)		{			closeLink();			//LOG_DEBUG("recv return 0, fd="<<getHandle());			delete[] buf;			return readCount;		}				if(ret == -1)		{			if(errno == EAGAIN)			{				LOG_DEBUG("+++ _canRead = false ip="<<_ip);				_canRead = false;				break;			}						if(errno == EINTR)			{				//LOG_DEBUG("recv return EINTR, fd="<<getHandle());				continue;			}						closeLink();			//error			delete[] buf;			return readCount;		}				//LOG_DEBUG("recv "<<ret<<" bytes, ip="<<_ip);				if(ret > 0)		{			readCount += ret;			_downloadCount += ret;			_manager->getBTTask()->incDownlaodCount(ret);						_recvBuffer.append((const char*)buf, ret);		}	}		delete[] buf;		processRecvData();			LOG_DEBUG("_canRead="<<_canRead<<", maskRead()="<<maskRead()<<", readCount="<<readCount);		if(_canRead)	{		if(maskRead())		{			LOG_DEBUG("++++ maskRead(false), ip="<<_ip);			maskRead(false);		}	}	else	{		if(!maskRead())		{			LOG_DEBUG("++++ maskRead(true), ip="<<_ip);			maskRead(true);		}	}		return readCount;}TPeerState CPeerLink::getState(){	return _state;}void CPeerLink::setPeerManager(IPeerManager* manager){	_manager = manager;}const char* CPeerLink::getIP(){	return _ip.c_str();}unsigned short CPeerLink::getPort(){	return _port;}void CPeerLink::onTimer(unsigned int id){	if(id == _connectTimeoutTimer)	{		_connectTimeoutTimer = 0;				if(_state == PS_CONNECTING)		{			//LOG_DEBUG("connection timeout, fd = "<<getHandle());			closeLink();		}	}}void CPeerLink::sendHandshake(){	char buf[68];	memset(buf, 0, sizeof(buf));		buf[0] = 19;	strncpy(buf+1, "BitTorrent protocol", 19);		strncpy(buf+28, (const char*)_manager->getBTTask()->getTorrentFile()->getInfoHash(), 20);	strncpy(buf+48, (const char*)_manager->getBTTask()->getPeerID().c_str(), 20);		sendData(buf, sizeof(buf));}void CPeerLink::sendChoke(bool chocke){	char buf[5];	*((int*)buf) = htonl(1);		if(chocke)	{		*((char*)(buf+4)) = 0;		_amChoking = true;	}	else	{		*((char*)(buf+4)) = 1;		_amChoking = false;	}		sendData(buf, sizeof(buf));}void CPeerLink::sendInterested(bool interested){	char buf[5];	*((int*)buf) = htonl(1);		if(interested)	{		*((char*)(buf+4)) = 2;		_amInterested = true;	}	else	{		*((char*)(buf+4)) = 3;		_amInterested = false;	}		sendData(buf, sizeof(buf));	}void  CPeerLink::sendBitfield(){	std::string bitfield = _manager->getBTTask()->getStorage()->getBitfield();	if(bitfield.size() == 0)	{		return;	}		std::string buf;	char header[5];		*((int*)(header)) =  htonl(1 + bitfield.size());	*((char*)(header+4))  = 5;		buf.append(header, 5);	buf.append(bitfield.data(), bitfield.size());		sendData(buf.data(), buf.size());}void CPeerLink::sendHave(unsigned int pieceIndex){	char buf[9];	*((unsigned int*)buf) = htonl(5);	*((unsigned char*)(buf+4)) = 4;	*((unsigned int*)(buf+5)) = htonl(pieceIndex);		sendData(buf, sizeof(buf));}void CPeerLink::sendPieceRequest(unsigned int pieceIndex, unsigned int offset, unsigned int len){	char buf[17];		*((unsigned int*)buf) = htonl(13);	*((unsigned char*)(buf+4)) = 6;	*((unsigned int*)(buf+5)) = htonl(pieceIndex);	*((unsigned int*)(buf+9)) = htonl(offset);	*((unsigned int*)(buf+13)) = htonl(len);		sendData(buf, sizeof(buf));}void CPeerLink::sendPieceData(unsigned int pieceIndex, unsigned int offset, std::string& data){	char buf[13];	*((unsigned int*)buf) = htonl(9+data.size());	*((unsigned char*)(buf+4)) = 7;	*((unsigned int*)(buf+5)) = htonl(pieceIndex);	*((unsigned int*)(buf+9)) = htonl(offset);			std::string pkg;	pkg.append((const char*)buf, sizeof(buf));	pkg.append(data.c_str(), data.size());		sendData(pkg.data(), pkg.size());}void CPeerLink::sendPieceCancel(unsigned int pieceIndex, unsigned int offset, unsigned int len){	char buf[17];		*((unsigned int*)buf) = htonl(13);	*((unsigned char*)(buf+4)) = 8;	*((unsigned int*)(buf+5)) = htonl(pieceIndex);	*((unsigned int*)(buf+9)) = htonl(offset);	*((unsigned int*)(buf+13)) = htonl(len);		sendData(buf, sizeof(buf));	}void CPeerLink::sendData(const void* data, size_t len){	_sendBuffer.append((const char*)data, len);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -