📄 peerlink.cpp
字号:
int CPeerLink::processRecvData(){ if(!_handShaked) { if(_recvBuffer.size() >= 68) { std::string handShakeStr = _recvBuffer.substr(0, 68); _recvBuffer.erase(0, 68); if(!checkHandshake(handShakeStr)) { return -1; } } } else { for(;_recvBuffer.size() >= 4;) { unsigned int len = *((unsigned int*)_recvBuffer.data()); len = ntohl(len); if(len == 0) { //LOG_DEBUG("++++++++++++++ "<<_ip<< " ++++++++++recv cmd keepalive"); _recvBuffer.erase(0, 4); continue; } if(_recvBuffer.size() < 5) { break; } unsigned char cmdID = _recvBuffer[4]; if(_recvBuffer.size() >= size_t(len+4)) { if(-1 == processCmd(cmdID, (void*)(_recvBuffer.data()+5), len-1)) { return -1; } _recvBuffer.erase(0, len+4); } else { break; } } } return 0;}bool CPeerLink::checkHandshake(std::string info){ if(info[0] != 19) { LOG_DEBUG("handshake[0] != 19, ip="<<_ip); return false; } std::string str = info.substr(1, 19); if(str != "BitTorrent protocol") { LOG_DEBUG("handshake[protocol] != BitTorrent protocol"); return false; } str = info.substr(28, 20); std::string str2; str2.append((const char*)_manager->getBTTask()->getTorrentFile()->getInfoHash(), 20); if(str != str2) { LOG_DEBUG("handshake[info_hash] error"); return false; } _id = info.substr(48, 20); //LOG_DEBUG("+++++++++++++++++ handshake ok, ip="<<_ip); _handShaked = true; sendBitfield(); sendInterested(true); //alloc peer's bitSet _bitSet.alloc(_manager->getBTTask()->getTorrentFile()->getPieceCount()); return true;}int CPeerLink::processCmd(unsigned cmd, void* data, size_t dataLen){// LOG_DEBUG("++++++++++++++ "<<_ip<< " ++++++++++recv cmd "<<cmd<<", dateLen="<<dataLen); switch(cmd) { case 0: if(processCmdChoke(data, dataLen) == -1) { return -1; } break; case 1: if(processCmdUnchoke(data, dataLen) == -1) { return -1; } break; case 2: if(processCmdInterested(data, dataLen) == -1) { return -1; } break; case 3: if(processCmdNotInterested(data, dataLen) == -1) { return -1; } break; case 4: if(processCmdHave(data, dataLen) == -1) { return -1; } break; case 5: if(processCmdBitfield(data,dataLen) == -1) { return -1; } break; case 6: if(processCmdRequest(data,dataLen) == -1) { return -1; } break; case 7: if(processCmdPiece(data,dataLen) == -1) { return -1; } break; case 8: if(processCmdCancel(data,dataLen) == -1) { return -1; } break; } return 0;}int CPeerLink::processCmdChoke(void* data, size_t dataLen){ LOG_DEBUG("choked by " << _ip); _peerChoking = true; //_manager->getBTTask()->getStorage()->abandonPieceTask(_pieceRequest.getPieceIndex()); //_pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0); _pieceRequest.clearRequest(); return 0;}int CPeerLink::processCmdUnchoke(void* data, size_t dataLen){ LOG_DEBUG("unchoked by " << _ip); _peerChoking = false; getNewPieceTask(); doPieceRequest(); return 0;}int CPeerLink::processCmdInterested(void* data, size_t dataLen){ LOG_DEBUG("interested by " << _ip); _peerInterested = true; return 0;}int CPeerLink::processCmdNotInterested(void* data, size_t dataLen){ LOG_DEBUG("uninterested by " << _ip); _peerInterested = false; return 0;}int CPeerLink::processCmdHave(void* data, size_t dataLen){ if(dataLen != 4) { return -1; } unsigned int index = *((unsigned int*)data); index = ntohl(index); _bitSet.set(index, true); if(!_peerChoking) { getNewPieceTask(); doPieceRequest(); } return 0;}int CPeerLink::processCmdBitfield(void* data, size_t dataLen){ //LOG_DEBUG("CPeerLink::processCmdBitfield"); std::string bitset; bitset.append((const char*)data, dataLen); _bitSet.alloc(bitset, _manager->getBTTask()->getTorrentFile()->getPieceCount()); std::string log; for(unsigned int i =0; i<_bitSet.getSize(); ++i) { if(_bitSet.isSet(i)) { log.append("1"); } else { log.append("0"); } } _bitSetRecved = true; //LOG_DEBUG("++++++bitset "<<log); return 0;}int CPeerLink::processCmdRequest(void* data, size_t dataLen){ if(dataLen != 12) { return -1; } if(_amChoking) { //I am choking this client return 0; } unsigned int index = *((unsigned int*)data); index = ntohl(index); unsigned int offset = *((unsigned int*)((char*)data+4)); offset = ntohl(offset); unsigned int len = *((unsigned int*)((char*)data+8)); len = ntohl(len); //LOG_DEBUG(">>>>>>>>>>>> PieceRequest, index="<<index<<" offset="<<offset<<" len="<<len<<" ip="<<_ip); TPeerPieceRequestList::iterator iter = _peerRequestList.begin(); for(;iter!=_peerRequestList.end();++iter) { if(iter->index == index && iter->offset == offset) { break; } } if(iter == _peerRequestList.end()) { TPeerPieceRequest peerRequest; peerRequest.index = index; peerRequest.offset = offset; peerRequest.len = len; _peerRequestList.push_back(peerRequest); } doPieceSend(); return 0;}void CPeerLink::doPieceSend(){ if(!_amChoking && _sendBuffer.size() == 0 && _peerRequestList.size() > 0) { TPeerPieceRequest peerRequest = _peerRequestList.front(); _peerRequestList.pop_front(); std::string pieceDdata = _manager->getBTTask()->getStorage()->readData(peerRequest.index, peerRequest.offset, peerRequest.len); if(pieceDdata.size() == peerRequest.len) { //LOG_DEBUG(">>>>>>>>>> send piece data, index="<<peerRequest.index<<" offset="<<peerRequest.offset<<" len="<<peerRequest.len<<" ip="<<_ip); sendPieceData(peerRequest.index, peerRequest.offset, pieceDdata); } }}int CPeerLink::processCmdPiece(void* data, size_t dataLen){ if(dataLen <= 8) { return -1; } unsigned int index = *((unsigned int*)data); index = ntohl(index); unsigned int offset = *((unsigned int*)((char*)data+4)); offset = ntohl(offset); unsigned len = dataLen - 8; if(index != _pieceRequest.getPieceIndex()) { return -1; } if(!_pieceRequest.addPieceData(offset, (const char*)data+8, len)) { return -1; } else { //LOG_DEBUG("<<<<<<<<< recv piece data, index="<<index<<" offset="<<offset<<" len="<<len); } if( _pieceRequest.complete()) { std::string pieceData = _pieceRequest.getPiece(); if(shaString((char*)pieceData.data(), pieceData.size()) == _manager->getBTTask()->getTorrentFile()->getPieceHash(index)) { //LOG_DEBUG("<<<<<<<<<< piece "<<index<<" download completed"); _manager->getBTTask()->getStorage()->writePiece(index, pieceData); _manager->broadcastHave(index); getNewPieceTask(); } else { LOG_DEBUG("<<<<<<<< piece hash error! "<<index); _pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0); closeLink(); } } doPieceRequest(); return 0;}int CPeerLink::processCmdCancel(void* data, size_t dataLen){ if(dataLen != 12) { return -1; } unsigned int index = *((unsigned int*)data); index = ntohl(index); unsigned int offset = *((unsigned int*)((char*)data+4)); offset = ntohl(offset); unsigned int len = *((unsigned int*)((char*)data+8)); len = ntohl(len); TPeerPieceRequestList::iterator iter = _peerRequestList.begin(); for(;iter!=_peerRequestList.end();++iter) { if(iter->index == index && iter->offset == offset) { _peerRequestList.erase(iter); break; } } return 0;}void CPeerLink::notifyHavePiece(unsigned int pieceIndex){ if(_handShaked) { if(_pieceRequest.getPieceIndex() == pieceIndex) { cancelPieceRequest(pieceIndex); } sendHave(pieceIndex); }}void CPeerLink::cancelPieceRequest(unsigned int pieceIndex){ if(_pieceRequest.getPieceIndex() == pieceIndex) { unsigned int index; unsigned int offset; unsigned int len; while(_pieceRequest.cancelPendingRequest(index, offset, len)) { sendPieceCancel(index, offset, len); } _pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0); if(!_peerChoking) { getNewPieceTask(); doPieceRequest(); } }}void CPeerLink::getNewPieceTask(){ if(!_pieceRequest.complete()) { return; } if(_manager->getBTTask()->getStorage()->finished()) { return; } if(_peerChoking) { return; } //get new task; unsigned int pieceIndex = _manager->getBTTask()->getStorage()->getPieceTask(&_bitSet); if( pieceIndex != NONE_PIECE_INDEX) { if(!_amInterested) { sendInterested(true); } LOG_DEBUG("new piece task, index="<< pieceIndex<<" ip="<<_ip); _pieceRequest.alloc(pieceIndex, _manager->getBTTask()->getStorage()->getPieceLength(pieceIndex), REQUEST_BLOCK_SIZE); } else { if(_amInterested) { sendInterested(false); } }}void CPeerLink::doPieceRequest(){ if(_peerChoking) { return; } //do request for(;_pieceRequest.getPendingCount()<5;) { unsigned int index, offset, len; if(!_pieceRequest.getRequest(index, offset, len)) { break; } //LOG_DEBUG("<<<<<<<<<< doPieceRequest, index="<<index<<" offset="<<offset<<" len="<<len<<" ip="<<_ip); sendPieceRequest(index, offset, len); } }bool CPeerLink::peerChoked(){ return _amChoking;}bool CPeerLink::peerInterested(){ return _peerInterested;}void CPeerLink::chokePeer(bool choke){ if(choke) { LOG_DEBUG("********chokePeer, ip="<<_ip<<" downloadspeed="<<_downloadSpeed); _peerRequestList.clear(); } else { LOG_DEBUG("********unchokePeer, ip="<<_ip<<" downloadspeed="<<_downloadSpeed); } sendChoke(choke);}unsigned int CPeerLink::getDownloadCount(){ return _downloadCount;}unsigned int CPeerLink::getUploadCount(){ return _uploadCount;}void CPeerLink::countSpeed(){ _uploadSpeed = (_uploadCount - _lastUploadCount)*1000/(GetTickCount()-_lastCountSpeedTime+1); _downloadSpeed = (_downloadCount - _lastDownloadCount)*1000/(GetTickCount()-_lastCountSpeedTime+1); //LOG_DEBUG("+++++++++ "<<_ip<<" _uploadSpeed="<<_uploadSpeed<<" _downloadSpeed="<<_downloadSpeed); _lastCountSpeedTime = GetTickCount(); _lastUploadCount = _uploadCount; _lastDownloadCount = _downloadCount;}bool CPeerLink::checkNeedClose(){ if(!_bitSetRecved) { return false; } if( _manager->getBTTask()->getStorage()->finished()) { if(_bitSet.isAllSet()) { return true; } } for(int i=0; i<_bitSet.getSize(); ++i) { if(_manager->getBTTask()->getStorage()->getBanedBitSet()->isSet(i)) { continue; } if(_bitSet.isSet(i) && (!_manager->getBTTask()->getStorage()->getBitSet()->isSet(i))) { return false; } } return true;}unsigned int CPeerLink::getUploadSpeed(){ return _uploadSpeed;}unsigned int CPeerLink::getDownloadSpeed(){ return _downloadSpeed;}void CPeerLink::onDownloadComplete(){ sendInterested(false); _pieceRequest.alloc(NONE_PIECE_INDEX, 0, 0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -