📄 servent.cpp.svn-base
字号:
// ------------------------------------------------
// File : servent.cpp
// Date: 4-apr-2002
// Author: giles
// Desc:
//		Servents are the actual connections between clients. They do the handshaking,
//		transferring of data and processing of GnuPackets. Each servent has one socket allocated
//		to it on connect, it uses this to transfer all of its data.
//
// (c) 2002 peercast.org
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// ------------------------------------------------

#include <stdlib.h>
#include "servent.h"
#include "sys.h"
#include "gnutella.h"
#include "xml.h"
#include "html.h"
#include "http.h"
#include "stats.h"
#include "servmgr.h"
#include "peercast.h"
#include "atom.h"
#include "pcp.h"
// -----------------------------------char *Servent::statusMsgs[]={ "NONE", "CONNECTING", "PROTOCOL", "HANDSHAKE", "CONNECTED", "CLOSING", "LISTENING", "TIMEOUT", "REFUSED", "VERIFIED", "ERROR", "WAIT"};// -----------------------------------char *Servent::typeMsgs[]={
"NONE", "INCOMING",
"SERVER", "STREAM", "COUT",
"CIN",
"PGNU"
};// -----------------------------------bool Servent::isPrivate() { Host h = getHost(); return servMgr->isFiltered(ServFilter::F_PRIVATE,h);}// -----------------------------------bool Servent::isAllowed(int a) { Host h = getHost(); if (!h.isValid()) return false; if (servMgr->isFiltered(ServFilter::F_BAN,h)) return false; if (!servMgr->isFiltered(ServFilter::F_NETWORK,h)) return false; return (allow&a)!=0;}// -----------------------------------Servent::Servent(int id):outPacketsPri(MAX_OUTPACKETS),outPacketsNorm(MAX_OUTPACKETS),seenIDs(MAX_HASH),serventID(id),sock(NULL),next(NULL){ reset();}// -----------------------------------Servent::~Servent(){ }// -----------------------------------void Servent::kill() {
thread.unlock(); close();
if (type != T_SERVER) { type = T_NONE; setStatus(S_DEAD); }
}// -----------------------------------void Servent::abort() { thread.active = false; if (sock) sock->close();}// -----------------------------------void Servent::reset(){
remoteID.clear();
pcpStream.init();
flowControl = false; networkID.clear();
chanID.clear();
outputProtocol = ChanInfo::SP_UNKNOWN;
agent.clear(); sock = NULL; allow = ALLOW_ALL; syncPos = 0; addMetadata = false;
nsSwitchNum = 0;
pack.func = 255; lastConnect = lastPing = lastPacket = 0; loginPassword[0] = 0; loginMount[0] = 0; bytesPerSecond = 0; priorityConnect = false;
pushSock = NULL; outPacketsNorm.reset(); outPacketsPri.reset(); seenIDs.clearAll(); status = S_NONE; type = T_NONE;}
// -----------------------------------
// Queues `pack` on this servent's PCP output stream if the servent matches
// the routing criteria; returns the result of writePacket(), or false when
// the servent is filtered out.
//   t   - required servent type
//   cid - channel ID; when set, must equal this servent's chanID
//   did - destination ID; when set, must equal this servent's remoteID
//   sid - source ID; when set, must differ from this servent's remoteID
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
{
    // Each guard mirrors one term of the original conjunction, in the same order.
    if (type != t)
        return false;

    if (!isConnected())
        return false;

    if (cid.isSet() && !chanID.isSame(cid))
        return false;

    if (did.isSet() && !remoteID.isSame(did))
        return false;

    if (sid.isSet() && sid.isSame(remoteID))
        return false;

    return pcpStream.outData.writePacket(pack);
}
// -----------------------------------
// Stores `givSock` as this servent's push socket.
// Returns true when the socket was stored, false when a push socket is
// already present (in which case `givSock` is left untouched).
bool Servent::acceptGIV(ClientSocket *givSock)
{
    if (pushSock)
        return false;

    pushSock = givSock;
    return true;
}
// -----------------------------------Host Servent::getHost(){ Host h(0,0); if (sock) h = sock->host; return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){ lock.on(); bool r=false; if (pri) r = outPacketsPri.write(p); else { if (servMgr->useFlowControl) { int per = outPacketsNorm.percentFull(); if (per > 50) flowControl = true; else if (per < 25) flowControl = false; } bool send=true; if (flowControl) { // if in flowcontrol, only allow packets with less of a hop count than already in queue if (p.hops >= outPacketsNorm.findMinHop()) send = false; } if (send) r = outPacketsNorm.write(p); } lock.off(); return r;}// -----------------------------------bool Servent::initServer(Host &h){ try { checkFree(); status = S_WAIT; createSocket(); sock->bind(h); thread.data = this; thread.func = serverProc; type = T_SERVER; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Bad server: %s",e.msg); kill(); return false; } return true;}// -----------------------------------void Servent::checkFree(){ if (sock) throw StreamException("Socket already set"); if (thread.active) throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){ try{ checkFree(); type = T_INCOMING; sock = s; allow = a; thread.data = this; thread.func = incomingProc; setStatus(S_PROTOCOL);
char ipStr[64];
sock->host.toStr(ipStr);
LOG_DEBUG("Incoming from %s",ipStr);
if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Incoming error: %s",e.msg); kill(); }}// -----------------------------------void Servent::initOutgoing(TYPE ty){ try { checkFree(); type = ty; thread.data = this; thread.func = outgoingProc; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Unable to start outgoing: %s",e.msg); kill(); }}
// -----------------------------------
// Prepares an outgoing T_COUT servent towards `rh`: creates a socket, opens
// it for that host, checks the ALLOW_SERVENT permission and starts
// outgoingProc on the servent thread. Any StreamException is logged together
// with the host string and the servent is killed.
void Servent::initPCP(Host &rh)
{
    // Rendered up front so the text is available to the catch block as well.
    char hostText[64];
    rh.toStr(hostText);

    try
    {
        checkFree();
        createSocket();

        type = T_COUT;
        sock->open(rh);

        const bool permitted = isAllowed(ALLOW_SERVENT);
        if (!permitted)
            throw StreamException("Servent not allowed");

        thread.func = outgoingProc;
        thread.data = this;

        LOG_DEBUG("Outgoing to %s",hostText);

        const bool started = sys->startThread(&thread);
        if (!started)
            throw StreamException("Can`t start thread");
    }
    catch (StreamException &e)
    {
        LOG_ERROR("Unable to open connection to %s - %s",hostText,e.msg);
        kill();
    }
}
#if 0
// -----------------------------------
// DISABLED: everything up to the matching #endif is excluded from compilation.
//
// Marks the servent as T_STREAM, creates a socket, opens it for `host`,
// checks the ALLOW_DATA permission and connects.
// There is no try/catch in this function, so a StreamException thrown by
// checkFree() or by the permission check propagates to the caller.
void Servent::initChannelFetch(Host &host)
{
type = T_STREAM;
// ipStr is filled in but not used again in this function.
char ipStr[64];
host.toStr(ipStr);
checkFree();
createSocket();
sock->open(host);
if (!isAllowed(ALLOW_DATA))
throw StreamException("Servent not allowed");
sock->connect();
}
#endif
// -----------------------------------void Servent::initGIV(Host &h, GnuID &id){ char ipStr[64]; h.toStr(ipStr); try { checkFree(); givID = id; createSocket(); sock->open(h); if (!isAllowed(ALLOW_DATA)) throw StreamException("Servent not allowed"); sock->connect(); thread.data = this; thread.func = givProc; type = T_STREAM; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("GIV error to %s: %s",ipStr,e.msg); kill(); }}// -----------------------------------void Servent::createSocket(){ if (sock) LOG_ERROR("Servent::createSocket attempt made while active"); sock = sys->createSocket();}// -----------------------------------void Servent::close(){ thread.active = false;
setStatus(S_CLOSING); if (sock) {
sock->close(); delete sock; sock = NULL; }
if (pushSock)
{
pushSock->close();
delete pushSock;
pushSock = NULL;
}
}// -----------------------------------void Servent::setStatus(STATUS s){ if (s != status) { status = s; if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING)) lastConnect = sys->getTime(); }}
#if 0
// -----------------------------------
// DISABLED: everything up to the matching #endif is excluded from compilation.
//
// Sends an HTTP "GET /channel/<id>" request for channel `cid` over the
// servent socket, reads the response headers and returns a newly allocated
// PCPStream.
//   chanHit - supplies the target host and the tracker flag
//   spos    - stream position sent to the peer in the PCX_HS_POS header
//   cid     - channel ID; stored in chanID and used in the request line
// NOTE(review): this assigns a `new PCPStream` to pcpStream, while the live
// code in this file uses pcpStream as an object (pcpStream.init(),
// pcpStream.outData) - the block would need updating before being re-enabled.
ChannelStream *Servent::handshakeFetch(ChanHit &chanHit, unsigned spos, GnuID &cid)
{
chanID = cid;
streamPos = spos;
setStatus(S_HANDSHAKE);
char idStr[64];
cid.toStr(idStr);
char sidStr[64];
servMgr->sessionID.toStr(sidStr);
// request line and fixed headers
sock->writeLine("GET /channel/%s HTTP/1.0",idStr);
sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);
sock->writeLine("%s %d",PCX_HS_PCP,1);
sock->writeLine("%s %s",PCX_HS_SESSIONID,sidStr);
// the port header is only sent when servMgr->canConnectToMe() accepts the target host
if (servMgr->canConnectToMe(chanHit.host))
sock->writeLine("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
// ask to check me for incoming connections
if ((chanHit.tracker) && (servMgr->getFirewall() == ServMgr::FW_UNKNOWN))
sock->writeLine("%s %d",PCX_HS_PINGME,servMgr->serverHost.port);
// ask for last known packet pos
sock->writeLine("%s %d",PCX_HS_POS,streamPos);
// empty line ends the request headers
sock->writeLine("");
HTTP http(*sock);
// NOTE(review): the response code `r` is read but never checked.
int r = http.readResponse();
ChanInfo info;
// feed every response header to readICYHeader(), which fills `info`
while (http.nextHeader())
{
Servent::readICYHeader(http, info, NULL,chanHit.tracker);
LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
}
// continue from the position taken from the parsed headers
streamPos = info.streamPos;
setStatus(S_CONNECTED);
pcpStream = new PCPStream(PCP_BCST_UP,PCP_BCST_DOWN);
return pcpStream;
}
#endif
// -----------------------------------void Servent::handshakeOut(){ sock->writeLine(GNU_PEERCONN); char str[64]; sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); sock->writeLine("%s %d",PCX_HS_PCP,1);
if (priorityConnect) sock->writeLine("%s %d",PCX_HS_PRIORITY,1); if (networkID.isSet()) { networkID.toStr(str); sock->writeLine("%s %s",PCX_HS_NETWORKID,str); } servMgr->sessionID.toStr(str); sock->writeLine("%s %s",PCX_HS_ID,str); sock->writeLine("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS()); sock->writeLine(""); HTTP http(*sock);
int r = http.readResponse();
if (r != 200)
{
LOG_ERROR("Expected 200, got %d",r);
throw StreamException("Unexpected HTTP response");
}
bool versionValid = false; GnuID clientID; clientID.clear(); while (http.nextHeader()) { LOG_DEBUG(http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); }else if (http.isHeader(PCX_HS_NETWORKID)) clientID.fromStr(arg); }
if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); if (!versionValid) throw HTTPException(HTTP_SC_UNAUTHORIZED,401); sock->writeLine(GNU_OK); sock->writeLine("");
}
// -----------------------------------
// Currently a no-op: the body is empty.
void Servent::processOutChannel()
{
    // nothing to do
}
// -----------------------------------void Servent::handshakeIn(){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -