📄 channel.cpp.svn-base
字号:
break; case ChanPacket::T_DATA:
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len;
break;
case ChanPacket::T_META: ch->insertMeta.fromMem(pack.data,pack.len); { if (pack.len) { XML xml; xml.read(mem); XML::Node *n = xml.findNode("channel"); if (n) {
ChanInfo newInfo = ch->info; newInfo.updateFromXML(n); ChanHitList *chl = chanMgr->findHitList(ch->info); if (chl) newInfo.updateFromXML(n); ch->updateInfo(newInfo);
} } }
break;#if 0
case ChanPacket::T_SYNC: { unsigned int s = mem.readLong(); if ((s-ch->syncPos) != 1) { LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips); if (ch->syncPos) { ch->info.numSkips++; if (ch->info.numSkips>50) throw StreamException("Bumped - Too many skips"); } } ch->syncPos = s; } break;
#endif
}
}
return 0;}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){ ChanPacket pack; const int readLen = 8192;
pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos); in.read(pack.data,pack.len); ch->newPacket(pack);
ch->checkReadDelay(pack.len);
ch->streamPos+=pack.len;}
// ------------------------------------------
// Header hook for raw streams: does nothing. Both arguments are unnamed and
// unused.
void RawStream::readHeader(Stream &,Channel *)
{
}
// ------------------------------------------
// Reads the next packet of a raw stream by delegating to readRaw(in, ch).
// Always returns 0.
int RawStream::readPacket(Stream &in,Channel *ch)
{
readRaw(in,ch);
return 0;
}
// ------------------------------------------
// End-of-stream hook for raw streams: does nothing. Both arguments are
// unnamed and unused.
void RawStream::readEnd(Stream &,Channel *)
{
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
type = t; if (l > MAX_DATALEN)
throw StreamException("Packet data too large"); len = l; memcpy(data,p,len);
pos = _pos;
}// -----------------------------------
// Writes only the payload (`len` bytes of `data`) to `out`; no tag or length
// header is emitted.
void ChanPacket::writeRaw(Stream &out)
{
out.write(data,len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){ unsigned int tp = 0;
switch (type)
{
case T_HEAD: tp = 'HEAD'; break;
case T_META: tp = 'META'; break;
case T_DATA: tp = 'DATA'; break;
}
if (type != T_UNKNOWN)
{
out.writeTag(tp); out.writeShort(len); out.writeShort(0); out.write(data,len);
}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
unsigned int tp = in.readTag();
switch (tp)
{
case 'HEAD': type = T_HEAD; break;
case 'DATA': type = T_DATA; break;
case 'META': type = T_META; break;
default: type = T_UNKNOWN;
}
len = in.readShort(); in.readShort(); if (len > MAX_DATALEN) throw StreamException("Bad ChanPacket"); in.read(data,len);}// -----------------------------------
// Copies into this buffer every packet of `buf` (slots buf.firstPos through
// buf.lastPos) whose type matches this buffer's `accept` mask and whose
// stream position is at least `reqPos`.
// Both buffers are locked for the duration: this one first, then `buf`.
// firstPos, lastPos, safePos and readPos are reset to 0 before copying.
// Returns lastPos-firstPos after the copy.
// NOTE(review): writePos is not reset here, so copied packets are appended at
// the current writePos - presumably this is only called on a fresh buffer;
// confirm against callers.
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
    lock.on();
    buf.lock.on();

    firstPos = 0;
    lastPos = 0;
    safePos = 0;
    readPos = 0;

    for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
    {
        ChanPacket *src = &buf.packets[i%MAX_PACKETS];
        if ((src->type & accept) && (src->pos >= reqPos))
        {
            lastPos = writePos;
            // Wrap the slot index the same way writePacket and the readers
            // do, so a large writePos cannot index beyond the packet ring
            // (presumably MAX_PACKETS entries).
            packets[writePos%MAX_PACKETS] = *src;
            writePos++;
        }
    }

    buf.lock.off();
    lock.off();
    return lastPos-firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
if (writePos == 0)
return false;
lock.on();
unsigned int fpos = getStreamPos(firstPos);
if (spos < fpos)
spos = fpos;
for(unsigned int i=firstPos; i<=lastPos; i++)
{
ChanPacket &p = packets[i%MAX_PACKETS];
if (p.pos >= spos)
{
pack = p;
lock.off();
return true;
}
}
lock.off();
return false;
}
// -----------------------------------
// Stream position of the packet in slot lastPos, or 0 while writePos is
// still zero.
unsigned int ChanPacketBuffer::getLatestPos()
{
    return writePos ? getStreamPos(lastPos) : 0;
}
// -----------------------------------
// Stream position of the packet in slot firstPos, or 0 while writePos is
// still zero.
unsigned int ChanPacketBuffer::getOldestPos()
{
    return writePos ? getStreamPos(firstPos) : 0;
}
// -----------------------------------
// Limits `spos` to the range bounded by the stream position at safePos
// (lower bound, checked first) and the stream position at lastPos (upper
// bound), and returns the result.
unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
{
    const unsigned int lowest = getStreamPos(safePos);
    const unsigned int highest = getStreamPos(lastPos);

    if (spos < lowest)
        return lowest;
    if (spos > highest)
        return highest;
    return spos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
{
return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
if (pack.len)
{
if (willSkip()) // too far behind
return false;
lock.on();
packets[writePos%MAX_PACKETS] = pack; lastPos = writePos; writePos++;
if (writePos >= MAX_PACKETS)
firstPos = writePos-MAX_PACKETS;
else
firstPos = 0;
if (writePos >= NUM_SAFEPACKETS)
safePos = writePos - NUM_SAFEPACKETS;
else
safePos = 0;
if (updateReadPos)
readPos = writePos;
lastWriteTime = sys->getTime();
lock.off();
return true;
}
return false;}// -----------------------------------
// Waits until a packet is available at readPos, copies it into `pack` and
// advances readPos by one.
// Throws StreamException("Read too far behind") when readPos is already
// behind firstPos on entry, and TimeoutException when no packet becomes
// available while sys->getTime() advances by more than 30 (presumably
// seconds - confirm the unit of getTime()).
void ChanPacketBuffer::readPacket(ChanPacket &pack)
{
    unsigned int tim = sys->getTime();

    if (readPos < firstPos)
        throw StreamException("Read too far behind");

    while (readPos >= writePos)
    {
        sys->sleepIdle();
        if ((sys->getTime() - tim) > 30)
            throw TimeoutException();
    }

    lock.on();
    pack = packets[readPos%MAX_PACKETS];
    readPos++;
    lock.off();

    // Idle only after the lock is released: the same per-read pause as
    // before, but writePacket/findPacket are no longer blocked on the buffer
    // lock while this thread is idling.
    sys->sleepIdle();
}
// -----------------------------------
// True when writePos is at least MAX_PACKETS ahead of readPos; writePacket
// refuses to store a packet in that state.
bool ChanPacketBuffer::willSkip()
{
    if ((writePos-readPos) >= MAX_PACKETS)
        return true;
    return false;
}
// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------
// Shuts the channel manager down; this only closes every active channel via
// closeAll().
void ChanMgr::quit()
{
closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){ for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
if (channels[i].info.matchNameID(info))
return &channels[i];
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (stricmp(channels[i].info.name,n)==0) return &channels[i]; return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive())
{
if (cnt == index)
return &channels[i];
cnt++;
} return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (strcmp(channels[i].mount,str)==0) return &channels[i]; return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.id.isSame(id)) return &channels[i]; return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].info.match(info)) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){ int cnt=0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == status) { ch[cnt++] = &channels[i]; if (cnt >= max) break; } return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected;
c->startGet(); return c; } return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){ char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr()); peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
Channel *c = NULL; c = findChannelByNameID(info); if (!c) { c = chanMgr->createChannel(info,NULL); if (c)
{ c->setStatus(Channel::S_SEARCHING);
c->startGet();
} }
for(int i=0; i<600; i++) // search for 1 minute.
{
c = findChannelByNameID(info);
if (!c)
{ peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
return NULL;
}
if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
break;
sys->sleep(100); } return c;}// -----------------------------------ChanMgr::ChanMgr(){ int i; for(i=0; i<MAX_CHANNELS; i++) channels[i].init(); for(i=0; i<MAX_HITLISTS; i++) { hitlists[i].index = i; hitlists[i].init(); }
currFindAndPlayChannel.clear();
broadcastMsg.clear(); broadcastMsgInterval=10; broadcastID.generate(); deadHitAge = 600;
icyIndex = 0; icyMetaInterval = 8192; maxRelaysPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none
prefetchTime = 10; // n seconds
hostUpdateInterval = 180; // 2 minutes
bufferTime = 5;
autoQuery = 0; lastQuery = 0;
lastYPConnect = 0;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){ char buf[1024]; if (var == "numHitLists") sprintf(buf,"%d",numHitLists());
else if (var == "numChannels") sprintf(buf,"%d",numChannels()); else if (var == "djMessage")
strcpy(buf,broadcastMsg.cstr());
else if (var == "icyMetaInterval")
sprintf(buf,"%d",icyMetaInterval);
else if (var == "maxRelaysPerChannel")
sprintf(buf,"%d",maxRelaysPerChannel);
else if (var == "hostUpdateInterval")
sprintf(buf,"%d",hostUpdateInterval);
else return false; out.writeString(buf); return true;}
// -----------------------------------
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
char buf[1024];
buf[0]=0;
String utf8;
if (var == "name")
{
utf8 = info.name;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "bitrate")
{
sprintf(buf,"%d",info.bitrate);
}else if (var == "genre")
{
utf8 = info.genre;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "desc")
{
utf8 = info.desc;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "comment")
{
utf8 = info.comment;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "uptime")
{
String uptime;
if (info.lastPlayStart)
uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
else
uptime.set("-");
strcpy(buf,uptime.cstr());
}
else if (var == "type")
sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
else if (var == "localRelays")
sprintf(buf,"%d",localRelays());
else if (var == "localListeners")
sprintf(buf,"%d",localListeners());
else if (var == "totalRelays")
sprintf(buf,"%d",totalRelays());
else if (var == "totalListeners")
sprintf(buf,"%d",totalListeners());
else if (var == "status")
sprintf(buf,"%s",getStatusStr());
else if (var == "keep")
sprintf(buf,"%s",stayConnected?"Yes":"No");
else if (var == "id")
info.id.toStr(buf);
else if (var.startsWith("track."))
{
if (var == "track.title")
utf8 = info.track.title;
else if (var == "track.artist")
utf8 = info.track.artist;
else if (var == "track.album")
utf8 = info.track.album;
else if (var == "track.genre")
utf8 = info.track.genre;
else if (var == "track.contactURL")
utf8 = info.track.contact;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -