📄 channel.cpp
字号:
firstPos = 0;
lastPos = 0;
safePos = 0;
readPos = 0;
for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
{
ChanPacket *src = &buf.packets[i%MAX_PACKETS];
if (src->type & accept)
{
if (src->pos >= reqPos)
{
lastPos = writePos;
packets[writePos++] = *src;
}
}
}
buf.lock.off();
lock.off();
return lastPos-firstPos;
}
// -----------------------------------
// Scans buffer indices firstPos..lastPos (inclusive) for the first packet
// whose stream position is at or beyond `spos`.
// Returns that packet's buffer index, or 0 when no packet qualifies.
// NOTE(review): a 0 result is indistinguishable from a match at index 0.
unsigned int ChanPacketBuffer::findPacket(unsigned int spos)
{
	unsigned int idx = firstPos;
	while (idx <= lastPos)
	{
		// Buffer indices grow without bound; the slot is the index modulo capacity.
		if (packets[idx%MAX_PACKETS].pos >= spos)
			return idx;
		idx++;
	}
	return 0;
}
// -----------------------------------
// Clamps `spos` to the range of stream positions spanned by the packets at
// safePos (lower bound) and lastPos (upper bound) and returns the result.
unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
{
	const unsigned int lowest = getStreamPos(safePos);
	const unsigned int highest = getStreamPos(lastPos);

	// The lower bound is tested first, exactly as before.
	if (spos < lowest)
		return lowest;
	return (spos > highest) ? highest : spos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
{
return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
if (pack.len)
{
if (willSkip()) // too far behind
return false;
lock.on(); packets[writePos%MAX_PACKETS] = pack; lastPos = writePos; writePos++;
if (writePos >= MAX_PACKETS)
firstPos = writePos-MAX_PACKETS;
else
firstPos = 0;
if (writePos >= NUM_SAFEPACKETS)
safePos = writePos - NUM_SAFEPACKETS;
else
safePos = 0;
if (updateReadPos)
readPos = writePos;
lock.off();
return true;
}
return false;}// -----------------------------------
// Copies the packet at readPos into `pack` and advances readPos by one.
//
// Throws StreamException when readPos has already fallen behind firstPos.
// While no packet is available (readPos >= writePos) it idles via
// sys->sleepIdle() and throws TimeoutException once sys->getTime() has
// advanced by more than 30 since entry (units presumably seconds - confirm).
void ChanPacketBuffer::readPacket(ChanPacket &pack)
{
	unsigned int tim = sys->getTime();

	if (readPos < firstPos)
		throw StreamException("Read too far behind");

	while (readPos >= writePos)
	{
		sys->sleepIdle();
		if ((sys->getTime() - tim) > 30)
			throw TimeoutException();
	}

	lock.on();
	pack = packets[readPos%MAX_PACKETS];
	readPos++;
	lock.off();

	// Yield only after releasing the lock: sleeping inside the critical
	// section kept the lock held for the whole idle period.
	sys->sleepIdle();
}
// -----------------------------------
// Copies the packet at buffer index `pos` into `pack` without touching the
// buffer's own readPos.
//
// A `pos` older than safePos is moved up to safePos first. While the index is
// not yet written (pos >= writePos) the call idles via sys->sleepIdle() and
// throws TimeoutException once sys->getTime() has advanced by more than 30
// since entry (units presumably seconds - confirm).
//
// Returns the index following the packet that was copied.
unsigned int ChanPacketBuffer::peekPacket(unsigned int pos, ChanPacket &pack)
{
	unsigned int tim = sys->getTime();

	if (pos < safePos)
		pos = safePos;

	while (pos >= writePos)
	{
		sys->sleepIdle();
		if ((sys->getTime() - tim) > 30)
			throw TimeoutException();
	}

	lock.on();
	pack = packets[pos%MAX_PACKETS];
	pos++;
	lock.off();

	// Yield only after releasing the lock: sleeping inside the critical
	// section kept the lock held for the whole idle period.
	sys->sleepIdle();

	return pos;
}
// -----------------------------------
bool ChanPacketBuffer::pollRead()
{
return pollRead(readPos);
}
// -----------------------------------
// Reports whether buffer index `pos` has already been written,
// i.e. whether it lies below the current write position.
bool ChanPacketBuffer::pollRead(unsigned int pos)
{
	return writePos > pos;
}
// -----------------------------------
// True when the distance between the write and read positions has reached
// the ring capacity (MAX_PACKETS).
bool ChanPacketBuffer::willSkip()
{
	if ((writePos-readPos) >= MAX_PACKETS)
		return true;
	return false;
}
// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------void ChanMgr::lockHitList(ChanInfo &info, bool on){ ChanHitList *chl = findHitList(info); if (chl) chl->locked = on;}// -----------------------------------
void ChanMgr::quit()
{
closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){ for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
if (channels[i].info.matchNameID(info))
return &channels[i];
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].numListeners) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].index == index) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == status) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected;
c->startGet(); return c; } return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){ char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());
Channel *c = NULL; for(int i=0; i<180; i++) // search for 3 minutes. { c = findChannelByNameID(info); if (!c) { c = chanMgr->createChannel(info,NULL); if (c) c->startGet(); } if (!c) break;
if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
break;
sys->sleep(1000); } return c;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); } broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 600; icyMetaInterval = 8192; maxStreamsPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none
prefetchTime = 10; // n seconds
hostUpdateInterval = 180; // 2 minutes
bufferTime = 5;
trackerHitList.init();
autoQuery = 0; lastQuery = 0;
}// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var){ char buf[128]; if (var == "numHitLists") sprintf(buf,"%d",numHitLists()); else if (var == "numRelayed") sprintf(buf,"%d",numRelayed()); else if (var == "numConnected") sprintf(buf,"%d",numConnected()); else if (var == "totalListeners") sprintf(buf,"%d",numListeners()); else if (var == "totalInPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput())); else if (var == "totalOutPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(servMgr->totalOutput(true))); else if (var == "totalPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput()+servMgr->totalOutput(true))); else return false; out.writeString(buf); return true;}
// -----------------------------------
void ChanMgr::broadcastTrackerUpdate()
{
for(int i=0; i<MAX_CHANNELS; i++)
{
Channel *c = &channels[i];
if ( c->isActive() && c->isBroadcasting() )
c->broadcastTrackerUpdate();
}
}
// -----------------------------------
// Sends `pack` through the source stream of each eligible channel.
// A channel is eligible when it is active, matches `chanID` (or `chanID` is
// not set), has a source stream, and its remoteID differs from `srcID`.
// Returns how many sendPacket() calls reported success.
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID)
{
	int sent=0;
	for(int n=0; n<MAX_CHANNELS; n++)
	{
		Channel &chan = channels[n];

		if (!chan.isActive())
			continue;
		// An unset chanID addresses every channel.
		if (chanID.isSet() && !chan.info.id.isSame(chanID))
			continue;
		if (!chan.sourceStream)
			continue;
		// Never send the packet back to the peer it came from.
		if (chan.remoteID.isSame(srcID))
			continue;

		if (chan.sourceStream->sendPacket(pack))
			sent++;
	}
	return sent;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ if (!servMgr->relayBroadcast) return; //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->streamFull(); bool stable = servMgr->totalStreams>0;
GnuPacket hit; int numChans=0; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isPlaying()) {
bool tracker = c->isBroadcasting();
int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds if (ttl < minTTL) ttl = minTTL; if (ttl > maxTTL) ttl = maxTTL; if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl)) { int numOut=0; numChans++; if (serv) { serv->outputPacket(hit,false); numOut++; } else numOut+=servMgr->broadcast(hit,NULL); LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl); } } } //if (numChans) // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut); }}// -----------------------------------
// Stores `v` as the new hostUpdateInterval.
// NOTE(review): the unit is not visible here; the constructor's default of
// 180 suggests seconds - confirm.
void ChanMgr::setUpdateInterval(unsigned int v)
{
hostUpdateInterval = v;
}
// -----------------------------------
// message check
#if 0
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_MESG,1);
atom.writeString(PCP_MESG_DATA,msg.cstr());
mem.len = mem.pos;
mem.rewind();
pack.len = mem.len;
GnuID noID;
noID.clear();
BroadcastState bcs;
PCPStream::readAtom(atom,bcs);
//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isActive())
{
GnuID id = servMgr->sessionID; if (c->status == Channel::S_BROADCASTING) { c->info.comment = broadcastMsg; c->updateMeta();
id = c->info.id;
}
} } }}
// -----------------------------------
// Walks the hit lists in order and returns the first hit, obtained through
// getHit(sh,0,useFirewalled,true), that carries a non-zero host IP.
// When no list yields such a hit, the result is whatever the last used list
// returned, or a freshly init()'ed hit if no list is in use.
ChanHit ChanMgr::getAnyTracker(Host *sh,bool useFirewalled)
{
	ChanHit found;
	found.init();

	for(int idx=0; idx<MAX_HITLISTS; idx++)
	{
		ChanHitList &list = hitlists[idx];
		if (!list.isUsed())
			continue;

		found = list.getHit(sh,0,useFirewalled,true);
		if (found.host.ip)
			return found;
	}
	return found;
}
// -----------------------------------void ChanMgr::clearHitLists(){ for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].locked) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on(); for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (!c->isPlaying())
{
c->close();
for(int j=0; j<30; j++)
if (!c->isActive())
break;
if (c->isActive())
break;
c->info = info; c->info.lastPlayTime = 0; c->info.status = ChanInfo::S_UNKNOWN; if (mount) c->mount.set(mount); c->index = i+1; c->setStatus(Channel::S_WAIT); c->type = Channel::T_ALLOCATED; lock.off();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -