📄 servent.cpp
字号:
} } // if this is a priority connection and all incoming connections // are full then kill an old connection to make room. Otherwise reject connection. if (!priorityConnect) if (servMgr->inFull()) throw HTTPException(HTTP_SC_UNAVAILABLE,503); if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid)) throw HTTPException(HTTP_SC_NOTFOUND,404); if (!versionValid) { throw HTTPException(HTTP_SC_UNAUTHORIZED,401); } sock->writeLine(GNU_OK); sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr()); if (servMgr->isRoot) { sock->writeLine("%s %d",PCX_HS_BCTTL,9); //sock->writeLine("%s %d",PCX_HS_FULLHIT,2); if (diffRootVer) { sock->writeString(PCX_HS_DL); switch(osType) { case 1: sock->writeLine(PCX_DL_LINUXDYN); break; case 2: sock->writeLine(PCX_DL_LINUXSTA); break; case 3: sock->writeLine(PCX_DL_WIN32); break; case 4: sock->writeLine(PCX_DL_MACOSX); break; case 5: sock->writeLine(PCX_DL_WINAMP2); break; default: sock->writeLine(PCX_DL_URL); break; } } sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr()); } char hostIP[64]; Host h = sock->host; h.IPtoStr(hostIP); sock->writeLine("Remote-IP: %s",hostIP); sock->writeLine(""); while (http.nextHeader());}// -----------------------------------void Servent::handshakeStream(Channel *ch, bool isRaw){ sock->timeout = 10000; HTTP http(*sock); char idStr[64]; ch->info.id.toStr(idStr); while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) agent.set(arg); if (ch->info.contentType == ChanInfo::T_MP3) if (http.isHeader("icy-metadata")) addMetadata = atoi(arg) > 0; LOG_DEBUG("Stream: %s",http.cmdLine); } if (addMetadata && isRaw) // winamp mp3 metadata check { sock->writeLine(ICY_OK); sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("icy-name:%s",ch->getName()); sock->writeLine("icy-br:%d",ch->getBitrate()); sock->writeLine("icy-genre:%s",ch->info.genre.cstr()); sock->writeLine("icy-url:%s",ch->info.url.cstr()); sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); }else { sock->writeLine(HTTP_SC_OK); sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT); // sock->writeLine("icy-metaint:0"); sock->writeLine("x-audiocast-name: %s",ch->getName()); sock->writeLine("x-audiocast-bitrate: %d",ch->getBitrate()); sock->writeLine("x-audiocast-genre: %s",ch->info.genre.cstr()); sock->writeLine("x-audiocast-description: %s",ch->info.desc.cstr()); sock->writeLine("x-audiocast-url: %s",ch->info.url.cstr()); sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr); if (isRaw) { switch (ch->info.contentType) { case ChanInfo::T_OGG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG); break; case ChanInfo::T_MP3: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3); break; case ChanInfo::T_MOV: sock->writeLine("Accept-Ranges: bytes"); sock->writeLine("Connection: close"); sock->writeLine("Content-Length: 10000000"); sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV); break; case ChanInfo::T_MPG: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG); break; case ChanInfo::T_NSV: sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_NSV); break; } }else { sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST); } } sock->writeLine("");}// -----------------------------------void Servent::handshakeGiv(Channel *ch){ sock->timeout = 10000; char idstr[64]; pushID.toStr(idstr); sock->writeLine("GIV %d:%s/%s",chanIndex,idstr,ch->info.name.cstr()); handshakeStream(ch,false);}// -----------------------------------void Servent::process(){ try { gnuStream.init(sock); setStatus(S_CONNECTED); sock->timeout = 10000; // 10 seconds gnuStream.ping(2); if (type != T_LOOKUP) chanMgr->broadcastRelays(2,this); lastPacket = lastPing = sys->getTime(); bool doneBigPing=false; const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity unsigned int currBytes=0,lastTotal=0; unsigned int lastWait=0; while (thread.active && sock->active()) { if (sock->readPending()) { lastPacket = sys->getTime(); if (gnuStream.readPacket(pack)) { unsigned int ver = pack.id.getVersion(); char ipstr[64]; sock->host.toStr(ipstr); GnuID routeID; GnuStream::R_TYPE ret = GnuStream::R_PROCESS; if ((ver < MIN_PACKETVER) || (ver >= MAX_PACKETVER)) ret = GnuStream::R_BADVERSION; if (pack.func != GNU_FUNC_PONG) if (servMgr->seenPacket(pack)) ret = GnuStream::R_DUPLICATE; seenIDs.addGnuID(pack.id); servMgr->addVersion(ver); if (ret == GnuStream::R_PROCESS) { GnuID routeID; ret = gnuStream.processPacket(pack,this,routeID); } switch(ret) { case GnuStream::R_BROADCAST: if (servMgr->broadcast(pack,this)) stats.add(Stats::NUMBROADCASTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ROUTE: if (servMgr->route(pack,routeID,NULL)) stats.add(Stats::NUMROUTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ACCEPTED: stats.add(Stats::NUMACCEPTED); break; case GnuStream::R_DUPLICATE: stats.add(Stats::NUMDUP); break; case GnuStream::R_DEAD: stats.add(Stats::NUMDEAD); break; case GnuStream::R_DISCARD: stats.add(Stats::NUMDISCARDED); break; case GnuStream::R_BADVERSION: stats.add(Stats::NUMOLD); break; } LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr); }else{ LOG_ERROR("Bad packet"); } } GnuPacket *p; if ((p=outPacketsPri.curr())) // priority packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsPri.next(); } else if ((p=outPacketsNorm.curr())) // or.. normal packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsNorm.next(); } int lpt = sys->getTime()-lastPacket; if (!doneBigPing) { if ((sys->getTime()-lastPing) > 15) { gnuStream.ping(7); lastPing = sys->getTime(); doneBigPing = true; } }else{ if (lpt > packetTimeoutSecs) { if ((sys->getTime()-lastPing) > packetTimeoutSecs) { gnuStream.ping(1); lastPing = sys->getTime(); } } } if (lpt > abortTimeoutSecs) throw TimeoutException(); unsigned int tot = sock->totalBytesIn+sock->totalBytesOut; unsigned int bytes = (tot-lastTotal); lastTotal = tot; unsigned int delay = sys->idleSleepTime; if ((bytes) && (servMgr->serventBandwidth >= 8)) delay = (bytes*1000)/(servMgr->serventBandwidth/8); if (delay < sys->idleSleepTime) delay = sys->idleSleepTime; sys->sleep(delay); // if this is a private servent (has networkID) then check to see // that the channel is still connected. Otherwise, closedown. if (networkID.isSet()) { bool hasChan=true; Channel *ch = chanMgr->findChannelByID(networkID); if (ch) { if (ch->status == Channel::S_IDLE) throw StreamException("NetID idle"); } } } }catch(StreamException &e) { LOG_ERROR("Servent closed: %s",e.msg); }}// -----------------------------------void Servent::processRelay(){ try { sock->timeout = 10000; // 10 seconds gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2); while (thread.active && sock->active()) { if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); unsigned int ver = pack.id.getVersion(); servMgr->addVersion(ver); LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr); //if (pack.id.getVersion() < MIN_PACKETVER) // break; if (pack.func == 0) // if ping then pong back some hosts and close { Host hl[32]; int cnt = servMgr->getNewestServents(hl,32,sock->host); if (cnt) { int start = sys->rnd() % cnt; for(int i=0; i<8; i++) { GnuPacket pong; pack.hops = 1; pong.initPong(hl[start],false,pack); gnuStream.sendPacket(pong); char ipstr[64]; hl[start].toStr(ipstr); //LOG_NETWORK("Pong %d: %s",start+1,ipstr); start = (start+1) % cnt; } char str[64]; sock->host.toStr(str); LOG_NETWORK("Sent 8 pongs to %s",str); }else { LOG_NETWORK("No Pongs to send"); //return; } }else if (pack.func == 1) // pong? { MemoryStream pong(pack.data,pack.len); int ip,port; port = pong.readShort(); ip = pong.readLong(); ip = SWAP4(ip); LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port); if ((ip) && (port)) { Host h(ip,port); servMgr->addHost(h,ServHost::T_SERVENT,0,networkID); } return; } if (gnuStream.packetsIn > 5) // die if we get too many packets return; } } }catch(StreamException &e) { LOG_ERROR("Relay: %s",e.msg); }}// -----------------------------------int Servent::givProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->processStream(true,false); }catch(StreamException &e) { LOG_ERROR("GIV: %s",e.msg); } sv->endThread(); return 0;}// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->setStatus(S_CONNECTING); sv->sock->timeout = 10000; // 10 seconds handshake sv->sock->connect(); sv->setStatus(S_HANDSHAKE); sv->handshakeOut(); sv->process(); }catch(TimeoutException &e) { LOG_ERROR("Outgoing Timeout: %s",e.msg); servMgr->deadHost(sv->sock->host,ServHost::T_SERVENT); sv->setStatus(S_TIMEOUT); }catch(StreamException &e) { servMgr->deadHost(sv->sock->host,ServHost::T_SERVENT); LOG_ERROR("Outgoing: %s",e.msg); sv->setStatus(S_REFUSED); if (sv->type == T_LOOKUP) sys->sleep(30000); } sv->endThread(); return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->handshakeIncoming(); }catch(HTTPException &e)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -