📄 servent.cpp
字号:
int ip,port; port = pong.readShort(); ip = pong.readLong(); ip = SWAP4(ip); Host h(ip,port);
if ((ip) && (port) && (h.globalIP())) {
LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port); servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime()); } //return; } else if (pack.func == GNU_FUNC_HIT)
{
MemoryStream data(pack.data,pack.len);
ChanHit hit;
gnuStream.readHit(data,hit,pack.hops,pack.id);
} //if (gnuStream.packetsIn > 5) // die if we get too many packets // return; }
if((sys->getTime()-lastConnect > 60))
break; } }catch(StreamException &e) { LOG_ERROR("Relay: %s",e.msg); } } // -----------------------------------int Servent::givProc(ThreadInfo *thread){// thread->lock(); Servent *sv = (Servent*)thread->data; try {
sv->handshakeGiv(sv->givID);
sv->handshakeIncoming();
}catch(StreamException &e) { LOG_ERROR("GIV: %s",e.msg); } sv->kill(); sys->endThread(thread);
return 0;}
// -----------------------------------
// Client side of the PCP HELO/OLEH handshake.
//
// Writes a PCP_HELO parent atom (agent, client version, session ID and,
// depending on firewall state / trust, our port, a ping request and our
// broadcast ID), then reads the peer's reply, which must be PCP_OLEH.
//
//   atom      - atom stream to write the HELO to and read the OLEH from.
//   rhost     - not read or written by this function.
//   rid       - out: cleared, then filled with the session ID the peer reports.
//   agent     - out: set to the peer's agent string when the peer sends one.
//   isTrusted - when true, our broadcast ID is sent (if broadcasting) and the
//               reply is allowed to update servMgr: the IP the peer sees us
//               as, the firewall state while it is still FW_UNKNOWN, and the
//               isDisabled flag.
//
// Throws StreamException when the reply is not PCP_OLEH, when the peer reports
// our own session ID (loopback), or when the peer supplies no session ID. In
// the first and last case a PCP_QUIT atom is written before throwing.
void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
{
    const bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
    const bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
    const bool sendBCID = isTrusted && chanMgr->isBroadcasting();

    // The child count mirrors the atoms written below: three unconditional
    // ones plus one for each enabled option.
    atom.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
    atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
    atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
    atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
    if (nonFW)
        atom.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
    if (testFW)
        atom.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
    if (sendBCID)
        atom.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);

    LOG_DEBUG("PCP outgoing waiting for OLEH..");

    int numc,numd;
    ID4 id = atom.read(numc,numd);
    if (id != PCP_OLEH)
    {
        LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
        atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
        throw StreamException("Got unexpected PCP response");
    }

    // Zero-filled so that agent.set() never walks into uninitialised stack
    // bytes when the peer sends a short or unterminated agent string.
    // NOTE(review): readString()'s own termination behaviour is not visible
    // from here; the zero-fill is harmless either way.
    char arg[64] = {0};

    rid.clear();

    int disable=0;
    Host thisHost;      // how the peer sees us (REMOTEIP / PORT children)

    // read OLEH response
    for(int i=0; i<numc; i++)
    {
        int c,dlen;
        ID4 childID = atom.read(c,dlen);

        if (childID == PCP_HELO_AGENT)
        {
            atom.readString(arg,sizeof(arg),dlen);
            agent.set(arg);
        }else if (childID == PCP_HELO_REMOTEIP)
        {
            thisHost.ip = atom.readInt();
        }else if (childID == PCP_HELO_PORT)
        {
            thisHost.port = atom.readShort();
        }else if (childID == PCP_HELO_VERSION)
        {
            // The peer's version is not used on the outgoing side, but the
            // value still has to be read off the stream.
            (void)atom.readInt();
        }else if (childID == PCP_HELO_DISABLE)
        {
            disable = atom.readInt();
        }else if (childID == PCP_HELO_SESSIONID)
        {
            atom.readBytes(rid.id,16);
            if (rid.isSame(servMgr->sessionID))
                throw StreamException("Servent loopback");
        }else
        {
            LOG_DEBUG("PCP handshake skip: %s",childID.getString().str());
            atom.skip(c,dlen);
        }
    }

    // update server ip/firewall status
    if (isTrusted)
    {
        if (thisHost.isValid())
        {
            // Adopt the externally visible IP unless the user forced one.
            if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
            {
                char ipstr[64];
                thisHost.toStr(ipstr);
                LOG_DEBUG("Got new ip: %s",ipstr);
                servMgr->serverHost.ip = thisHost.ip;
            }

            // Only resolve the firewall state while it is still unknown: a
            // non-zero port on a global IP in the reply is taken as "not
            // firewalled".
            if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
            {
                if (thisHost.port && thisHost.globalIP())
                    servMgr->setFirewall(ServMgr::FW_OFF);
                else
                    servMgr->setFirewall(ServMgr::FW_ON);
            }
        }

        if (disable == 1)
        {
            LOG_ERROR("client disabled: %d",disable);
            servMgr->isDisabled = true;
        }else
        {
            servMgr->isDisabled = false;
        }
    }

    if (!rid.isSet())
    {
        atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
        throw StreamException("Remote host not identified");
    }

    LOG_DEBUG("PCP Outgoing handshake complete.");
}
// -----------------------------------
// Server side of the PCP HELO/OLEH handshake.
//
// Reads the peer's PCP_HELO parent atom and its children, optionally performs
// the firewall ping test the peer asked for, and answers with a PCP_OLEH
// parent atom carrying our agent, session ID, version and the IP/port we
// determined for the peer.
//
//   atom  - atom stream to read the HELO from and write the OLEH to.
//   rhost - in/out: the peer's host. Its port is reset to 0 and then taken
//           from the HELO (or from a ping request, if pingHost() succeeds);
//           its IP is replaced by our own server IP when the peer's address
//           is not global but ours is.
//   rid   - out: filled with the session ID sent by the peer.
//   agent - out: set to the peer's agent string when the peer sends one.
//
// Throws StreamException when:
//   - the first atom is not PCP_HELO (PCP_QUIT is written first);
//   - the peer reports our own session ID (loopback);
//   - we are root and the peer's private broadcast ID is unknown or invalid
//     (an OLEH containing only PCP_HELO_DISABLE=1 is written first);
//   - the peer's version is non-zero but below PCP_CLIENT_MINVERSION;
//   - the peer supplied no session ID.
// The last two are reported with a PCP_QUIT written after the OLEH.
void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{
    int numc,numd;
    ID4 id = atom.read(numc,numd);
    if (id != PCP_HELO)
    {
        LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
        atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
        throw StreamException("Got unexpected PCP response");
    }

    // Zero-filled so that agent.set() never walks into uninitialised stack
    // bytes when the peer sends a short or unterminated agent string.
    // NOTE(review): readString()'s own termination behaviour is not visible
    // from here; the zero-fill is harmless either way.
    char arg[64] = {0};

    int version=0;
    int pingPort=0;

    GnuID bcID;
    bcID.clear();

    rhost.port = 0;

    for(int i=0; i<numc; i++)
    {
        int c,dlen;
        ID4 childID = atom.read(c,dlen);

        if (childID == PCP_HELO_AGENT)
        {
            atom.readString(arg,sizeof(arg),dlen);
            agent.set(arg);
        }else if (childID == PCP_HELO_VERSION)
        {
            version = atom.readInt();
        }else if (childID == PCP_HELO_SESSIONID)
        {
            atom.readBytes(rid.id,16);
            if (rid.isSame(servMgr->sessionID))
                throw StreamException("Servent loopback");
        }else if (childID == PCP_HELO_BCID)
        {
            atom.readBytes(bcID.id,16);
        }else if (childID == PCP_HELO_OSTYPE)
        {
            // The OS type is not used, but the value still has to be read
            // off the stream.
            (void)atom.readInt();
        }else if (childID == PCP_HELO_PORT)
        {
            rhost.port = atom.readShort();
        }else if (childID == PCP_HELO_PING)
        {
            pingPort = atom.readShort();
        }else
        {
            LOG_DEBUG("PCP handshake skip: %s",childID.getString().str());
            atom.skip(c,dlen);
        }
    }

    if (version)
        LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);

    // A peer with a non-global address is reported under our own global IP.
    if (!rhost.globalIP() && servMgr->serverHost.globalIP())
        rhost.ip = servMgr->serverHost.ip;

    if (pingPort)
    {
        char ripStr[64];
        rhost.toStr(ripStr);
        LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);

        // The port reported back in the OLEH is the ping port only if the
        // peer has a global IP and pingHost() succeeds; otherwise it is 0.
        rhost.port = pingPort;
        if (!rhost.globalIP() || !pingHost(rhost,rid))
            rhost.port = 0;
    }

    // Root only: a private broadcast ID (flag bit 0) must be known and valid.
    if (servMgr->isRoot && bcID.isSet() && (bcID.getFlags() & 1))
    {
        BCID *bcid = servMgr->findValidBCID(bcID);
        if (!bcid || !bcid->valid)
        {
            atom.writeParent(PCP_OLEH,1);
            atom.writeInt(PCP_HELO_DISABLE,1);
            throw StreamException("Client is banned");
        }
    }

    // The child count (5) mirrors the five atoms written below.
    atom.writeParent(PCP_OLEH,5);
    atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
    atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
    atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
    atom.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
    atom.writeShort(PCP_HELO_PORT,rhost.port);

    // A version of 0 means the peer sent none; that is not rejected here.
    if (version && (version < PCP_CLIENT_MINVERSION))
    {
        atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
        throw StreamException("Agent is not valid");
    }

    if (!rid.isSet())
    {
        atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
        throw StreamException("Remote host not identified");
    }

    if (servMgr->isRoot)
    {
        servMgr->writeRootAtoms(atom,false);
    }

    LOG_DEBUG("PCP Incoming handshake complete.");
}
// -----------------------------------
void Servent::processIncomingPCP(bool suggestOthers)
{
PCPStream::readVersion(*sock);
AtomStream atom(*sock);
Host rhost = sock->host;
handshakeIncomingPCP(atom,rhost,remoteID,agent);
bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
|| (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
bool unavailable = servMgr->controlInFull();
bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
char rstr[64];
rhost.toStr(rstr);
if (unavailable || alreadyConnected || offair)
{
int error;
if (alreadyConnected)
error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
else if (unavailable)
error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
else if (offair)
error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
else
error = PCP_ERROR_QUIT;
if (suggestOthers)
{
ChanHit best;
ChanHitSearch chs;
int cnt=0;
for(int i=0; i<8; i++)
{
best.init();
// find best hit on this network
if (!rhost.globalIP())
{
chs.init();
chs.matchHost = servMgr->serverHost;
chs.waitDelay = 2;
chs.excludeID = remoteID;
chs.trackersOnly = true;
chs.useBusyControls = false;
if (chanMgr->pickHits(chs))
best = chs.best[0];
}
// find best hit on same network
if (!best.host.ip)
{
chs.init();
chs.matchHost = rhost;
chs.waitDelay = 2;
chs.excludeID = remoteID;
chs.trackersOnly = true;
chs.useBusyControls = false;
if (chanMgr->pickHits(chs))
best = chs.best[0];
}
// else find best hit on other networks
if (!best.host.ip)
{
chs.init();
chs.waitDelay = 2;
chs.excludeID = remoteID;
chs.trackersOnly = true;
chs.useBusyControls = false;
if (chanMgr->pickHits(chs))
best = chs.best[0];
}
if (!best.host.ip)
break;
GnuID noID;
noID.clear();
best.writeAtoms(atom,noID);
cnt++;
}
if (cnt)
{
LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
}
else if (rhost.port)
{
// send push request to best firewalled tracker on other network
chs.init();
chs.waitDelay = 30;
chs.excludeID = remoteID;
chs.trackersOnly = true;
chs.useFirewalled = true;
chs.useBusyControls = false;
if (chanMgr->pickHits(chs))
{
best = chs.best[0];
GnuID noID;
noID.clear();
int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
}
}else
{
LOG_DEBUG("No available trackers");
}
}
LOG_ERROR("Sending QUIT to incoming: %d",error);
atom.writeInt(PCP_QUIT,error);
return;
}
type = T_CIN;
setStatus(S_CONNECTED);
atom.writeInt(PCP_OK,0);
// ask for update
atom.writeParent(PCP_ROOT,1);
atom.writeParent(PCP_ROOT_UPDATE,0);
pcpStream = new PCPStream(remoteID);
int error = 0;
BroadcastState bcs;
while (!error && thread.active && !sock->eof())
{
error = pcpStream->readPacket(*sock,bcs);
sys->sleepIdle();
if (!servMgr->isRoot && !chanMgr->isBroadcasting())
error = PCP_ERROR_OFFAIR;
if (peercastInst->isQuitting)
error = PCP_ERROR_SHUTDOWN;
}
pcpStream->flush(*sock);
error += PCP_ERROR_QUIT;
atom.writeInt(PCP_QUIT,error);
LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
}
// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){// thread->lock(); LOG_DEBUG("COUT started");
Servent *sv = (Servent*)thread->data;
GnuID noID;
noID.clear();
sv->pcpStream = new PCPStream(noID);
while (sv->thread.active)
{
sv->setStatus(S_WAIT);
if (chanMgr->isBroadcasting() && servMgr->autoServe)
{
ChanHit bestHit;
ChanHitSearch chs;
char ipStr[64];
do
{
bestHit.init();
if (servMgr->rootHost.isEmpty())
break;
if (sv->pushSock)
{
sv->sock = sv->pushSock;
sv->pushSock = NULL;
bestHit.host = sv->sock->host;
break;
}
GnuID noID;
noID.clear();
ChanHitList *chl = chanMgr->findHitListByID(noID);
if (chl)
{
// find local tracker
chs.init();
chs.matchHost = servMgr->serverHost;
chs.waitDelay = MIN_TRACKER_RETRY;
chs.excludeID = servMgr->sessionID;
chs.trackersOnly = true;
if (!chl->pickHits(chs))
{
// else find global tracker
chs.init();
chs.waitDelay = MIN_TRACKER_RETRY;
chs.excludeID = servMgr->sessionID;
chs.trackersOnly = true;
chl->pickHits(chs);
}
if (chs.numResults)
{
bestHit = chs.best[0];
}
}
unsigned int ctime = sys->getTime();
if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
{
bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
bestHit.yp = true;
chanMgr->lastYPConnect = ctime;
}
sys->sleepIdle();
}while (!bestHit.host.ip && (sv->thread.active));
if (!bestHit.host.ip) // give up
{
LOG_ERROR("COUT giving up");
break;
}
bestHit.host.toStr(ipStr);
int error=0;
try {
LOG_DEBUG("COUT to %s: Connecting..",ipStr);
if (!sv->sock)
{
sv->setStatus(S_CONNECTING);
sv->sock = sys->createSocket();
if (!sv->sock)
throw StreamException("Unable to create socket");
sv->sock->open(bestHit.host);
sv->sock->connect();
}
sv->sock->setReadTimeout(30000);
AtomStream atom(*sv->sock);
sv->setStatus(S_HANDSHAKE);
Host rhost = sv->sock->host;
atom.writeInt(PCP_CONNECT,1);
handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
sv->setStatus(S_CONNECTED);
LOG_DEBUG("COUT to %s: OK",ipStr);
sv->pcpStream->init(sv->remoteID);
BroadcastState bcs;
error = 0;
while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
{
error = sv->pcpStream->readPacket(*sv->sock,bcs);
sys->sleepIdle();
if (!chanMgr->isBroadcasting())
error = PCP_ERROR_OFFAIR;
if (peercastInst->isQuitting)
error = PCP_ERROR_SHUTDOWN;
if (sv->pcpStream->nextRootPacket)
if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
error = PCP_ERROR_NOROOT;
}
sv->setStatus(S_CLOSING);
sv->pcpStream->flush(*sv->sock);
error += PCP_ERROR_QUIT;
atom.writeInt(PCP_QUIT,error);
LOG_ERROR("COUT to %s closed: %d",ipStr,error);
}catch(TimeoutException &e) { LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg); sv->setStatus(S_TIMEOUT);
}catch(StreamException &e) { LOG_ERROR("COUT to %s: %s",ipStr,e.msg); sv->setStatus(S_ERROR); }
try
{
if (sv->sock)
{
sv->sock->close();
delete sv->sock;
sv->sock = NULL;
}
}catch(StreamException &) {}
// don`t discard this hit if we caused the disconnect (stopped broadcasting)
if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
chanMgr->deadHit(bestHit);
}
sys->sleepIdle();
} sv->kill(); sys->endThread(thread);
LOG_DEBUG("COUT ended");
return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){// thread->lock(); Servent *sv = (Servent*)thread->data;
char ipStr[64];
sv->sock->host.toStr(ipStr);
try { sv->handshakeIncoming(); }catch(HTTPException &e) { try { sv->sock->writeLine(e.msg); if (e.code == 401) sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\""); sv->sock->writeLine(""); }catch(StreamException &){} LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }catch(StreamException &e) { LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }
sv->kill();
sys->endThread(thread);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -