📄 servent.cpp.svn-base
字号:
// ------------------------------------------------// File : servent.cpp// Date: 4-apr-2002// Author: giles// Desc: // Servents are the actual connections between clients. They do the handshaking,// transfering of data and processing of GnuPackets. Each servent has one socket allocated// to it on connect, it uses this to transfer all of its data.//// (c) 2002 peercast.org// ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.// ------------------------------------------------#include <stdlib.h>#include "servent.h"#include "sys.h"#include "gnutella.h"#include "xml.h"#include "html.h"#include "http.h"#include "stats.h"#include "servmgr.h"#include "peercast.h"// -----------------------------------char *Servent::statusMsgs[]={ "NONE", "CONNECTING", "PROTOCOL", "HANDSHAKE", "CONNECTED", "CLOSING", "LISTENING", "TIMEOUT", "REFUSED", "VERIFIED", "ERROR", "WAIT"};// -----------------------------------char *Servent::typeMsgs[]={ "NONE", "ALLOCATED", "OUT", "IN", "SERVER", "CHECK", "STREAM", "LOOKUP"};// -----------------------------------bool Servent::isPrivate() { Host h = getHost(); return servMgr->isFiltered(ServFilter::F_PRIVATE,h);}// -----------------------------------bool Servent::isAllowed(int a) { Host h = getHost(); if (!h.isValid()) return false; if (servMgr->isFiltered(ServFilter::F_BAN,h)) return false; if (!servMgr->isFiltered(ServFilter::F_ALLOW,h)) return false; return (allow&a)!=0;}// -----------------------------------void Servent::init(){ outPacketsPri.init(MAX_OUTPACKETS); outPacketsNorm.init(MAX_OUTPACKETS); seenIDs.init(MAX_HASH); permAlloc = false; sock = NULL; reset();}// -----------------------------------void Servent::reset(bool doClose){ if (doClose) close(); agent.clear(); sock = NULL; outputBitrate = 0; chanID.clear(); chanIndex = -1; allow = ALLOW_ALL; currPos = 0; addMetadata = false; pack.func = 255; lastConnect = lastPing = lastPacket = 0; loginPassword[0] = 0; loginMount[0] = 0; bytesPerSecond = 0; outPacketsNorm.reset(); outPacketsPri.reset(); seenIDs.clearAll(); status = S_NONE; if (!permAlloc) type = T_NONE;}// -----------------------------------Host Servent::getHost(){ Host h(0,0); if (sock) h = sock->host; return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){ if (pri) return outPacketsPri.write(p); else return outPacketsNorm.write(p);}// -----------------------------------bool Servent::initServer(Host &h){ try { checkFree(); status = S_WAIT; createSocket(); sock->bind(h); thread.data = this; thread.func = serverProc; type = T_SERVER; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Bad server: %s",e.msg); reset(); return false; } return true;}// -----------------------------------void Servent::checkFree(){ if (sock) throw StreamException("Socket already set"); if (thread.active) throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){ try{ checkFree(); type = T_INCOMING; sock = s; allow = a; thread.data = this; thread.func = incomingProc; setStatus(S_PROTOCOL); if (!sys->startThread(&thread)) { reset(); LOG_ERROR("Unable to start incoming thread"); } }catch(StreamException &e) { LOG_ERROR("Incoming error: %s",e.msg); reset(); }}// -----------------------------------void Servent::initOutgoing(char *hostName, int port, TYPE ty){ try { checkFree(); createSocket(); type = ty; sock->timeout = 10000; // 10 seconds to connect sock->open(hostName,port); if (!isAllowed(ALLOW_SERVENT)) throw StreamException("Servent not allowed"); thread.data = this; thread.func = outgoingProc; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Unable to open connection to %s:%d - %s",hostName,port,e.msg); reset(); }}// -----------------------------------void Servent::initGiv(Host &h, int idx, GnuID &id){ char hostName[64]; h.IPtoStr(hostName); try { checkFree(); Channel *ch = chanMgr->findChannelByIndex(idx); if (!ch) throw StreamException("No channel"); chanID = ch->info.id; chanIndex = idx; pushID = id; createSocket(); sock->open(hostName,h.port); if (!isAllowed(ALLOW_DATA)) throw StreamException("Servent not allowed"); sock->connect(); thread.data = this; thread.func = givProc; type = T_STREAM; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Giv error: 0x%x to %s: %s",idx,hostName,e.msg); reset(); }}// -----------------------------------void Servent::createSocket(){ if (sock) LOG_ERROR("Servent::createSocket attempt made while active"); sock = sys->createSocket();}// -----------------------------------void Servent::endThread(){ reset(true); thread.unlock();}// -----------------------------------void Servent::abort(){ thread.active = false; if (sock) sock->close();}// -----------------------------------void Servent::close(){ thread.active = false; setStatus(S_CLOSING); if (sock) { sock->close(); delete sock; sock = NULL; }}// -----------------------------------void Servent::setStatus(STATUS s){ if (s != status) { status = s; if ((s == S_CONNECTED) || (s == S_LISTENING)) lastConnect = sys->getTime(); }}// -----------------------------------void Servent::handshakeOut(){ sock->timeout = 10000; if (servMgr->allowGnutella) sock->writeLine(GNU_CONNECT); else sock->writeLine(GNU_PEERCONN); sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr()); char str[64]; servMgr->sessionID.toStr(str); sock->writeLine("%s %s",PCX_HS_ID,str); sock->writeLine("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS()); sock->writeLine(""); HTTP http(*sock); http.checkResponse(200); bool subnetValid = false; bool versionValid = false; while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(PCX_HS_SUBNET)) subnetValid = stricmp(arg,servMgr->network.cstr())==0; else if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0) && (stricmp(arg+9,MAX_CONNECTVER)<=0); } if (type == T_LOOKUP) { if (http.isHeader("remote-ip")) { if (!servMgr->forceIP[0]) { Host nh; nh.fromStr(arg); if (nh.ip != servMgr->serverHost.ip) { LOG_DEBUG("Got new IP: %s",arg); servMgr->serverHost.ip = nh.ip; } } }else if (http.isHeader(PCX_HS_MSG)) { if (strcmp(servMgr->rootMsg.cstr(),arg)) { servMgr->rootMsg.set(arg); peercastApp->notifyMessage(ServMgr::NT_PEERCAST,arg); } }else if (http.isHeader(PCX_HS_DL)) { if (servMgr->downloadURL[0]==0) { strcpy(servMgr->downloadURL,arg); peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client."); } }else if (http.isHeader(PCX_HS_FULLHIT)) chanMgr->fullHitFrequency = atoi(arg); else if (http.isHeader(PCX_HS_BCTTL)) chanMgr->broadcastTTL = atoi(arg); } } if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid)) throw HTTPException(HTTP_SC_NOTFOUND,404); if (!versionValid) throw HTTPException(HTTP_SC_UNAUTHORIZED,401); sock->writeLine(GNU_OK); sock->writeLine("");}// -----------------------------------void Servent::handshakeIn(){ sock->timeout = 10000; int osType=0; HTTP http(*sock); bool subnetValid = false; bool versionValid = false; bool diffRootVer = false; while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(PCX_HS_SUBNET)) subnetValid = stricmp(arg,servMgr->network)==0; else if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) { versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0) && (stricmp(arg+9,MAX_CONNECTVER)<=0); diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0; } }else if (http.isHeader(PCX_HS_ID)) { GnuID id; id.fromStr(arg); if (id.isSame(servMgr->sessionID)) throw StreamException("Servent loopback"); }else if (http.isHeader(PCX_HS_OS)) { if (stricmp(arg,PCX_OS_LINUXDYN)==0) osType = 1; else if (stricmp(arg,PCX_OS_LINUXSTA)==0) osType = 2; else if (stricmp(arg,PCX_OS_WIN32)==0) osType = 3; else if (stricmp(arg,PCX_OS_MACOSX)==0) osType = 4; else if (stricmp(arg,PCX_OS_WINAMP2)==0) osType = 5; } } if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid)) throw HTTPException(HTTP_SC_NOTFOUND,404); if (!versionValid) { throw HTTPException(HTTP_SC_UNAUTHORIZED,401); } sock->writeLine(GNU_OK); sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr()); if (servMgr->isRoot) { sock->writeLine("%s %d",PCX_HS_BCTTL,9); if (diffRootVer) { sock->writeString(PCX_HS_DL); switch(osType) { case 1: sock->writeLine(PCX_DL_LINUXDYN); break; case 2: sock->writeLine(PCX_DL_LINUXSTA); break; case 3: sock->writeLine(PCX_DL_WIN32); break; case 4: sock->writeLine(PCX_DL_MACOSX); break; case 5: sock->writeLine(PCX_DL_WINAMP2); break; default: sock->writeLine(PCX_DL_URL); break; } } sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr()); } char hostIP[64]; Host h = sock->host; h.IPtoStr(hostIP); sock->writeLine("Remote-IP: %s",hostIP); sock->writeLine(""); while (http.nextHeader());}// -----------------------------------void Servent::handshakeStream(Channel *ch, bool isRaw){ sock->timeout = 10000; HTTP http(*sock); char idStr[64]; ch->info.id.toStr(idStr); while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) agent.set(arg); if (ch->info.contentType == ChanInfo::T_MP3) if (http.isHeader("icy-metadata")) addMetadata = atoi(arg) > 0; LOG_DEBUG("Stream: %s",http.cmdLine); } if (addMetadata && isRaw) // winamp mp3 metadata check { sock->writeLine(ICY_OK); sock->writeLine("icy-name:%s",ch->getName()); sock->writeLine("icy-br:%d",ch->getBitrate()); sock->writeLine("icy-genre:%s",ch->info.genre.cstr()); sock->writeLine("icy-url:%s",ch->info.url.cstr()); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); }else { sock->writeLine(HTTP_SC_OK); sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("x-audiocast-name: %s",ch->getName()); sock->writeLine("x-audiocast-bitrate: %d",ch->getBitrate()); sock->writeLine("x-audiocast-genre: %s",ch->info.genre.cstr()); sock->writeLine("x-audiocast-description: %s",ch->info.desc.cstr()); sock->writeLine("x-audiocast-url: %s",ch->info.url.cstr()); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); if (isRaw) { switch (ch->info.contentType) { case ChanInfo::T_OGG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG); break; case ChanInfo::T_MP3: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); break; case ChanInfo::T_MOV: sock->writeLine("Accept-Ranges: bytes"); sock->writeLine("Connection: close"); sock->writeLine("Content-Length: 10000000"); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV); break; case ChanInfo::T_MPG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG); break; } }else { sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST); } } sock->writeLine("");}// -----------------------------------void Servent::handshakeGiv(Channel *ch){ sock->timeout = 10000; char idstr[64]; pushID.toStr(idstr); sock->writeLine("GIV %d:%s/%s",chanIndex,idstr,ch->info.name.cstr()); handshakeStream(ch,false);}// -----------------------------------void Servent::process(){ try { gnuStream.init(sock); setStatus(S_CONNECTED); sock->timeout = 10000; // 10 seconds gnuStream.ping(2); if (type != T_LOOKUP) chanMgr->broadcastRelays(2,this); lastPacket = lastPing = sys->getTime(); bool doneBigPing=false; const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity unsigned int currBytes=0,lastTotal=0; unsigned int lastWait=0; while (thread.active && sock->active()) { if (sock->readPending()) { lastPacket = sys->getTime(); if (gnuStream.readPacket(pack)) { unsigned int ver = pack.id.getVersion(); char ipstr[64]; sock->host.toStr(ipstr); GnuID routeID; GnuStream::R_TYPE ret = GnuStream::R_PROCESS; if ((ver < MIN_PACKETVER) || (ver >= MAX_PACKETVER)) ret = GnuStream::R_BADVERSION; if (pack.func != GNU_FUNC_PONG) if (servMgr->seenPacket(pack)) ret = GnuStream::R_DUPLICATE; seenIDs.addGnuID(pack.id); servMgr->addVersion(ver);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -