📄 channel.cpp.svn-base
字号:
// -----------------------------------void Channel::readMMS(){ ChanPacket pack; while ((!input->eof() && (thread.active))) { ASFChunk chunk; chunk.read(*input); switch (chunk.type) { case 0x4824: // asf header { MemoryStream mem(headMeta.data,sizeof(headMeta.data)); chunk.write(mem); headMeta.len = mem.pos; MemoryStream asfm(chunk.data,chunk.dataLen); ASFObject asfHead; asfHead.readHead(asfm); ASFStream asf = parseASFHeader(asfm); LOG_DEBUG("ASF file prop: pnum=%d, psize=%d, br=%d",asf.numPackets,asf.packetSize,asf.bitrate); info.bitrate = asf.bitrate/1000; break; } case 0x4424: // asf data { syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); MemoryStream mem(pack.data,sizeof(pack.data)); chunk.write(mem); pack.len = mem.pos; pack.type = 'DATA'; chanData.writePacket(pack); break; } default: throw StreamException("Unknown ASF chunk"); } if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; }}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------void ChanMgr::lockHitList(GnuID &id, bool on){ ChanHitList *chl = findHitListByID(id); if (chl) chl->locked = on;}// -----------------------------------Channel *ChanMgr::findChannel(ChanInfo &info){ Channel *c=NULL; findChannels(info,&c,1); return c;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].numListeners) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].index == index) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findPushChannel(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if ((channels[i].pushIndex == index) && (!channels[i].pushSock)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == status) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected; c->startFind(); return c; } return NULL;}// -----------------------------------int ChanMgr::findAndRelay(ChanInfo &info, Channel **ch, int max){ int cnt=0; char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s %s",info.name.cstr(),idStr); for(int i=0; i<180; i++) // search for 3 minutes. { ChanHitList *chl = findHitList(info); if (chl) { Channel *c; c = findChannelByID(chl->info.id); if (!c) { c = chanMgr->createChannel(chl->info,NULL); if (c) c->startGet(); } if (!c) break; ch[0] = c; cnt = 1; break; }else { if ((i%60) == 0) servMgr->findChannel(info); } sys->sleep(1000); } LOG_CHANNEL("Search results: %d",cnt); return cnt;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); } broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 600; icyMetaInterval = 8192; maxStreamsPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none autoQuery = 0; lastQuery = 0;}// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ if (!servMgr->relayBroadcast) return; //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->inFull() && servMgr->outFull()) || servMgr->streamFull(); bool stable = servMgr->totalStreams>0; GnuPacket hit; int numChans=0; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isPlaying()) { int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds if (ttl < minTTL) ttl = minTTL; if (ttl > maxTTL) ttl = maxTTL; if (hit.initHit(sh,1,&c,NULL,push,busy,stable,ttl)) { int numOut=0; numChans++; if (serv) { serv->outputPacket(hit,false); numOut++; } else numOut+=servMgr->broadcast(hit,NULL); LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl); } } } //if (numChans) // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut); }}// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){ //if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isActive()) if (c->status == Channel::S_BROADCASTING) { c->info.comment = broadcastMsg; c->updateMeta(); } } }}// -----------------------------------void ChanMgr::clearHitLists(){ for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].locked) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on(); for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (!c->isActive()) { c->info = info; c->info.lastPlay = 0; c->info.status = ChanInfo::S_UNKNOWN; if (mount) c->mount.set(mount); c->index = i+1; c->setStatus(Channel::S_WAIT); c->type = Channel::T_ALLOCATED; lock.off(); return c; } } lock.off(); return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.match(info)) return &hitlists[i]; return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitListByID(GnuID &id){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.id.isSame(id)) return &hitlists[i]; return NULL;}// -----------------------------------int ChanMgr::numHitLists(){ int num=0; for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) num++; return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){ ChanHitList *hl = NULL; for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].isUsed()) { hl = &hitlists[i]; break; } if (hl) { hl->info = info; peercastApp->addChannel(&hl->info); } return hl;}// -----------------------------------void ChanMgr::clearDeadHits(){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].clearDeadHits(deadHitAge) == 0) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------int ChanMgr::numConnected(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) tot++; return tot;}// -----------------------------------int ChanMgr::numRelayed(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numRelays>0) tot++; return tot;}// -----------------------------------int ChanMgr::numListeners(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numListeners>0) tot++; return tot;}// -----------------------------------int ChanMgr::numIdle(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == Channel::S_IDLE) tot++; return tot;}// -----------------------------------unsigned int ChanMgr::totalInput(){ unsigned int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].input) tot+=channels[i].input->bytesInPerSec; return tot;}// -----------------------------------ChanHit *ChanMgr::addHit(ChanInfo &info, ChanHit &h){ if (searchActive) lastHit = sys->getTime(); // if channel is waiting for update then give it Channel *ch = findChannelByID(info.id); if (ch) { if (!ch->isBroadcasting()) ch->info.update(info); } // otherwise add to list of channels ChanHitList *hl = findHitListByID(info.id); if (!hl) hl = addHitList(info); if (hl) { hl->info.update(info); return hl->addHit(h); }else return NULL;}// -----------------------------------void ChanMgr::playChannels(Channel **cl, int num){ if (num) { Channel *c1 = cl[0]; char str[128],fname[128],idStr[128]; sprintf(str,"http://localhost:%d",servMgr->serverHost.port); c1->info.id.toStr(idStr); PlayList::TYPE type; if ((c1->info.contentType == ChanInfo::T_WMA) || (c1->info.contentType == ChanInfo::T_WMV)) { type = PlayList::T_ASX; // WMP seems to have a bug where it doesn`t re-read asx files if they have the same name // so we prepend the channel id to make it unique - NOTE: should be deleted afterwards. sprintf(fname,"%s.asx",idStr); }else { type = PlayList::T_SCPLS; sprintf(fname,"play.pls"); } PlayList *pls = new PlayList(type,num); pls->addChannels(str,cl,num); FileStream file; file.openWriteReplace(fname); pls->write(file); file.close(); sys->executeFile(fname); delete pls; }}// -----------------------------------class ChanFindInfo : public ThreadInfo{public: GnuID id; bool keep;};// -----------------------------------THREAD_PROC playChannelProc(ThreadInfo *th){ ChanFindInfo *cfi = (ChanFindInfo *)th; ChanInfo info; info.id = cfi->id; peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Connecting to channel, please wait..."); Channel *ch; int num = chanMgr->findChannels(info,&ch,1); if (!num) num = chanMgr->findAndRelay(info,&ch,1); if (num) { ch->prefetchCnt = 10; // prefetch for 10 packets before giving up and closing channel chanMgr->playChannels(&ch,1); if (cfi->keep) ch->stayConnected = cfi->keep; } delete cfi; return 0;}// -----------------------------------void ChanMgr::playChannel(GnuID &id, bool keep){ ChanFindInfo *cfi = new ChanFindInfo; cfi->id = id; cfi->keep = keep; cfi->func = playChannelProc; sys->startThread(cfi);}// -----------------------------------void ChanHitList::init(){ info.init(); memset(hits,0,sizeof(ChanHit)*MAX_HITS); lastHitTime = 0; locked = false;}// -----------------------------------bool ChanHitList::isAvailable() { return (numHits()-numBusy())>0;}// -----------------------------------ChanHit *ChanHitList::addHit(ChanHit &h){ int i; lastHitTime = sys->getTime(); h.time = lastHitTime; for(i=0; i<MAX_HITS; i++) if (hits[i].host.ip == h.host.ip) if (hits[i].host.port == h.host.port) { hits[i] = h; return &hits[i]; } for(i=0; i<MAX_HITS; i++) if (hits[i].host.ip == 0) { hits[i] = h; return &hits[i]; } return NULL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -