📄 servent.cpp
字号:
{ try { sv->sock->writeLine(e.msg); if (e.code == 401) sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\""); sv->sock->writeLine(""); }catch(StreamException &){} LOG_ERROR("Incoming HTTP error: %s",e.msg); }catch(StreamException &e) { LOG_ERROR("Incoming Connect error: %s",e.msg); } sv->endThread(); return 0;}// -----------------------------------void Servent::processServent(){ setStatus(S_HANDSHAKE); handshakeIn(); if (!sock) throw StreamException("Servent has no socket"); if (servMgr->isRoot && !servMgr->needConnections()) processRelay(); else process();}// -----------------------------------void Servent::processStream(bool push, bool isRaw){ Channel *ch = NULL; try { setStatus(S_HANDSHAKE); ch = chanMgr->findChannelByID(chanID); if (!ch) throw StreamException("Not found"); chanIndex = ch->index; if (push) handshakeGiv(ch); else handshakeStream(ch,isRaw); servMgr->totalStreams++; Host host = sock->host; host.port = 0; // force to 0 so we ignore the incoming port GnuID noid; noid.clear(); if (isRaw) { // we have agent string now, so check again. if (!canPreview()) throw StreamException("Preview disallowed"); servMgr->addHost(host,ServHost::T_STREAM,0,noid); if ((addMetadata) && (chanMgr->icyMetaInterval)) sendRawMetaChannel(ch,chanMgr->icyMetaInterval); else sendRawChannel(ch); }else { servMgr->addHost(host,ServHost::T_CHANNEL,0,noid); sendChannel(ch); } setStatus(S_CLOSING); }catch(StreamException &e) { setStatus(S_ERROR); LOG_ERROR("Stream Error: %s",e.msg); }}// -----------------------------------------#if 0// debug FileStream file; file.openReadOnly("c://test.mp3"); LOG_DEBUG("raw file read"); char buf[4000]; int cnt=0; while (!file.eof()) { LOG_DEBUG("send %d",cnt++); file.read(buf,sizeof(buf)); sock->write(buf,sizeof(buf)); } file.close(); LOG_DEBUG("raw file sent"); return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(Channel *ch){ for(int i=0; i<60; i++) { if (ch->isPlaying() && ch->syncPos) return true; if (!thread.active || !sock->active()) break; sys->sleep(1000); } return false;}// -----------------------------------bool Servent::checkPreview(unsigned int connectTime){ Host h = sock->host; h.port = 0; // ignore incoming port number if (!servMgr->isFiltered(ServFilter::F_DIRECT,h)) return false; if (isPrivate()) // always allow private clients return true; if (agent.contains("PeerCast")) // allow connections from peercast clients (direct relays etc..) return true; if (servMgr->maxPreviewTime == 0) return true; if ((sys->getTime()-connectTime) <= servMgr->maxPreviewTime) return true; return false;}// -----------------------------------bool Servent::canPreview(){ // probably not needed, but just in case we bind to the actual IP address and not localhost if (sock->host.ip == servMgr->serverHost.ip) return true; Host h = getHost(); h.port = 0; if (!servMgr->isFiltered(ServFilter::F_DIRECT,h)) return false; if (isPrivate()) // always allow private clients return true; if (agent.contains("PeerCast")) // allow connections from peercast clients (direct relays etc..) return true; if (servMgr->seenHost(sock->host,ServHost::T_STREAM,servMgr->maxPreviewWait)) return false; return true;}// -----------------------------------void Servent::sendRawChannel(Channel *ch){ ch->numListeners++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr()); if (!waitForChannelHeader(ch)) throw StreamException("Channel not ready"); if (ch->headMeta.len) { LOG_DEBUG("Send %d bytes header ",ch->headMeta.len); sock->write(&ch->headMeta.data,ch->headMeta.len); } currPos=0; unsigned int mySync=0,chanSync=0; unsigned int syncTimeout=0; unsigned int connectTime=sys->getTime(); ChanPacket pack; while ((thread.active) && sock->active()) { if (!checkPreview(connectTime)) throw StreamException("Preview time limit reached"); if (!ch->isActive()) throw StreamException("Channel closed"); currPos = ch->chanData.readPacket(currPos,pack); //LOG_CHANNEL("Send POS: %d",currPos); if (pack.type == 'SYNC') { chanSync = *(unsigned int *)pack.data; //LOG_CHANNEL("Send SYNC: %d - %d",mySync,chanSync); int rs = mySync-chanSync; if (rs > ChanPacketBuffer::MAX_PACKETS) mySync = chanSync-1; } if (pack.type == 'DATA') { // skip packets until we`re back in sync if (chanSync > mySync) { //LOG_CHANNEL("Send DATA: %d",pack.len); syncTimeout=0; mySync = chanSync; sock->write(pack.data,pack.len); }else LOG_CHANNEL("SYNC wait: %d - %d",mySync-chanSync,syncTimeout); } syncTimeout++; if (syncTimeout > 30) throw StreamException("Unable to sync"); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numListeners) ch->numListeners--;}// -----------------------------------void Servent::sendRawMetaChannel(Channel *ch, int interval){ ch->numListeners++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw Meta stream: %s (metaint: %d)",ch->info.name.cstr(),interval); if (!waitForChannelHeader(ch)) throw StreamException("Channel not ready"); if (ch->headMeta.len) { LOG_DEBUG("Send %d bytes header ",ch->headMeta.len); sock->write(&ch->headMeta.data,ch->headMeta.len); } String lastTitle,lastURL; currPos=0; unsigned int mySync=0,chanSync=0; int lastMsgTime=sys->getTime(); bool showMsg=true; ChanPacket pack; char buf[16384]; int bufPos=0; if ((interval > sizeof(buf)) || (interval < 1)) throw StreamException("Bad ICY Meta Interval value"); unsigned int syncTimeout=0; unsigned int connectTime = sys->getTime(); while ((thread.active) && sock->active()) { if (!checkPreview(connectTime)) throw StreamException("Preview time limit reached"); if (!ch->isActive()) throw StreamException("Channel closed"); currPos = ch->chanData.readPacket(currPos,pack); MemoryStream mem(pack.data,pack.len); if (pack.type == 'SYNC') { chanSync = mem.readLong(); int rs = mySync-chanSync; if (rs > ChanPacketBuffer::MAX_PACKETS) mySync = chanSync-1; } if (pack.type == 'DATA') { // skip packets until we`re back in sync if (chanSync > mySync) { syncTimeout = 0; mySync = chanSync; int len = pack.len; char *p = pack.data; while (len) { int rl = len; if ((bufPos+rl) > interval) rl = interval-bufPos; memcpy(&buf[bufPos],p,rl); bufPos+=rl; p+=rl; len-=rl; if (bufPos >= interval) { bufPos = 0; sock->write(buf,interval); if (chanMgr->broadcastMsgInterval) if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval) { showMsg ^= true; lastMsgTime = sys->getTime(); } String *metaTitle = &ch->info.track.title; if (!ch->info.comment.isEmpty() && (showMsg)) metaTitle = &ch->info.comment; if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL)) { char tmp[1024]; String title,url; title = *metaTitle; url = ch->info.url; title.convertTo(String::T_META); url.convertTo(String::T_META); sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr()); int len = ((strlen(tmp) + 15+1) / 16); sock->writeChar(len); sock->write(tmp,len*16); lastTitle = *metaTitle; lastURL = ch->info.url; LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr()); }else { sock->writeChar(0); } } } }else { LOG_DEBUG("SYNC wait: %d - %d",mySync-chanSync,syncTimeout); } } syncTimeout++; if (syncTimeout > 30) throw StreamException("Unable to sync"); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numListeners) ch->numListeners--;}// -----------------------------------void Servent::sendChannel(Channel *ch){ ch->numRelays++; outputBitrate = ch->getBitrate(); try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr()); currPos=0; sock->writeTag("PCST"); ChanPacket pack; pack.init('HEAD',ch->headMeta.data,ch->headMeta.len); pack.write(*sock); pack.init('META',ch->insertMeta.data,ch->insertMeta.len); pack.write(*sock); while ((thread.active) && sock->active()) { if (!ch->isActive()) throw StreamException("Channel closed"); unsigned int np = ch->chanData.readPacket(currPos,pack); if ((np-currPos) > 1) LOG_DEBUG("sendChannel skip: %d",np-currPos); currPos = np; pack.write(*sock); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numRelays) ch->numRelays--;}// -----------------------------------int Servent::serverProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { if (!sv->sock) throw StreamException("Server has no socket"); sv->setStatus(S_LISTENING); //LOG4("Listening on port %d",sv->sock->host.port); char servIP[64]; sv->sock->host.toStr(servIP); if (servMgr->isRoot) LOG_DEBUG("Root Server started: %s",servIP); else LOG_DEBUG("Server started: %s",servIP); while ((thread->active) && (sv->sock->active())) { ClientSocket *cs = sv->sock->accept(); if (cs) { // if global IP then we`re not firewalled if (cs->host.globalIP()) servMgr->setFirewall(ServMgr::FW_OFF); Servent *ns = servMgr->allocServent(); if (ns) ns->initIncoming(cs,sv->allow); else LOG_ERROR("Out of servents"); } } }catch(StreamException &e) { LOG_ERROR("Server Error: %s:%d",e.msg,e.err); } LOG_DEBUG("Server stopped"); sv->endThread(); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -