📄 peerlink.cpp
字号:
/*************************************************************************** * Copyright (C) 2005-2006 Gao Xianchao * * 2007 Gao Xianchao gnap_an linux_lyb ahlongxp * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************//* * Author: gxc * Create data: 2005-10-14 20:09 */ #include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <errno.h>#include "PeerLink.h"#include "log.h"#include "utils.h"#define REQUEST_BLOCK_SIZE 16*1024#define RECF_BUFFER_SIZE 8*1024CPeerLink::CPeerLink(): _state(PS_INIT), _accepted(false), _bitSetRecved(false), _canRead(false), _canWrite(true){}CPeerLink::~CPeerLink(){}void CPeerLink::connect(const char* ip, unsigned short port){ CSocket::createTCPSocket(); CSocket::setReactor(_manager->getBTTask()->getSocketReactor()); _ip = ip; _port = port; //LOG_DEBUG("CPeerLink::connect, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port); maskWrite(true); CSocket::connect(ip, port); _state = PS_CONNECTING; _connectTimeoutTimer = getReactor()->addTimer(this, 10000, true);}void CPeerLink::attach(int handle, const char* ip, unsigned short port, IPeerManager* manager){ _ip = ip; _port = port; _accepted = true; _state = PS_ESTABLISHED; _connectTimeoutTimer = 0; CSocket::attach(handle); setPeerManager(manager); CSocket::setReactor(_manager->getBTTask()->getSocketReactor()); onConnect();}bool CPeerLink::isAccepted(){ return _accepted;}void CPeerLink::closeLink(){ handleClose();}void CPeerLink::onConnect(){ //LOG_DEBUG("CPeerLink::onConnect, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port); maskRead(true); _sendBuffer.clear(); _recvBuffer.clear(); _handShaked = false; _pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0); _amChoking = true; _amInterested = false; _peerChoking = true;; _peerInterested = false; _downloadCount = 0; _uploadCount = 0; _lastCountSpeedTime = GetTickCount(); _lastDownloadCount = 0; _lastUploadCount = 0; _uploadSpeed = 0; _downloadSpeed = 0; _canRead = false; _canWrite = true; LOG_DEBUG("+++++ Add RateMeasure Client, ip="<<_ip); _manager->getBTTask()->getRateMeasure()->addClient(this); sendHandshake();}void CPeerLink::onConnectFailed(){ //LOG_DEBUG("CPeerLink::onConnectFailed, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port);}void CPeerLink::onConnectClosed(){ //LOG_DEBUG("CPeerLink::onConnectClose, fd = "<<getHandle()<<" ip = "<<_ip<<" port = "<<_port); LOG_DEBUG("+++++ remove RateMeasure Client, ip="<<_ip); _manager->getBTTask()->getRateMeasure()->removeClient(this); _manager->getBTTask()->getStorage()->abandonPieceTask(_pieceRequest.getPieceIndex()); _pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0);}void CPeerLink::onSendComplete(){ doPieceSend();}int CPeerLink::handleRead(){ LOG_DEBUG("CPeerLink::handleRead, ip = "<<_ip<<", _canRead="<<_canRead); _canRead = true; return 0;}int CPeerLink::handleWrite(){ if(_state == PS_CONNECTING) { _state = PS_ESTABLISHED; maskWrite(false); onConnect(); return 0; } LOG_DEBUG("CPeerLink::handleWrite, ip = "<<_ip<<", _canWrite="<<_canWrite); _canWrite = true; return 0;}void CPeerLink::handleClose(){ //LOG_DEBUG("CPeerLink::handleClose, fd = "<<getHandle()); if(_state == PS_CONNECTING) { _state = PS_CONNECTFAILED; onConnectFailed(); } if(_state == PS_ESTABLISHED) { _state = PS_CLOSED; onConnectClosed(); } if(_connectTimeoutTimer != 0) { getReactor()->removeTimer(_connectTimeoutTimer); _connectTimeoutTimer = 0; } _canRead = false; _canWrite = false; setReactor(NULL); CSocket::close(); }void CPeerLink::blockWrite(bool block){ if(!block) { if((_sendBuffer.size() > 0) && !_canWrite) { if(!maskWrite()) { maskWrite(true); } } return; } if(block && maskWrite()) { maskWrite(false); } }void CPeerLink::blockRead(bool block){ if(block && maskRead()) { maskRead(false); return; } if(!block && !maskRead()) { maskRead(true); }}void CPeerLink::setWritePriority(unsigned int Priority){ _writePriority = Priority;}unsigned int CPeerLink::getWritePriority(){ return _writePriority;}void CPeerLink::setReadPriority(unsigned int Priority){ _readPriority = Priority;}unsigned int CPeerLink::getReadPriority(){ return _readPriority;}bool CPeerLink::canWrite(){ return _canWrite && (_sendBuffer.size() > 0);}bool CPeerLink::canRead(){ return _canRead;}int CPeerLink::doWrite(unsigned int count){ LOG_DEBUG("+++ doWrite, count = "<<count<<" ip="<<_ip<<" _sendBuffer.size()="<<_sendBuffer.size()); unsigned int sendCount = 0; if(_state == PS_ESTABLISHED) { for(;(_sendBuffer.size()>0) && (sendCount<count);) { size_t sendSize = _sendBuffer.size(); if(sendSize > count) { sendSize = count; } int ret = send(getHandle(), _sendBuffer.data(), sendSize, 0); if(ret == -1) { if(errno == EAGAIN || errno == EINTR) { //LOG_DEBUG("+++ _canWrite = false ip="<<_ip); _canWrite = false; } else { //error closeLink(); return sendCount; } break; } sendCount += ret; _uploadCount += ret; _manager->getBTTask()->incUploadCount(ret); _sendBuffer.erase(0, ret); if(_sendBuffer.size() == 0) { onSendComplete(); break; } } } //LOG_DEBUG("_canWrite="<<_canWrite<<", maskWrite()="<<maskWrite()); if(_canWrite) { if(maskWrite()) { //LOG_DEBUG("++++ maskwrite(false), ip="<<_ip<<" sendCount="<<sendCount); maskWrite(false); } } else { if(!maskWrite()) { //LOG_DEBUG("++++ maskwrite(true), ip="<<_ip<<" sendCount="<<sendCount); maskWrite(true); } } return sendCount;}int CPeerLink::doRead(unsigned int count){ LOG_DEBUG("+++ doRead, count = "<<count<<" ip="<<_ip); unsigned int readCount = 0; char* buf = new char[RECF_BUFFER_SIZE]; for(;readCount<count;) { size_t readSize = RECF_BUFFER_SIZE; if(readSize > count - readCount) { readSize = count - readCount; } int ret = recv(getHandle(), buf, readSize, 0); if(ret == 0) { closeLink(); //LOG_DEBUG("recv return 0, fd="<<getHandle()); delete[] buf; return readCount; } if(ret == -1) { if(errno == EAGAIN) { LOG_DEBUG("+++ _canRead = false ip="<<_ip); _canRead = false; break; } if(errno == EINTR) { //LOG_DEBUG("recv return EINTR, fd="<<getHandle()); continue; } closeLink(); //error delete[] buf; return readCount; } //LOG_DEBUG("recv "<<ret<<" bytes, ip="<<_ip); if(ret > 0) { readCount += ret; _downloadCount += ret; _manager->getBTTask()->incDownlaodCount(ret); _recvBuffer.append((const char*)buf, ret); } } delete[] buf; processRecvData(); LOG_DEBUG("_canRead="<<_canRead<<", maskRead()="<<maskRead()<<", readCount="<<readCount); if(_canRead) { if(maskRead()) { LOG_DEBUG("++++ maskRead(false), ip="<<_ip); maskRead(false); } } else { if(!maskRead()) { LOG_DEBUG("++++ maskRead(true), ip="<<_ip); maskRead(true); } } return readCount;}TPeerState CPeerLink::getState(){ return _state;}void CPeerLink::setPeerManager(IPeerManager* manager){ _manager = manager;}const char* CPeerLink::getIP(){ return _ip.c_str();}unsigned short CPeerLink::getPort(){ return _port;}void CPeerLink::onTimer(unsigned int id){ if(id == _connectTimeoutTimer) { _connectTimeoutTimer = 0; if(_state == PS_CONNECTING) { //LOG_DEBUG("connection timeout, fd = "<<getHandle()); closeLink(); } }}void CPeerLink::sendHandshake(){ char buf[68]; memset(buf, 0, sizeof(buf)); buf[0] = 19; strncpy(buf+1, "BitTorrent protocol", 19); strncpy(buf+28, (const char*)_manager->getBTTask()->getTorrentFile()->getInfoHash(), 20); strncpy(buf+48, (const char*)_manager->getBTTask()->getPeerID().c_str(), 20); sendData(buf, sizeof(buf));}void CPeerLink::sendChoke(bool chocke){ char buf[5]; *((int*)buf) = htonl(1); if(chocke) { *((char*)(buf+4)) = 0; _amChoking = true; } else { *((char*)(buf+4)) = 1; _amChoking = false; } sendData(buf, sizeof(buf));}void CPeerLink::sendInterested(bool interested){ char buf[5]; *((int*)buf) = htonl(1); if(interested) { *((char*)(buf+4)) = 2; _amInterested = true; } else { *((char*)(buf+4)) = 3; _amInterested = false; } sendData(buf, sizeof(buf)); }void CPeerLink::sendBitfield(){ std::string bitfield = _manager->getBTTask()->getStorage()->getBitfield(); if(bitfield.size() == 0) { return; } std::string buf; char header[5]; *((int*)(header)) = htonl(1 + bitfield.size()); *((char*)(header+4)) = 5; buf.append(header, 5); buf.append(bitfield.data(), bitfield.size()); sendData(buf.data(), buf.size());}void CPeerLink::sendHave(unsigned int pieceIndex){ char buf[9]; *((unsigned int*)buf) = htonl(5); *((unsigned char*)(buf+4)) = 4; *((unsigned int*)(buf+5)) = htonl(pieceIndex); sendData(buf, sizeof(buf));}void CPeerLink::sendPieceRequest(unsigned int pieceIndex, unsigned int offset, unsigned int len){ char buf[17]; *((unsigned int*)buf) = htonl(13); *((unsigned char*)(buf+4)) = 6; *((unsigned int*)(buf+5)) = htonl(pieceIndex); *((unsigned int*)(buf+9)) = htonl(offset); *((unsigned int*)(buf+13)) = htonl(len); sendData(buf, sizeof(buf));}void CPeerLink::sendPieceData(unsigned int pieceIndex, unsigned int offset, std::string& data){ char buf[13]; *((unsigned int*)buf) = htonl(9+data.size()); *((unsigned char*)(buf+4)) = 7; *((unsigned int*)(buf+5)) = htonl(pieceIndex); *((unsigned int*)(buf+9)) = htonl(offset); std::string pkg; pkg.append((const char*)buf, sizeof(buf)); pkg.append(data.c_str(), data.size()); sendData(pkg.data(), pkg.size());}void CPeerLink::sendPieceCancel(unsigned int pieceIndex, unsigned int offset, unsigned int len){ char buf[17]; *((unsigned int*)buf) = htonl(13); *((unsigned char*)(buf+4)) = 8; *((unsigned int*)(buf+5)) = htonl(pieceIndex); *((unsigned int*)(buf+9)) = htonl(offset); *((unsigned int*)(buf+13)) = htonl(len); sendData(buf, sizeof(buf)); }void CPeerLink::sendData(const void* data, size_t len){ _sendBuffer.append((const char*)data, len);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -