📄 channel.cpp
字号:
int spos = gpos % MAX_DATALEN; // start int epos = pos % MAX_DATALEN; // end unsigned int rlen; if (pos >= MAX_DATALEN) { if (spos < epos) rlen = epos-spos; else rlen = MAX_DATALEN-spos; }else{ rlen = epos-spos; } // read as many bytes as we can if (rlen > len) rlen = len; if (rlen) { //LOG("BUF READ gpos=%d, pos=%d, spos=%d, epos=%d, avail=%d",gpos,pos,spos,epos,avail); if (rlen > MAX_DATALEN) { // something went horribly wrong throw StreamException("Chan buffer read fail"); } memcpy(p,&data[spos],rlen); p = (char *)p+rlen; len -= rlen; gpos += rlen; } } return gpos;}// -----------------------------------void ChanBuffer::write(const void *p,int len){ while (len) { unsigned int bpos = pos % MAX_DATALEN; int wlen = len; if ((bpos+wlen) > MAX_DATALEN) wlen = MAX_DATALEN-bpos; memcpy(&data[bpos],p,wlen); p = (char *)p+wlen; len -= wlen; pos += wlen; }}// -----------------------------------void ChanBuffer::writePacket(ChanPacket &pack){ lock.on(); unsigned int t = sys->gtohl( SWAP4(pack.type) ); //LOG("write packet %x,%d",t,pack.len); lastPacket = pos; if ((lastPacket-firstPacket) > MAX_DATALEN) firstPacket = lastPacket; write(&t,4); write(&pack.len,sizeof(int)); write(pack.data,pack.len); lock.off();}#endif// -----------------------------------void ChanPacketBuffer::writePacket(ChanPacket &pack){ lock.on(); packets[currPacket%MAX_PACKETS] = pack; lastPacket = currPacket; currPacket++; if (currPacket <= MAX_PACKETS) firstPacket = 0; else firstPacket = currPacket-MAX_PACKETS; lock.off();}// -----------------------------------unsigned int ChanPacketBuffer::readPacket(unsigned int pos,ChanPacket &pack){ unsigned int tim = sys->getTime(); if (pos > currPacket) pos = currPacket; while (pos == currPacket) { sys->sleepIdle(); if ((sys->getTime() - tim) > 30) throw TimeoutException(); } lock.on(); if (pos == 0) // start of stream pos = firstPacket; // or lastPacket else if (pos < firstPacket) // too far behind pos = firstPacket; // so skip forward pack = packets[pos%MAX_PACKETS]; pos++; lock.off(); return pos;}#if 0// -----------------------------------unsigned int ChanBuffer::readPacket(unsigned int gpos, ChanPacket &pack){ while (gpos >= pos) sys->sleepIdle(); lock.on(); try { gpos = read(gpos,&pack.type,sizeof(pack.type)); pack.type = sys->gtohl( SWAP4(pack.type) ); gpos = read(gpos,&pack.len,sizeof(pack.len)); if (pack.len > ChanPacket::MAX_DATALEN) pack.len = ChanPacket::MAX_DATALEN; gpos = read(gpos,pack.data,pack.len); }catch (StreamException &) { } lock.off(); return gpos;}#endif// -----------------------------------void Channel::processOggIdent(OggPacket &ogg){ MemoryStream in(ogg.getContent(),ogg.getContentLen()); in.skip(7); int ver = in.readLong(); int chans = in.readChar(); int rate = in.readLong(); int brMax = in.readLong(); int brNom = in.readLong(); int brLow = in.readLong(); LOG_CHANNEL("OGG Ident: ver=%d, chans=%d, rate=%d, brMax=%d, brNom=%d, brLow=%d", ver,chans,rate,brMax,brNom,brLow); if (brNom > 0) info.bitrate = brNom/1000; else info.bitrate = 0;}// -----------------------------------void Channel::processOggComment(OggPacket &ogg){ MemoryStream in(ogg.getContent(),ogg.getContentLen()); in.skip(7); // skip type + 'vorbis' int vLen = in.readLong(); // vendor len in.skip(vLen); char argBuf[256]; info.track.clear(); int cLen = in.readLong(); // comment len for(int i=0; i<cLen; i++) { int l = in.readLong(); if (l > sizeof(argBuf)) throw StreamException("Comment string too long"); in.read(argBuf,l); argBuf[l] = 0; LOG_CHANNEL("OGG Comment: %s",argBuf); char *arg; if ((arg=stristr(argBuf,"ARTIST="))) info.track.artist.set(arg+7,String::T_ASCII); else if ((arg=stristr(argBuf,"TITLE="))) info.track.title.set(arg+6,String::T_ASCII); else if ((arg=stristr(argBuf,"GENRE="))) info.track.genre.set(arg+6,String::T_ASCII); else if ((arg=stristr(argBuf,"CONTACT="))) info.track.contact.set(arg+8,String::T_ASCII); else if ((arg=stristr(argBuf,"ALBUM="))) info.track.album.set(arg+6,String::T_ASCII); } updateMeta();}// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s.%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void Channel::readOGG(){ OggPacket ogg; ChanPacket pack; while ((!input->eof() && (thread.active))) { ogg.read(*input); if (ogg.isBOS()) { LOG_CHANNEL("Got OGG BOS"); headMeta.len = 0; headMeta.cnt = 0; } if (ogg.isVorbisPacket()) { if (ogg.getVorbisType()==3) { LOG_CHANNEL("Vorbis: Comment"); processOggComment(ogg); } } // first 3 headers of OGG are needed if (headMeta.cnt < 3) { //LOG("New OGG Header %d - %02x - %x",headMeta.cnt,ogg.data[5],*(unsigned int *)&ogg.data[14]); if (headMeta.len+ogg.len > ChanMeta::MAX_DATALEN) throw StreamException("OGG packet too big for headMeta"); if (ogg.isVorbisPacket()) { switch (ogg.getVorbisType()) { case 1: // ident LOG_CHANNEL("Vorbis Header: Ident"); processOggIdent(ogg); break; case 3: // comment LOG_CHANNEL("Vorbis Header: Comment"); //processOggComment(ogg); break; case 5: // setup LOG_CHANNEL("Vorbis Header: Setup"); break; } memcpy(&headMeta.data[headMeta.len],ogg.data,ogg.len); headMeta.len += ogg.len; headMeta.cnt++; }else{ // we`ve only had 2 vorbis headers, but force a continue anyway. headMeta.cnt = 3; LOG_CHANNEL("Vorbis: No Setup header, continue anyway..."); } if (headMeta.cnt == 3) { pack.init('HEAD',headMeta.data,headMeta.len); chanData.writePacket(pack); } } syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); pack.init('DATA',ogg.data,ogg.len); chanData.writePacket(pack); if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; if (readDelay) sys->sleep(readDelay); }}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0; lastSearch = 0; searchActive = true;}// -----------------------------------void ChanMgr::lockHitList(GnuID &id, bool on){ ChanHitList *chl = findHitListByID(id); if (chl) chl->locked = on;}// -----------------------------------Channel *ChanMgr::findChannel(ChanInfo &info){ Channel *c=NULL; findChannels(info,&c,1); return c;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].numListeners) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].index == index) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findPushChannel(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if ((channels[i].pushIndex == index) && (!channels[i].pushSock)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected; c->startFind(); return c; } return NULL;}// -----------------------------------int ChanMgr::findAndRelay(ChanInfo &info, Channel **ch, int max){ int cnt=0; char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s %s",info.name.cstr(),idStr); for(int i=0; i<180; i++) // search for 3 minutes. { ChanHitList *chl = findHitList(info); if (chl) { Channel *c; c = findChannelByID(chl->info.id); if (!c) { c = chanMgr->createChannel(chl->info,NULL); if (c) c->startGet(); } if (!c) break; ch[0] = c; cnt = 1; break; }else { if ((i%60) == 0) servMgr->findChannel(info); } sys->sleep(1000); } LOG_CHANNEL("Search results: %d",cnt); return cnt;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); } broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 300; icyMetaInterval = 8192; maxStreamsPerChannel = 0; searchInfo.init(); fullHitFrequency = 1; broadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away}// -----------------------------------void ChanMgr::broadcastRelays(int ttl, Servent *serv){ //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Channel *hits[16]; int cnt=0; ChanInfo info; info.init(); info.status = ChanInfo::S_PLAY; cnt = findChannels(info,hits,16); GnuPacket out; if (cnt) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->inFull() && servMgr->outFull()) || servMgr->streamFull(); bool stable = servMgr->totalStreams>0; if (out.initHit(sh,cnt,hits,NULL,0x0114,push,busy,stable,ttl)) { int numOut=0; if (serv) { if (serv->outputPacket(out,true)) numOut=1; }else numOut = servMgr->broadcast(out,NULL); LOG_CHANNEL("Relay Broadcast %d channels to %d servents",cnt,numOut); }else{ LOG_ERROR("Relay Broadcast failed"); } } }}// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){ //if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isActive()) if (c->status == Channel::S_BROADCASTING) { c->info.comment = broadcastMsg; c->updateMeta(); } } }}// -----------------------------------void ChanMgr::clearHitLists(){ for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].locked) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on(); for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (!c->isActive()) { c->info = info; c->info.lastPlay = 0; c->info.status = ChanInfo::S_UNKNOWN; if (mount) c->mount.set(mount); c->index = i+1; c->setStatus(Channel::S_WAIT); c->type = Channel::T_ALLOCATED; lock.off(); return c; } } lock.off(); return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.match(info)) return &hitlists[i]; return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitListByID(GnuID &id){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.id.isSame(id)) return &hitlists[i]; return NULL;}// -----------------------------------int ChanMgr::numHitLists(){ int num=0; for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) num++; return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){ ChanHitList *hl = NULL; for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].isUsed()) { hl = &hitlists[i]; break; } if (hl) { hl->info = info; peercastApp->addChannel(&hl->info); } return hl;}// -----------------------------------void ChanMgr::clearDeadHits(){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].clearDeadHits(deadHitAge) == 0) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------int ChanMgr::numConnected(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) tot++; return tot;}// -----------------------------------int ChanMgr::numRelayed(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numRelays>0) tot++; return tot;}// -----------------------------------int ChanMgr::numListeners(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numListeners>0) tot++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -