📄 servent.cpp
字号:
s->connect();
AtomStream atom(*s);
atom.writeInt(PCP_CONNECT,1);
atom.writeParent(PCP_HELO,1);
atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
GnuID sid;
sid.clear();
int numc,numd;
ID4 id = atom.read(numc,numd);
if (id == PCP_OLEH)
{
for(int i=0; i<numc; i++)
{
int c,d;
ID4 pid = atom.read(c,d);
if (pid == PCP_SESSIONID)
atom.readBytes(sid.id,16,d);
else
atom.skip(c,d);
}
}else
{
LOG_DEBUG("Ping response: %s",id.getString().str());
throw StreamException("Bad ping response");
}
if (!sid.isSame(rsid))
throw StreamException("SIDs don`t match");
hostOK = true;
LOG_DEBUG("Ping host %s: OK",ipstr);
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
}
}catch(StreamException &e)
{
LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
}
if (s)
{
s->close();
delete s;
}
if (!hostOK)
rhost.port = 0;
return true;
}
// -----------------------------------bool Servent::handshakeStream(ChanInfo &chanInfo){
HTTP http(*sock);
bool gotPCP=false;
unsigned int reqPos=0;
nsSwitchNum=0;
while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(PCX_HS_PCP))
gotPCP = atoi(arg)!=0;
else if (http.isHeader(PCX_HS_POS))
reqPos = atoi(arg); else if (http.isHeader("icy-metadata")) addMetadata = atoi(arg) > 0;
else if (http.isHeader(HTTP_HS_AGENT))
agent = arg;
else if (http.isHeader("Pragma"))
{
char *ssc = stristr(arg,"stream-switch-count=");
char *so = stristr(arg,"stream-offset");
if (ssc || so)
{
nsSwitchNum=1;
//nsSwitchNum = atoi(ssc+20);
}
}
LOG_DEBUG("Stream: %s",http.cmdLine); }
if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
outputProtocol = ChanInfo::SP_PEERCAST;
if (outputProtocol == ChanInfo::SP_HTTP)
{
if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
|| (chanInfo.contentType == ChanInfo::T_WMA)
|| (chanInfo.contentType == ChanInfo::T_WMV)
|| (chanInfo.contentType == ChanInfo::T_ASX)
)
outputProtocol = ChanInfo::SP_MMS;
}
bool chanFound=false;
bool chanReady=false;
Channel *ch = chanMgr->findChannelByID(chanInfo.id);
if (ch)
{
sendHeader = true;
if (reqPos)
{
streamPos = ch->rawData.findOldestPos(reqPos);
}else
{
streamPos = ch->rawData.getLatestPos();
}
chanReady = canStream(ch);
}
ChanHitList *chl = chanMgr->findHitList(chanInfo);
if (chl)
{
chanFound = true;
}
bool result = false;
char idStr[64];
chanInfo.id.toStr(idStr);
char sidStr[64];
servMgr->sessionID.toStr(sidStr);
Host rhost = sock->host;
AtomStream atom(*sock);
if (!chanFound)
{
sock->writeLine(HTTP_SC_NOTFOUND);
sock->writeLine("");
LOG_DEBUG("Sending channel not found");
return false;
}
if (!chanReady)
{
if (outputProtocol == ChanInfo::SP_PCP)
{
sock->writeLine(HTTP_SC_UNAVAILABLE);
sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
sock->writeLine("");
handshakeIncomingPCP(atom,rhost,remoteID,agent);
char ripStr[64];
rhost.toStr(ripStr);
LOG_DEBUG("Sending channel unavailable");
ChanHitSearch chs;
int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
if (chl)
{
ChanHit best;
// search for up to 8 other hits
int cnt=0;
for(int i=0; i<8; i++)
{
best.init();
// find best hit this network if local IP
if (!rhost.globalIP())
{
chs.init();
chs.matchHost = servMgr->serverHost;
chs.waitDelay = 2;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
// find best hit on same network
if (!best.host.ip)
{
chs.init();
chs.matchHost = rhost;
chs.waitDelay = 2;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
// find best hit on other networks
if (!best.host.ip)
{
chs.init();
chs.waitDelay = 2;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
if (!best.host.ip)
break;
best.writeAtoms(atom,chanInfo.id);
cnt++;
}
if (cnt)
{
LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
}
else if (rhost.port)
{
// find firewalled host
chs.init();
chs.waitDelay = 30;
chs.useFirewalled = true;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
{
best = chs.best[0];
int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
}
}
// if all else fails, use tracker
if (!best.host.ip)
{
// find best tracker on this network if local IP
if (!rhost.globalIP())
{
chs.init();
chs.matchHost = servMgr->serverHost;
chs.trackersOnly = true;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
// find local tracker
if (!best.host.ip)
{
chs.init();
chs.matchHost = rhost;
chs.trackersOnly = true;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
// find global tracker
if (!best.host.ip)
{
chs.init();
chs.trackersOnly = true;
chs.excludeID = remoteID;
if (chl->pickHits(chs))
best = chs.best[0];
}
if (best.host.ip)
{
best.writeAtoms(atom,chanInfo.id);
LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
}else if (rhost.port)
{
// find firewalled tracker
chs.init();
chs.useFirewalled = true;
chs.trackersOnly = true;
chs.excludeID = remoteID;
chs.waitDelay = 30;
if (chl->pickHits(chs))
{
best = chs.best[0];
int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
}
}
}
}
// return not available yet code
atom.writeInt(PCP_QUIT,error);
result = false;
}else
{
LOG_DEBUG("Sending channel unavailable");
sock->writeLine(HTTP_SC_UNAVAILABLE);
sock->writeLine("");
result = false;
}
} else {
if (chanInfo.contentType != ChanInfo::T_MP3)
addMetadata=false;
if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check {
sock->writeLine(ICY_OK); sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLineF("icy-name:%s",chanInfo.name.cstr()); sock->writeLineF("icy-br:%d",chanInfo.bitrate); sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr()); sock->writeLineF("icy-url:%s",chanInfo.url.cstr()); sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval); sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr); sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3); }else { sock->writeLine(HTTP_SC_OK); if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA)) { sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("Accept-Ranges: none"); sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr()); sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate); sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr()); sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr()); sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr()); sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr); }
if (outputProtocol == ChanInfo::SP_HTTP) { switch (chanInfo.contentType) { case ChanInfo::T_OGG: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG); break; case ChanInfo::T_MP3: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3); break; case ChanInfo::T_MOV: sock->writeLine("Connection: close"); sock->writeLine("Content-Length: 10000000"); sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV); break; case ChanInfo::T_MPG: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG); break; case ChanInfo::T_NSV: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV); break; case ChanInfo::T_ASX: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
break;
case ChanInfo::T_WMA:
sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
break;
case ChanInfo::T_WMV: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV); break; } } else if (outputProtocol == ChanInfo::SP_MMS)
{
sock->writeLine("Server: Rex/9.0.0.2980");
sock->writeLine("Cache-Control: no-cache");
sock->writeLine("Pragma: no-cache");
sock->writeLine("Pragma: client-id=3587303426");
sock->writeLine("Pragma: features=\"broadcast,playlist\"");
if (nsSwitchNum)
{
sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
}else
{
sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
if (ch)
sock->writeLineF("Content-Length: %d",ch->headPack.len);
sock->writeLine("Connection: Keep-Alive");
}
} else if (outputProtocol == ChanInfo::SP_PCP)
{
sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
}else if (outputProtocol == ChanInfo::SP_PEERCAST) { sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST); } }
sock->writeLine("");
result = true;
if (gotPCP)
{
handshakeIncomingPCP(atom,rhost,remoteID,agent);
atom.writeInt(PCP_OK,0);
}
}
return result;}// -----------------------------------void Servent::handshakeGiv(GnuID &id){ if (id.isSet())
{ char idstr[64]; id.toStr(idstr); sock->writeLineF("GIV /%s",idstr);
}else sock->writeLine("GIV");
sock->writeLine("");
}// -----------------------------------void Servent::processGnutella(){
type = T_PGNU;
//if (servMgr->isRoot && !servMgr->needConnections())
if (servMgr->isRoot)
{
processRoot();
return;
}
gnuStream.init(sock); setStatus(S_CONNECTED); if (!servMgr->isRoot)
{
chanMgr->broadcastRelays(this, 1, 1);
GnuPacket *p;
if ((p=outPacketsNorm.curr()))
gnuStream.sendPacket(*p);
return;
} gnuStream.ping(2);// if (type != T_LOOKUP)// chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2); lastPacket = lastPing = sys->getTime(); bool doneBigPing=false; const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity unsigned int currBytes=0; unsigned int lastWait=0; unsigned int lastTotalIn=0,lastTotalOut=0; while (thread.active && sock->active()) { if (sock->readReady()) { lastPacket = sys->getTime(); if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); GnuID routeID; GnuStream::R_TYPE ret = GnuStream::R_PROCESS; if (pack.func != GNU_FUNC_PONG) if (servMgr->seenPacket(pack)) ret = GnuStream::R_DUPLICATE; seenIDs.add(pack.id); if (ret == GnuStream::R_PROCESS) { GnuID routeID; ret = gnuStream.processPacket(pack,this,routeID); if (flowControl && (ret == GnuStream::R_BROADCAST)) ret = GnuStream::R_DROP; } switch(ret) { case GnuStream::R_BROADCAST: if (servMgr->broadcast(pack,this)) stats.add(Stats::NUMBROADCASTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ROUTE: if (servMgr->route(pack,routeID,NULL)) stats.add(Stats::NUMROUTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ACCEPTED: stats.add(Stats::NUMACCEPTED); break; case GnuStream::R_DUPLICATE: stats.add(Stats::NUMDUP); break; case GnuStream::R_DEAD: stats.add(Stats::NUMDEAD); break; case GnuStream::R_DISCARD: stats.add(Stats::NUMDISCARDED); break; case GnuStream::R_BADVERSION: stats.add(Stats::NUMOLD); break; case GnuStream::R_DROP: stats.add(Stats::NUMDROPPED); break; } LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr); }else{ LOG_ERROR("Bad packet"); } } GnuPacket *p; if ((p=outPacketsPri.curr())) // priority packet { gnuStream.sendPacket(*p); seenIDs.add(p->id); outPacketsPri.next(); } else if ((p=outPacketsNorm.curr())) // or.. normal packet { gnuStream.sendPacket(*p); seenIDs.add(p->id); outPacketsNorm.next(); } int lpt = sys->getTime()-lastPacket; if (!doneBigPing) { if ((sys->getTime()-lastPing) > 15) { gnuStream.ping(7); lastPing = sys->getTime(); doneBigPing = true; } }else{ if (lpt > packetTimeoutSecs) { if ((sys->getTime()-lastPing) > packetTimeoutSecs) { gnuStream.ping(1); lastPing = sys->getTime(); } } } if (lpt > abortTimeoutSecs) throw TimeoutException(); unsigned int totIn = sock->totalBytesIn-lastTotalIn; unsigned int totOut = sock->totalBytesOut-lastTotalOut; unsigned int bytes = totIn+totOut; lastTotalIn = sock->totalBytesIn; lastTotalOut = sock->totalBytesOut; const int serventBandwidth = 1000;
int delay = sys->idleSleepTime; if ((bytes) && (serventBandwidth >= 8)) delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize if (delay < (int)sys->idleSleepTime) delay = sys->idleSleepTime; //LOG("delay %d, in %d, out %d",delay,totIn,totOut); sys->sleep(delay); }}
// -----------------------------------void Servent::processRoot(){ try { gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2);
unsigned int lastConnect = sys->getTime();
while (thread.active && sock->active()) { if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); LOG_NETWORK("packet in: %d from %s",pack.func,ipstr); if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close { Host hl[32]; int cnt = servMgr->getNewestServents(hl,32,sock->host); if (cnt) { int start = sys->rnd() % cnt;
int max = cnt>8?8:cnt;
for(int i=0; i<max; i++) { GnuPacket pong; pack.hops = 1; pong.initPong(hl[start],false,pack); gnuStream.sendPacket(pong); char ipstr[64]; hl[start].toStr(ipstr); //LOG_NETWORK("Pong %d: %s",start+1,ipstr); start = (start+1) % cnt; } char str[64]; sock->host.toStr(str); LOG_NETWORK("Sent %d pong(s) to %s",max,str); }else { LOG_NETWORK("No Pongs to send"); //return; } }else if (pack.func == GNU_FUNC_PONG) // pong? { MemoryStream pong(pack.data,pack.len);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -