📄 servent.cpp
字号:
if (ret == GnuStream::R_PROCESS) { GnuID routeID; GnuPacket out; out.func = 255; ret = gnuStream.processPacket(pack,out,this,routeID); if (out.func != 255) { LOG_NETWORK("Sent return packet: %s",GNU_FUNC_STR(out.func)); outputPacket(out,false); } } switch(ret) { case GnuStream::R_BROADCAST: if (!servMgr->broadcast(pack,this)) stats.add(Stats::NUMDROPPED); else stats.add(Stats::NUMBROADCASTED); break; case GnuStream::R_ROUTE: if (!servMgr->route(pack,routeID,NULL)) stats.add(Stats::NUMDROPPED); else stats.add(Stats::NUMROUTED); break; case GnuStream::R_ACCEPTED: stats.add(Stats::NUMACCEPTED); break; case GnuStream::R_DUPLICATE: stats.add(Stats::NUMDUP); break; case GnuStream::R_DEAD: stats.add(Stats::NUMDEAD); break; case GnuStream::R_DISCARD: stats.add(Stats::NUMDISCARDED); break; case GnuStream::R_BADVERSION: stats.add(Stats::NUMOLD); break; } LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr); }else{ LOG_ERROR("Bad packet"); } } GnuPacket *p; if ((p=outPacketsPri.curr())) // priority packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsPri.next(); } else if ((p=outPacketsNorm.curr())) // or.. normal packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsNorm.next(); } int lpt = sys->getTime()-lastPacket; if (!doneBigPing) { if ((sys->getTime()-lastPing) > 15) { gnuStream.ping(7); lastPing = sys->getTime(); doneBigPing = true; } }else{ if (lpt > packetTimeoutSecs) { if ((sys->getTime()-lastPing) > packetTimeoutSecs) { gnuStream.ping(1); lastPing = sys->getTime(); } } } if (lpt > abortTimeoutSecs) throw TimeoutException(); unsigned int tot = sock->totalBytesIn+sock->totalBytesOut; unsigned int bytes = (tot-lastTotal); lastTotal = tot; unsigned int delay = sys->idleSleepTime; if ((bytes) && (servMgr->serventBandwidth >= 8)) delay = (bytes*1000)/(servMgr->serventBandwidth/8); if (delay < sys->idleSleepTime) delay = sys->idleSleepTime; sys->sleep(delay); } }catch(StreamException &e) { LOG_ERROR("Process Error: %s",e.msg); }}// -----------------------------------void Servent::processRelay(){ try { sock->timeout = 10000; // 10 seconds gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2); while (thread.active && sock->active()) { if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); unsigned int ver = pack.id.getVersion(); servMgr->addVersion(ver); LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr); //if (pack.id.getVersion() < MIN_PACKETVER) // break; if (pack.func == 0) // if ping then pong back some hosts and close { Host hl[64]; int cnt = servMgr->getNewestHosts(hl,64,sock->host); if (cnt) { int start = sys->rnd() % cnt; for(int i=0; i<8; i++) { GnuPacket pong; pack.hops = 1; pong.initPong(hl[start],false,pack); gnuStream.sendPacket(pong); start = (start+1) % cnt; } char str[64]; sock->host.toStr(str); LOG_NETWORK("Sent 8 pongs to %s",str); }else { LOG_NETWORK("No Pongs to send"); return; } }else if (pack.func == 1) // pong? { MemoryStream pong(pack.data,pack.len); int ip,port; port = pong.readShort(); ip = pong.readLong(); ip = SWAP4(ip); LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port); if ((ip) && (port)) { Host h(ip,port); servMgr->addHost(h,true); } return; } if (gnuStream.packetsIn > 5) // die if we get too many packets return; } } }catch(StreamException &e) { LOG_ERROR("Relay: %s",e.msg); }}// -----------------------------------int Servent::givProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->processStream(true,false); }catch(StreamException &e) { LOG_ERROR("GIV: %s",e.msg); } sv->endThread(); return 0;}// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->setStatus(S_CONNECTING); sv->sock->timeout = 10000; // 10 seconds handshake sv->sock->connect(); sv->setStatus(S_HANDSHAKE); sv->handshakeOut(); sv->process(); }catch(TimeoutException &e) { LOG_ERROR("Outgoing Timeout: %s",e.msg); servMgr->deadHost(sv->sock->host); sv->setStatus(S_TIMEOUT); }catch(StreamException &e) { servMgr->deadHost(sv->sock->host); LOG_ERROR("Outgoing: %s",e.msg); sv->setStatus(S_REFUSED); if (sv->type == T_LOOKUP) sys->sleep(30000); } sv->endThread(); return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { sv->handshakeIncoming(); }catch(HTTPException &e) { try { sv->sock->writeLine(e.msg); if (e.code == 401) sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\""); sv->sock->writeLine(""); }catch(StreamException &){} LOG_ERROR("Incoming HTTP error: %s",e.msg); }catch(StreamException &e) { LOG_ERROR("Incoming Connect error: %s",e.msg); } sv->endThread(); return 0;}// -----------------------------------void Servent::processServent(){ setStatus(S_HANDSHAKE); handshakeIn(); if (!sock) throw StreamException("Servent has no socket"); if (servMgr->isRoot && !servMgr->needConnections()) processRelay(); else process();}// -----------------------------------void Servent::processStream(bool push, bool isRaw){ Channel *ch = NULL; try { setStatus(S_HANDSHAKE); ch = chanMgr->findChannelByID(chanID); if (!ch) throw StreamException("Not found"); chanIndex = ch->index; if (push) handshakeGiv(ch); else handshakeStream(ch,isRaw); servMgr->totalStreams++; if (isRaw) { if (addMetadata) sendRawMetaChannel(ch,chanMgr->icyMetaInterval); else sendRawChannel(ch); }else sendChannel(ch); setStatus(S_CLOSING); }catch(StreamException &e) { setStatus(S_ERROR); LOG_ERROR("Stream Error: %s",e.msg); }}// -----------------------------------------#if 0// debug FileStream file; file.openReadOnly("c://test.mp3"); LOG_DEBUG("raw file read"); char buf[4000]; int cnt=0; while (!file.eof()) { LOG_DEBUG("send %d",cnt++); file.read(buf,sizeof(buf)); sock->write(buf,sizeof(buf)); } file.close(); LOG_DEBUG("raw file sent"); return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(Channel *ch){ for(int i=0; i<60; i++) { if (ch->isPlaying() && ch->syncPos) return true; if (!thread.active || !sock->active()) break; sys->sleep(1000); } return false;}// -----------------------------------void Servent::sendRawChannel(Channel *ch){ ch->numListeners++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr()); if (!waitForChannelHeader(ch)) throw StreamException("Channel not ready"); if (ch->headMeta.len) { LOG_DEBUG("Send %d bytes header ",ch->headMeta.len); sock->write(&ch->headMeta.data,ch->headMeta.len); } currPos=0; unsigned int mySync=0,chanSync=0; unsigned int syncTimeout=0; ChanPacket pack; while ((thread.active) && sock->active()) { if (!ch->isActive()) throw StreamException("Channel closed"); currPos = ch->chanData.readPacket(currPos,pack); //LOG_CHANNEL("Send POS: %d",currPos); if (pack.type == 'SYNC') { chanSync = *(unsigned int *)pack.data; //LOG_CHANNEL("Send SYNC: %d",chanSync); int rs = mySync-chanSync; if (rs > ChanPacketBuffer::MAX_PACKETS) mySync = chanSync-1; } if (pack.type == 'DATA') { // skip packets until we`re back in sync if (chanSync <= mySync) { syncTimeout++; if (syncTimeout > 30) throw StreamException("Unable to sync"); LOG_CHANNEL("SYNC wait: %d - %d",mySync-chanSync,syncTimeout); }else { //LOG_CHANNEL("Send DATA: %d",chanSync); syncTimeout=0; mySync = chanSync; sock->write(pack.data,pack.len); } } } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numListeners) ch->numListeners--;}// -----------------------------------void Servent::sendRawMetaChannel(Channel *ch, int interval){ ch->numListeners++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw Meta stream: %s",ch->info.name.cstr()); if (!waitForChannelHeader(ch)) throw StreamException("Channel not ready"); if (ch->headMeta.len) { LOG_DEBUG("Send %d bytes header ",ch->headMeta.len); sock->write(&ch->headMeta.data,ch->headMeta.len); } String lastTitle,lastURL; currPos=0; unsigned int mySync=0,chanSync=0; int lastMsgTime=sys->getTime(); bool showMsg=true; ChanPacket pack; char buf[16384]; int bufPos=0; if ((interval > sizeof(buf)) || (interval < 1)) throw StreamException("Bad ICY Meta Interval value"); unsigned int syncTimeout=0; while ((thread.active) && sock->active()) { if (!ch->isActive()) throw StreamException("Channel closed"); currPos = ch->chanData.readPacket(currPos,pack); MemoryStream mem(pack.data,pack.len); if (pack.type == 'SYNC') { chanSync = mem.readLong(); int rs = mySync-chanSync; if (rs > ChanPacketBuffer::MAX_PACKETS) mySync = chanSync-1; } if (pack.type == 'DATA') { // skip packets until we`re back in sync if (chanSync <= mySync) { syncTimeout++; if (syncTimeout > 30) throw StreamException("Unable to sync"); LOG_DEBUG("SYNC wait: %d - %d",mySync-chanSync,syncTimeout); }else { syncTimeout = 0; mySync = chanSync; int len = pack.len; char *p = pack.data; while (len) { int rl = len; if ((bufPos+rl) > interval) rl = interval-bufPos; memcpy(&buf[bufPos],p,rl); bufPos+=rl; p+=rl; len-=rl; if (bufPos >= interval) { bufPos = 0; sock->write(buf,interval); if (chanMgr->broadcastMsgInterval) if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval) { showMsg ^= true; lastMsgTime = sys->getTime(); } String *metaTitle = &ch->info.track.title; if (!ch->info.comment.isEmpty() && (showMsg)) metaTitle = &ch->info.comment; if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL)) { char tmp[1024]; String title,url; title = *metaTitle; url = ch->info.url; title.convertTo(String::T_META); url.convertTo(String::T_META); sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr()); int len = ((strlen(tmp) + 15+1) / 16); sock->writeChar(len); sock->write(tmp,len*16); lastTitle = *metaTitle; lastURL = ch->info.url; LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr()); }else sock->writeChar(0); } } } } } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numListeners) ch->numListeners--;}// -----------------------------------void Servent::sendChannel(Channel *ch){ ch->numRelays++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr()); currPos=0; sock->writeTag("PCST"); ChanPacket pack; pack.init('HEAD',ch->headMeta.data,ch->headMeta.len); pack.write(*sock); pack.init('META',ch->insertMeta.data,ch->insertMeta.len); pack.write(*sock); while ((thread.active) && sock->active()) { if (!ch->isActive()) throw StreamException("Channel closed"); unsigned int np = ch->chanData.readPacket(currPos,pack); if ((np-currPos) > 1) LOG_DEBUG("sendChannel skip: %d",np-currPos); currPos = np; pack.write(*sock); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numRelays) ch->numRelays--;}// -----------------------------------int Servent::serverProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { if (!sv->sock) throw StreamException("Server has no socket"); sv->setStatus(S_LISTENING); //LOG4("Listening on port %d",sv->sock->host.port); char servIP[64]; sv->sock->host.toStr(servIP); if (servMgr->isRoot) LOG_DEBUG("Root Server started: %s",servIP); else LOG_DEBUG("Server started: %s",servIP); while ((thread->active) && (sv->sock->active())) { ClientSocket *cs = sv->sock->accept(); if (cs) { // if global IP then we`re not firewalled if (cs->host.globalIP()) servMgr->setFirewall(ServMgr::FW_OFF); Servent *ns = servMgr->allocServent(); if (ns) ns->initIncoming(cs,sv->allow); else LOG_ERROR("Out of servents"); } } }catch(StreamException &e) { LOG_ERROR("Server Error: %s:%d",e.msg,e.err); } LOG_DEBUG("Server stopped"); sv->endThread(); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -