📄 servent.cpp
字号:
// ------------------------------------------------// File : servent.cpp// Date: 4-apr-2002// Author: giles// Desc: // Servents are the actual connections between clients. They do the handshaking,// transfering of data and processing of GnuPackets. Each servent has one socket allocated// to it on connect, it uses this to transfer all of its data.//// (c) 2002 peercast.org// ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.// ------------------------------------------------// todo: make lan->yp not check firewall#include <stdlib.h>#include "servent.h"#include "sys.h"#include "gnutella.h"#include "xml.h"#include "html.h"#include "http.h"#include "stats.h"#include "servmgr.h"#include "peercast.h"#include "atom.h"
#include "pcp.h"
#include "version2.h"
const int DIRECT_WRITE_TIMEOUT = 60;// -----------------------------------char *Servent::statusMsgs[]={ "NONE", "CONNECTING", "PROTOCOL", "HANDSHAKE", "CONNECTED", "CLOSING", "LISTENING", "TIMEOUT", "REFUSED", "VERIFIED", "ERROR", "WAIT",
"FREE"};// -----------------------------------char *Servent::typeMsgs[]={
"NONE", "INCOMING",
"SERVER", "RELAY", "DIRECT",
"COUT",
"CIN",
"PGNU"
};// -----------------------------------bool Servent::isPrivate() { Host h = getHost(); return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();}// -----------------------------------bool Servent::isAllowed(int a) { Host h = getHost(); if (servMgr->isFiltered(ServFilter::F_BAN,h)) return false; return (allow&a)!=0;}
// -----------------------------------
bool Servent::isFiltered(int f)
{
Host h = getHost();
return servMgr->isFiltered(f,h);
}
// -----------------------------------Servent::Servent(int index):outPacketsPri(MAX_OUTPACKETS),outPacketsNorm(MAX_OUTPACKETS),seenIDs(MAX_HASH),serventIndex(index),sock(NULL),next(NULL){ reset();}// -----------------------------------Servent::~Servent(){ }// -----------------------------------void Servent::kill() {
thread.active = false;
setStatus(S_CLOSING);
if (pcpStream)
{
PCPStream *pcp = pcpStream;
pcpStream = NULL;
pcp->kill();
delete pcp;
}
if (sock)
{
sock->close();
delete sock;
sock = NULL;
}
if (pushSock)
{
pushSock->close();
delete pushSock;
pushSock = NULL;
}
// thread.unlock();
if (type != T_SERVER) { reset(); setStatus(S_FREE); }
}// -----------------------------------void Servent::abort() { thread.active = false; if (sock)
{ sock->close();
}}// -----------------------------------void Servent::reset(){
remoteID.clear();
servPort = 0;
pcpStream = NULL;
flowControl = false; networkID.clear();
chanID.clear();
outputProtocol = ChanInfo::SP_UNKNOWN;
agent.clear(); sock = NULL; allow = ALLOW_ALL; syncPos = 0; addMetadata = false;
nsSwitchNum = 0;
pack.func = 255; lastConnect = lastPing = lastPacket = 0; loginPassword[0] = 0; loginMount[0] = 0; bytesPerSecond = 0; priorityConnect = false;
pushSock = NULL;
sendHeader = true; outPacketsNorm.reset(); outPacketsPri.reset(); seenIDs.clear(); status = S_NONE; type = T_NONE;}
// -----------------------------------
// Forwards `pack` to this servent's PCP stream when all of these hold, checked
// in this order:
//   - the servent's type equals `t`
//   - the servent is connected
//   - `cid` is unset, or it is the same as this servent's channel ID
//   - `sid` is unset, or it differs from this servent's remote ID
//   - a PCP stream is attached
// Returns the stream's sendPacket(pack,did) result, or false if any check fails.
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
{
    if (type != t)
        return false;

    if (!isConnected())
        return false;

    if (cid.isSet() && !chanID.isSame(cid))
        return false;

    if (sid.isSet() && sid.isSame(remoteID))
        return false;

    if (pcpStream == NULL)
        return false;

    return pcpStream->sendPacket(pack,did);
}
// -----------------------------------
// Stores `givSock` as the push socket if none is held yet.
// Returns true when the socket was stored, false when one was already set
// (in which case `givSock` is left untouched).
bool Servent::acceptGIV(ClientSocket *givSock)
{
    if (pushSock)
        return false;

    pushSock = givSock;
    return true;
}
// -----------------------------------Host Servent::getHost(){ Host h(0,0); if (sock) h = sock->host; return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){ lock.on(); bool r=false; if (pri) r = outPacketsPri.write(p); else { if (servMgr->useFlowControl) { int per = outPacketsNorm.percentFull(); if (per > 50) flowControl = true; else if (per < 25) flowControl = false; } bool send=true; if (flowControl) { // if in flowcontrol, only allow packets with less of a hop count than already in queue if (p.hops >= outPacketsNorm.findMinHop()) send = false; } if (send) r = outPacketsNorm.write(p); } lock.off(); return r;}// -----------------------------------bool Servent::initServer(Host &h){ try { checkFree(); status = S_WAIT; createSocket(); sock->bind(h); thread.data = this; thread.func = serverProc;
type = T_SERVER; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Bad server: %s",e.msg); kill(); return false; } return true;}// -----------------------------------void Servent::checkFree(){ if (sock) throw StreamException("Socket already set"); if (thread.active) throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){ try{ checkFree(); type = T_INCOMING; sock = s; allow = a; thread.data = this; thread.func = incomingProc; setStatus(S_PROTOCOL);
char ipStr[64];
sock->host.toStr(ipStr);
LOG_DEBUG("Incoming from %s",ipStr);
if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg); //servMgr->shutdownTimer = 1;
kill();
LOG_ERROR("INCOMING FAILED: %s",e.msg);
}}// -----------------------------------void Servent::initOutgoing(TYPE ty){ try { checkFree(); type = ty; thread.data = this; thread.func = outgoingProc; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Unable to start outgoing: %s",e.msg); kill(); }}
// -----------------------------------
// Prepares an outgoing connection (type T_COUT) to `rh`: creates a socket,
// opens it on the remote host, checks ALLOW_NETWORK permission and starts
// outgoingProc on the servent's thread.
// On StreamException the error is logged with the host string and the
// servent is killed.
void Servent::initPCP(Host &rh)
{
    // Rendered up front so the catch handler can log it too.
    char ipStr[64];
    rh.toStr(ipStr);

    try
    {
        checkFree();
        createSocket();

        type = T_COUT;

        sock->open(rh);

        // isAllowed() looks at the socket's host, so it runs after open()
        const bool permitted = isAllowed(ALLOW_NETWORK);
        if (!permitted)
            throw StreamException("Servent not allowed");

        thread.func = outgoingProc;
        thread.data = this;

        LOG_DEBUG("Outgoing to %s",ipStr);

        const bool started = sys->startThread(&thread);
        if (!started)
            throw StreamException("Can`t start thread");
    }
    catch(StreamException &err)
    {
        LOG_ERROR("Unable to open connection to %s - %s",ipStr,err.msg);
        kill();
    }
}
// NOTE: everything between "#if 0" and "#endif" is excluded from compilation.
#if 0
// -----------------------------------
// Disabled code: marks the servent as T_STREAM, creates a socket, opens it
// on `host`, checks ALLOW_DATA permission and connects.
// Unlike the other init* methods in this file there is no try/catch here,
// so a StreamException would propagate to the caller.
void Servent::initChannelFetch(Host &host)
{
type = T_STREAM;
// ipStr is filled in but not used afterwards in this function.
char ipStr[64];
host.toStr(ipStr);
checkFree();
createSocket();
sock->open(host);
if (!isAllowed(ALLOW_DATA))
throw StreamException("Servent not allowed");
sock->connect();
}
#endif
// -----------------------------------void Servent::initGIV(Host &h, GnuID &id){ char ipStr[64]; h.toStr(ipStr); try { checkFree(); givID = id; createSocket(); sock->open(h); if (!isAllowed(ALLOW_NETWORK)) throw StreamException("Servent not allowed"); sock->connect(); thread.data = this; thread.func = givProc; type = T_RELAY; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("GIV error to %s: %s",ipStr,e.msg); kill(); }}// -----------------------------------void Servent::createSocket(){ if (sock) LOG_ERROR("Servent::createSocket attempt made while active"); sock = sys->createSocket();}// -----------------------------------void Servent::setStatus(STATUS s){ if (s != status) { status = s; if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING)) lastConnect = sys->getTime(); }}
// -----------------------------------void Servent::handshakeOut(){ sock->writeLine(GNU_PEERCONN); char str[64]; sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT); sock->writeLineF("%s %d",PCX_HS_PCP,1);
if (priorityConnect) sock->writeLineF("%s %d",PCX_HS_PRIORITY,1); if (networkID.isSet()) { networkID.toStr(str); sock->writeLineF("%s %s",PCX_HS_NETWORKID,str); } servMgr->sessionID.toStr(str); sock->writeLineF("%s %s",PCX_HS_ID,str); sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS()); sock->writeLine(""); HTTP http(*sock);
int r = http.readResponse();
if (r != 200)
{
LOG_ERROR("Expected 200, got %d",r);
throw StreamException("Unexpected HTTP response");
}
bool versionValid = false; GnuID clientID; clientID.clear(); while (http.nextHeader()) { LOG_DEBUG(http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); }else if (http.isHeader(PCX_HS_NETWORKID)) clientID.fromStr(arg); }
if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); if (!versionValid) throw HTTPException(HTTP_SC_UNAUTHORIZED,401); sock->writeLine(GNU_OK); sock->writeLine("");
}
// -----------------------------------
// Has an empty body here: calling it performs no work.
// NOTE(review): looks like a placeholder - confirm whether it is still needed.
void Servent::processOutChannel()
{
}
// -----------------------------------void Servent::handshakeIn(){ int osType=0; HTTP http(*sock); bool versionValid = false; bool diffRootVer = false;
GnuID clientID; clientID.clear(); while (http.nextHeader()) { LOG_DEBUG("%s",http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) { versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0; } }else if (http.isHeader(PCX_HS_NETWORKID)) { clientID.fromStr(arg); }else if (http.isHeader(PCX_HS_PRIORITY)) { priorityConnect = atoi(arg)!=0;
}else if (http.isHeader(PCX_HS_ID)) { GnuID id; id.fromStr(arg); if (id.isSame(servMgr->sessionID)) throw StreamException("Servent loopback"); }else if (http.isHeader(PCX_HS_OS)) { if (stricmp(arg,PCX_OS_LINUX)==0) osType = 1; else if (stricmp(arg,PCX_OS_WIN32)==0) osType = 2; else if (stricmp(arg,PCX_OS_MACOSX)==0) osType = 3; else if (stricmp(arg,PCX_OS_WINAMP2)==0) osType = 4; } }
if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); // if this is a priority connection and all incoming connections // are full then kill an old connection to make room. Otherwise reject connection. //if (!priorityConnect) { if (!isPrivate()) if (servMgr->pubInFull()) throw HTTPException(HTTP_SC_UNAVAILABLE,503); } if (!versionValid) throw HTTPException(HTTP_SC_FORBIDDEN,403); sock->writeLine(GNU_OK); sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT); if (networkID.isSet()) { char idStr[64]; networkID.toStr(idStr); sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr); } if (servMgr->isRoot) { sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0); sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL); sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL); sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast); //sock->writeLine("%s %d",PCX_HS_FULLHIT,2); if (diffRootVer) { sock->writeString(PCX_HS_DL); sock->writeLine(PCX_DL_URL); } sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr()); } char hostIP[64]; Host h = sock->host; h.IPtoStr(hostIP); sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP); sock->writeLine(""); while (http.nextHeader());
}
// -----------------------------------
bool Servent::pingHost(Host &rhost,GnuID &rsid)
{
char ipstr[64];
rhost.toStr(ipstr);
LOG_DEBUG("Ping host %s: trying..",ipstr);
ClientSocket *s=NULL;
bool hostOK=false;
try
{
s = sys->createSocket();
if (!s)
return false;
else
{
s->setReadTimeout(15000);
s->setWriteTimeout(15000);
s->open(rhost);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -