📄 channel.cpp
字号:
default: type = T_UNKNOWN;
}
len = in.readShort(); in.readShort(); if (len > MAX_DATALEN) throw StreamException("Bad ChanPacket"); in.read(data,len);}// -----------------------------------
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
lock.on();
buf.lock.on();
firstPos = 0;
lastPos = 0;
safePos = 0;
readPos = 0;
for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
{
ChanPacket *src = &buf.packets[i%MAX_PACKETS];
if (src->type & accept)
{
if (src->pos >= reqPos)
{
lastPos = writePos;
packets[writePos++] = *src;
}
}
}
buf.lock.off();
lock.off();
return lastPos-firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
if (writePos == 0)
return false;
unsigned int fpos = getStreamPos(firstPos);
if (spos < fpos)
spos = fpos;
for(unsigned int i=firstPos; i<=lastPos; i++)
{
ChanPacket &p = packets[i%MAX_PACKETS];
if (p.pos >= spos)
{
pack = p;
return true;
}
}
return false;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getLatestPos()
{
if (!writePos)
return 0;
else
return getStreamPos(lastPos);
}
// -----------------------------------
unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
{
unsigned int min = getStreamPos(safePos);
unsigned int max = getStreamPos(lastPos);
if (min > spos)
return min;
if (max < spos)
return max;
return spos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
{
return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
if (pack.len)
{
if (willSkip()) // too far behind
return false;
lock.on();
packets[writePos%MAX_PACKETS] = pack; lastPos = writePos; writePos++;
if (writePos >= MAX_PACKETS)
firstPos = writePos-MAX_PACKETS;
else
firstPos = 0;
if (writePos >= NUM_SAFEPACKETS)
safePos = writePos - NUM_SAFEPACKETS;
else
safePos = 0;
if (updateReadPos)
readPos = writePos;
lastWriteTime = sys->getTime();
lock.off();
return true;
}
return false;}// -----------------------------------
void ChanPacketBuffer::readPacket(ChanPacket &pack)
{
unsigned int tim = sys->getTime();
if (readPos < firstPos)
throw StreamException("Read too far behind");
while (readPos >= writePos)
{
sys->sleepIdle();
if ((sys->getTime() - tim) > 30)
throw TimeoutException();
}
lock.on();
pack = packets[readPos%MAX_PACKETS];
readPos++;
sys->sleepIdle();
lock.off();
}
// -----------------------------------
bool ChanPacketBuffer::willSkip()
{
return ((writePos-readPos) >= MAX_PACKETS);
}
// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){ for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
if (channels[i].info.matchNameID(info))
return &channels[i];
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].numListeners()) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].index == index) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == status) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected;
c->startGet(); return c; } return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){ char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr()); peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
Channel *c = NULL; c = findChannelByNameID(info); if (!c) { c = chanMgr->createChannel(info,NULL); if (c)
{ c->setStatus(Channel::S_SEARCHING);
c->startGet();
} }
for(int i=0; i<600; i++) // search for 1 minute.
{
c = findChannelByNameID(info);
if (!c)
{ peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
return NULL;
}
if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
break;
sys->sleep(100); } return c;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); } broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 600;
icyIndex = 0; icyMetaInterval = 8192; maxStreamsPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none
prefetchTime = 10; // n seconds
hostUpdateInterval = 180; // 2 minutes
bufferTime = 5;
trackerHitList.init();
autoQuery = 0; lastQuery = 0;
lastYPConnect = 0;
}
// -----------------------------------
ChanHit ChanMgr::getTracker(Host *sh,unsigned int wait,bool useFirewalled)
{
ChanHit hit;
hit.init();
if (!hit.host.ip)
hit = trackerHitList.getHit(sh,wait,useFirewalled,true);
if (!hit.host.ip)
for(int i=0; i<MAX_HITLISTS; i++)
if (hitlists[i].isUsed())
{
hit = hitlists[i].getHit(sh,wait,useFirewalled,true);
if (hit.host.ip)
break;
}
return hit;
}
// -----------------------------------
ChanHit ChanMgr::findHit(GnuID &sid,bool tracker)
{
ChanHit hit;
hit.init();
for(int i=0; i<MAX_HITLISTS; i++)
if (hitlists[i].isUsed())
{
hit = hitlists[i].findHit(sid,tracker);
if (hit.host.ip)
break;
}
return hit;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var){ char buf[128]; if (var == "numHitLists") sprintf(buf,"%d",numHitLists()); else if (var == "numRelayed") sprintf(buf,"%d",numRelayed()); else if (var == "numConnected") sprintf(buf,"%d",numConnected()); else if (var == "totalListeners") sprintf(buf,"%d",numListeners()); else if (var == "totalInPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput())); else if (var == "totalOutPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(servMgr->totalOutput(true))); else if (var == "totalPerSec") sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput()+servMgr->totalOutput(true))); else return false; out.writeString(buf); return true;}
// -----------------------------------
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
for(int i=0; i<MAX_CHANNELS; i++)
{
Channel *c = &channels[i];
if ( c->isActive() && c->isBroadcasting() )
c->broadcastTrackerUpdate(svID,force);
}
}
// -----------------------------------
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
int cnt=0;
if (destID.isSet())
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].sendPacketUp(pack,chanID,srcID,destID))
return 1;
}
GnuID noID;
noID.clear();
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].sendPacketUp(pack,chanID,srcID,noID))
cnt++;
return cnt;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ if (!servMgr->relayBroadcast) return; //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->streamFull(); bool stable = servMgr->totalStreams>0;
GnuPacket hit; int numChans=0; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isPlaying()) {
bool tracker = c->isBroadcasting();
int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds if (ttl < minTTL) ttl = minTTL; if (ttl > maxTTL) ttl = maxTTL; if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl)) { int numOut=0; numChans++; if (serv) { serv->outputPacket(hit,false); numOut++; } else numOut+=servMgr->broadcast(hit,NULL); LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl); } } } //if (numChans) // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut); }}// -----------------------------------
void ChanMgr::setUpdateInterval(unsigned int v)
{
hostUpdateInterval = v;
}
// -----------------------------------
// message check
#if 0
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_MESG,1);
atom.writeString(PCP_MESG_DATA,msg.cstr());
mem.len = mem.pos;
mem.rewind();
pack.len = mem.len;
GnuID noID;
noID.clear();
BroadcastState bcs;
PCPStream::readAtom(atom,bcs);
//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isActive())
{
GnuID id = servMgr->sessionID; if (c->status == Channel::S_BROADCASTING) { c->info.comment = broadcastMsg; c->updateMeta();
id = c->info.id;
}
} } }}
// -----------------------------------void ChanMgr::clearHitLists(){ for(int i=0; i<MAX_HITLISTS; i++) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on(); for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (!c->isPlaying())
{
c->close();
for(int j=0; j<100; j++) // wait 10 seconds
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -