📄 servent.cpp.svn-base
字号:
int osType=0; HTTP http(*sock); bool versionValid = false; bool diffRootVer = false;
GnuID clientID; clientID.clear(); while (http.nextHeader()) { LOG_DEBUG(http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) { versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0; } }else if (http.isHeader(PCX_HS_NETWORKID)) { clientID.fromStr(arg); }else if (http.isHeader(PCX_HS_PRIORITY)) { priorityConnect = atoi(arg)!=0;
}else if (http.isHeader(PCX_HS_ID)) { GnuID id; id.fromStr(arg); if (id.isSame(servMgr->sessionID)) throw StreamException("Servent loopback"); }else if (http.isHeader(PCX_HS_OS)) { if (stricmp(arg,PCX_OS_LINUX)==0) osType = 1; else if (stricmp(arg,PCX_OS_WIN32)==0) osType = 2; else if (stricmp(arg,PCX_OS_MACOSX)==0) osType = 3; else if (stricmp(arg,PCX_OS_WINAMP2)==0) osType = 4; } }
if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); // if this is a priority connection and all incoming connections // are full then kill an old connection to make room. Otherwise reject connection. if (!priorityConnect) { if (!isPrivate()) if (servMgr->pubInFull()) throw HTTPException(HTTP_SC_UNAVAILABLE,503); } if (!versionValid) throw HTTPException(HTTP_SC_FORBIDDEN,403); sock->writeLine(GNU_OK); sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT); if (networkID.isSet()) { char idStr[64]; networkID.toStr(idStr); sock->writeLine("%s %s",PCX_HS_NETWORKID,idStr); } if (servMgr->isRoot) { sock->writeLine("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0); sock->writeLine("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL); sock->writeLine("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL); sock->writeLine("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast); //sock->writeLine("%s %d",PCX_HS_FULLHIT,2); if (diffRootVer) { sock->writeString(PCX_HS_DL); switch(osType) { case 1: sock->writeLine(PCX_DL_LINUX); break; case 2: sock->writeLine(PCX_DL_WIN32); break; case 3: sock->writeLine(PCX_DL_MACOSX); break; case 4: sock->writeLine(PCX_DL_WINAMP2); break; default: sock->writeLine(PCX_DL_URL); break; } } sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr()); } char hostIP[64]; Host h = sock->host; h.IPtoStr(hostIP); sock->writeLine("%s %s",PCX_HS_REMOTEIP,hostIP); sock->writeLine(""); while (http.nextHeader());
}
// -----------------------------------
// Probe a remote host with a minimal PCP handshake and verify its session ID.
//
// Opens a fresh client socket to `rhost`, sends PCP_CONNECT followed by a
// PCP_HELO carrying our own session ID and an immediate PCP_QUIT, then reads
// the reply. The reply must be a PCP_OLEH atom whose PCP_SESSIONID child
// equals `rsid`.
//
// rhost - host to probe; its `port` is set to 0 when the probe fails with a
//         StreamException (connect/IO error, bad reply or session ID mismatch).
// rsid  - session ID the remote host is expected to report.
//
// Returns false only when no socket could be created. In every other case it
// returns true, including a failed probe - failure is reported through
// `rhost.port == 0`, not through the return value.
bool Servent::pingHost(Host &rhost,GnuID &rsid)
{
	char hostName[64];
	rhost.toStr(hostName);

	LOG_DEBUG("Ping host %s: trying..",hostName);

	ClientSocket *pingSock = NULL;

	try
	{
		pingSock = sys->createSocket();
		if (!pingSock)
			return false;

		pingSock->open(rhost);
		pingSock->connect();

		AtomStream atom(*pingSock);

		// Request: connect, helo (with one child: our session ID), quit.
		atom.writeInt(PCP_CONNECT,1);
		atom.writeParent(PCP_HELO,1);
		atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
		atom.writeInt(PCP_QUIT,0);

		GnuID reportedSID;
		reportedSID.clear();

		int numChildren,dataLen;
		ID4 replyID = atom.read(numChildren,dataLen);

		// Anything other than an OLEH reply is treated as a failed ping.
		if (!(replyID == PCP_OLEH))
		{
			LOG_DEBUG("Ping response: %s",replyID.getString().str());
			throw StreamException("Bad ping response");
		}

		// Walk the OLEH children; keep the session ID, skip everything else.
		for (int child = 0; child < numChildren; child++)
		{
			int subChildren,subLen;
			ID4 childID = atom.read(subChildren,subLen);

			if (childID == PCP_SESSIONID)
				atom.readBytes(reportedSID.id,16,subLen);
			else
				atom.skip(subChildren,subLen);
		}

		if (!reportedSID.isSame(rsid))
			throw StreamException("SIDs don`t match");

		LOG_DEBUG("Ping host %s: OK",hostName);
	}
	catch (StreamException &e)
	{
		// Mark the host as unreachable for the caller.
		rhost.port = 0;
		LOG_DEBUG("Ping host %s: %s",hostName,e.msg);
	}

	if (pingSock)
	{
		pingSock->close();
		delete pingSock;
	}

	return true;
}
// -----------------------------------bool Servent::handshakeStream(ChanInfo &chanInfo){
HTTP http(*sock);
bool gotPCP=false;
unsigned int reqPos=0;
nsSwitchNum=0;
while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(PCX_HS_PCP))
gotPCP = atoi(arg)!=0;
else if (http.isHeader(PCX_HS_POS))
reqPos = atoi(arg); else if (http.isHeader("icy-metadata")) addMetadata = atoi(arg) > 0;
else if (http.isHeader(HTTP_HS_AGENT))
agent = arg;
else if (http.isHeader("Pragma"))
{
char *ssc = stristr(arg,"stream-switch-count=");
char *so = stristr(arg,"stream-offset");
if (ssc || so)
{
nsSwitchNum=1;
//nsSwitchNum = atoi(ssc+20);
}
}
LOG_DEBUG("Stream: %s",http.cmdLine); }
if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
outputProtocol = ChanInfo::SP_PEERCAST;
if (outputProtocol == ChanInfo::SP_HTTP)
{
if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
|| (chanInfo.contentType == ChanInfo::T_WMA)
|| (chanInfo.contentType == ChanInfo::T_WMV)
|| (chanInfo.contentType == ChanInfo::T_ASX)
)
outputProtocol = ChanInfo::SP_MMS;
}
bool chanFound=false;
bool chanReady=false;
Channel *ch = chanMgr->findChannelByID(chanInfo.id);
if (ch)
{
if (reqPos)
streamPos = ch->rawData.findOldestPos(reqPos);
else
streamPos = 0;
chanReady = canStream(ch);
}
ChanHitList *chl = chanMgr->findHitList(chanInfo);
if (chl)
{
chanFound = true;
}
bool result = false;
char idStr[64];
chanInfo.id.toStr(idStr);
char sidStr[64];
servMgr->sessionID.toStr(sidStr);
Host rhost = sock->host;
AtomStream atom(*sock);
if (!chanFound)
{
sock->writeLine(HTTP_SC_NOTFOUND);
sock->writeLine("");
LOG_DEBUG("Sending channel not found");
return false;
}
if (!chanReady)
{
if (outputProtocol == ChanInfo::SP_PCP)
{
LOG_DEBUG("Sending channel unavailable (with hits)");
sock->writeLine(HTTP_SC_UNAVAILABLE);
sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
sock->writeLine("");
handshakeIncomingPCP(atom,rhost,remoteID,agent);
if (chl)
{
ChanHit best;
if (!rhost.globalIP())
best = chl->getHit(&servMgr->serverHost,0,false,false); // find best hit this network
if (!best.host.ip)
best = chl->getHit(&rhost,0,false,false); // find best hit on same network
if (!best.host.ip)
best = chl->getHit(NULL,0,false,false); // else find best hit on other networks
// force push
//best.host.ip = 0;
//
if (best.host.ip)
{
best.writeAtoms(atom,true,chanInfo.id);
LOG_DEBUG("Sent 1 hit");
}
else if (rhost.port)
{
// send push request to best firewalled host on other network
best = chl->getHit(NULL,0,true,false);
if (best.host.ip)
servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_STREAM);
}
// if all else fails, contact tracker
if (!best.host.ip)
{
// find best tracker on this network
if (!rhost.globalIP())
best = chl->getHit(&servMgr->serverHost,0,false,true,true);
// find local tracker
if (!best.host.ip)
best = chl->getHit(&rhost,0,false,true,true);
// find global tracker
if (!best.host.ip)
best = chl->getHit(NULL,0,false,true,true);
if (best.host.ip)
{
best.writeAtoms(atom,true,chanInfo.id);
LOG_DEBUG("Sent 1 tracker hit");
}else
{
// else send push request to tracker
best = chl->getHit(NULL,0,true,true,true);
if (best.host.ip)
servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
}
}
if (!best.host.ip)
LOG_DEBUG("Failed to find any hits");
}
// return not available yet code
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE);
result = false;
}else
{
LOG_DEBUG("Sending channel unavailable");
sock->writeLine(HTTP_SC_UNAVAILABLE);
sock->writeLine("");
result = false;
}
} else {
if (chanInfo.contentType != ChanInfo::T_MP3)
addMetadata=false;
if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check {
sock->writeLine(ICY_OK); sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("icy-name:%s",chanInfo.name.cstr()); sock->writeLine("icy-br:%d",chanInfo.bitrate); sock->writeLine("icy-genre:%s",chanInfo.genre.cstr()); sock->writeLine("icy-url:%s",chanInfo.url.cstr()); sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); }else { sock->writeLine(HTTP_SC_OK); if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA)) { sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("Accept-Ranges: none"); sock->writeLine("x-audiocast-name: %s",chanInfo.name.cstr()); sock->writeLine("x-audiocast-bitrate: %d",chanInfo.bitrate); sock->writeLine("x-audiocast-genre: %s",chanInfo.genre.cstr()); sock->writeLine("x-audiocast-description: %s",chanInfo.desc.cstr()); sock->writeLine("x-audiocast-url: %s",chanInfo.url.cstr()); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); }
if (outputProtocol == ChanInfo::SP_HTTP) { switch (chanInfo.contentType) { case ChanInfo::T_OGG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG); break; case ChanInfo::T_MP3: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); break; case ChanInfo::T_MOV: sock->writeLine("Connection: close"); sock->writeLine("Content-Length: 10000000"); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV); break; case ChanInfo::T_MPG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG); break; case ChanInfo::T_NSV: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_NSV); break; case ChanInfo::T_ASX: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_ASX);
break;
case ChanInfo::T_WMA:
sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_WMA);
break;
case ChanInfo::T_WMV: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_WMV); break; } } else if (outputProtocol == ChanInfo::SP_MMS)
{
sock->writeLine("Server: Rex/9.0.0.2980");
sock->writeLine("Cache-Control: no-cache");
sock->writeLine("Pragma: no-cache");
sock->writeLine("Pragma: client-id=3587303426");
sock->writeLine("Pragma: features=\"broadcast,playlist\"");
if (nsSwitchNum)
{
sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MMS);
}else
{
sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
if (ch)
sock->writeLine("Content-Length: %d",ch->headPack.len);
sock->writeLine("Connection: Keep-Alive");
}
} else if (outputProtocol == ChanInfo::SP_PCP)
{
sock->writeLine("%s %d",PCX_HS_POS,streamPos);
sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
}else if (outputProtocol == ChanInfo::SP_PEERCAST) { sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST); } }
sock->writeLine("");
result = true;
if (gotPCP)
{
handshakeIncomingPCP(atom,rhost,remoteID,agent);
atom.writeInt(PCP_OK,0);
}
}
return result;}// -----------------------------------void Servent::handshakeGiv(GnuID &id){ if (id.isSet())
{ char idstr[64]; id.toStr(idstr); sock->writeLine("GIV /%s",idstr);
}else sock->writeLine("GIV");
sock->writeLine("");
}// -----------------------------------void Servent::processGnutella(){
type = T_PGNU;
if (servMgr->isRoot && !servMgr->needConnections())
{
processRoot();
return;
}
gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2);// if (type != T_LOOKUP)// chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2); lastPacket = lastPing = sys->getTime(); bool doneBigPing=false; const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity unsigned int currBytes=0; unsigned int lastWait=0; unsigned int lastTotalIn=0,lastTotalOut=0; while (thread.active && sock->active()) { if (sock->readReady()) { lastPacket = sys->getTime(); if (gnuStream.readPacket(pack)) { unsigned int ver = pack.id.getVersion(); char ipstr[64]; sock->host.toStr(ipstr); GnuID routeID; GnuStream::R_TYPE ret = GnuStream::R_PROCESS; if (ver < MIN_PACKETVER) ret = GnuStream::R_BADVERSION; if (pack.func != GNU_FUNC_PONG) if (servMgr->seenPacket(pack)) ret = GnuStream::R_DUPLICATE; seenIDs.addGnuID(pack.id); servMgr->addVersion(ver); if (ret == GnuStream::R_PROCESS) { GnuID routeID; ret = gnuStream.processPacket(pack,this,routeID); if (flowControl && (ret == GnuStream::R_BROADCAST)) ret = GnuStream::R_DROP; } switch(ret) { case GnuStream::R_BROADCAST: if (servMgr->broadcast(pack,this)) stats.add(Stats::NUMBROADCASTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ROUTE: if (servMgr->route(pack,routeID,NULL)) stats.add(Stats::NUMROUTED); else stats.add(Stats::NUMDROPPED);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -