📄 channel.cpp
字号:
{
out.writeTag(tp); out.writeShort(len); out.writeShort(0); out.write(data,len);
}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
unsigned int tp = in.readTag();
switch (tp)
{
case 'HEAD': type = T_HEAD; break;
case 'DATA': type = T_DATA; break;
case 'META': type = T_META; break;
default: type = T_UNKNOWN;
}
len = in.readShort(); in.readShort(); if (len > MAX_DATALEN) throw StreamException("Bad ChanPacket"); in.read(data,len);}// -----------------------------------
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
lock.on();
buf.lock.on();
firstPos = 0;
lastPos = 0;
safePos = 0;
readPos = 0;
for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
{
ChanPacket *src = &buf.packets[i%MAX_PACKETS];
if (src->type & accept)
{
if (src->pos >= reqPos)
{
lastPos = writePos;
packets[writePos++] = *src;
}
}
}
buf.lock.off();
lock.off();
return lastPos-firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
if (writePos == 0)
return false;
unsigned int fpos = getStreamPos(firstPos);
if (spos < fpos)
spos = fpos;
for(unsigned int i=firstPos; i<=lastPos; i++)
{
ChanPacket &p = packets[i%MAX_PACKETS];
if (p.pos >= spos)
{
pack = p;
return true;
}
}
return false;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getLatestPos()
{
if (!writePos)
return 0;
else
return getStreamPos(lastPos);
}
// -----------------------------------
unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
{
unsigned int min = getStreamPos(safePos);
unsigned int max = getStreamPos(lastPos);
if (min > spos)
return min;
if (max < spos)
return max;
return spos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
{
return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
if (pack.len)
{
if (willSkip()) // too far behind
return false;
lock.on();
packets[writePos%MAX_PACKETS] = pack; lastPos = writePos; writePos++;
if (writePos >= MAX_PACKETS)
firstPos = writePos-MAX_PACKETS;
else
firstPos = 0;
if (writePos >= NUM_SAFEPACKETS)
safePos = writePos - NUM_SAFEPACKETS;
else
safePos = 0;
if (updateReadPos)
readPos = writePos;
lastWriteTime = sys->getTime();
lock.off();
return true;
}
return false;}// -----------------------------------
void ChanPacketBuffer::readPacket(ChanPacket &pack)
{
unsigned int tim = sys->getTime();
if (readPos < firstPos)
throw StreamException("Read too far behind");
while (readPos >= writePos)
{
sys->sleepIdle();
if ((sys->getTime() - tim) > 30)
throw TimeoutException();
}
lock.on();
pack = packets[readPos%MAX_PACKETS];
readPos++;
sys->sleepIdle();
lock.off();
}
// -----------------------------------
bool ChanPacketBuffer::willSkip()
{
return ((writePos-readPos) >= MAX_PACKETS);
}
// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){ for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
if (channels[i].info.matchNameID(info))
return &channels[i];
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].numListeners()) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive())
{
if (cnt == index)
return &channels[i];
cnt++;
} return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == status) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected;
c->startGet(); return c; } return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){ char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr()); peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
Channel *c = NULL; c = findChannelByNameID(info); if (!c) { c = chanMgr->createChannel(info,NULL); if (c)
{ c->setStatus(Channel::S_SEARCHING);
c->startGet();
} }
for(int i=0; i<600; i++) // search for 1 minute.
{
c = findChannelByNameID(info);
if (!c)
{ peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
return NULL;
}
if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
break;
sys->sleep(100); } return c;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); } broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 600;
icyIndex = 0; icyMetaInterval = 8192; maxStreamsPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none
prefetchTime = 10; // n seconds
hostUpdateInterval = 180; // 2 minutes
bufferTime = 5;
trackerHitList.init();
autoQuery = 0; lastQuery = 0;
lastYPConnect = 0;
}
// -----------------------------------
ChanHit ChanMgr::getTracker(Host *sh,unsigned int wait,bool useFirewalled)
{
ChanHit hit;
hit.init();
if (!hit.host.ip)
hit = trackerHitList.getHit(sh,wait,useFirewalled,true);
if (!hit.host.ip)
for(int i=0; i<MAX_HITLISTS; i++)
if (hitlists[i].isUsed())
{
hit = hitlists[i].getHit(sh,wait,useFirewalled,true);
if (hit.host.ip)
break;
}
return hit;
}
// -----------------------------------
ChanHit ChanMgr::findHit(GnuID &sid,bool tracker)
{
ChanHit hit;
hit.init();
for(int i=0; i<MAX_HITLISTS; i++)
if (hitlists[i].isUsed())
{
hit = hitlists[i].findHit(sid,tracker);
if (hit.host.ip)
break;
}
return hit;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){ char buf[1024]; if (var == "numHitLists") sprintf(buf,"%d",numHitLists()); else if (var == "numRelayed") sprintf(buf,"%d",numRelayed()); else if (var == "numChannels") sprintf(buf,"%d",numChannels()); else if (var == "totalListeners") sprintf(buf,"%d",numListeners()); else if (var == "djMessage")
strcpy(buf,broadcastMsg.cstr());
else if (var == "icyMetaInterval")
sprintf(buf,"%d",icyMetaInterval);
else if (var == "maxStreamsPerChannel")
sprintf(buf,"%d",maxStreamsPerChannel);
else if (var == "hostUpdateInterval")
sprintf(buf,"%d",hostUpdateInterval);
else return false; out.writeString(buf); return true;}
// -----------------------------------
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
char buf[1024];
buf[0]=0;
String utf8;
if (var == "name")
{
utf8 = info.name;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "bitrate")
{
sprintf(buf,"%d",info.bitrate);
}else if (var == "genre")
{
utf8 = info.genre;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "desc")
{
utf8 = info.desc;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "comment")
{
utf8 = info.comment;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "uptime")
{
String uptime;
if (info.lastPlayTime)
uptime.setFromStopwatch(sys->getTime()-info.lastPlayTime);
else
uptime.set("-");
strcpy(buf,uptime.cstr());
}
else if (var == "type")
sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
else if (var == "numRelays")
sprintf(buf,"%d",numRelays());
else if (var == "numListeners")
sprintf(buf,"%d",numListeners());
else if (var == "status")
sprintf(buf,"%s",getStatusStr());
else if (var == "keep")
sprintf(buf,"%s",stayConnected?"Yes":"No");
else if (var == "id")
info.id.toStr(buf);
else if (var.startsWith("track."))
{
if (var == "track.title")
utf8 = info.track.title;
else if (var == "track.artist")
utf8 = info.track.artist;
else if (var == "track.album")
utf8 = info.track.album;
else if (var == "track.genre")
utf8 = info.track.genre;
else if (var == "track.contactURL")
utf8 = info.track.contact;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "contactURL")
sprintf(buf,"%s",info.url.cstr());
else if (var == "streamPos")
sprintf(buf,"%d",streamPos);
else if (var == "sourceType")
strcpy(buf,getSrcTypeStr());
else if (var == "sourceProtocol")
strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
else if (var == "sourceURL")
strcpy(buf,sourceURL.cstr());
else if (var == "headPos")
sprintf(buf,"%d",headPack.pos);
else if (var == "headLen")
sprintf(buf,"%d",headPack.len);
else if (var == "numHits")
{
ChanHitList *chl = chanMgr->findHitListByID(info.id);
int numHits = 0;
if (chl)
numHits = chl->numHits();
sprintf(buf,"%d",numHits);
}else
return false;
out.writeString(buf);
return true;
}
// -----------------------------------
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
for(int i=0; i<MAX_CHANNELS; i++)
{
Channel *c = &channels[i];
if ( c->isActive() && c->isBroadcasting() )
c->broadcastTrackerUpdate(svID,force);
}
}
// -----------------------------------
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
int cnt=0;
if (destID.isSet())
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].sendPacketUp(pack,chanID,srcID,destID))
return 1;
}
GnuID noID;
noID.clear();
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].sendPacketUp(pack,chanID,srcID,noID))
cnt++;
return cnt;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->streamFull(); bool stable = servMgr->totalStreams>0;
GnuPacket hit; int numChans=0; for(int i=0; i<MAX_CHANNELS; i++)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -