📄 channel.cpp
字号:
}
// ------------------------------------------
// Reads one packet's worth of data for a raw stream.
// The actual work is delegated to readRaw(); this wrapper always returns 0.
int RawStream::readPacket(Stream &in, Channel *ch)
{
    readRaw(in, ch);

    return 0;
}
// ------------------------------------------
// End-of-stream hook for raw streams: nothing to do, both arguments are
// ignored.
void RawStream::readEnd(Stream & /*in*/, Channel * /*ch*/)
{
    // intentionally empty
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
type = t; if (l > MAX_DATALEN)
throw StreamException("Packet data too large"); len = l; memcpy(data,p,len);
pos = _pos;
}// -----------------------------------
// Emits only the payload bytes (len bytes of data) to the output stream;
// no header of any kind is written.
void ChanPacket::writeRaw(Stream &out)
{
    out.write(data, len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){ unsigned int tp = 0;
switch (type)
{
case T_HEAD: tp = 'HEAD'; break;
case T_META: tp = 'META'; break;
case T_DATA: tp = 'DATA'; break;
}
if (type != T_UNKNOWN)
{
out.writeTag(tp); out.writeShort(len); out.writeShort(0); out.write(data,len);
}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
unsigned int tp = in.readTag();
switch (tp)
{
case 'HEAD': type = T_HEAD; break;
case 'DATA': type = T_DATA; break;
case 'META': type = T_META; break;
default: type = T_UNKNOWN;
}
len = in.readShort(); in.readShort(); if (len > MAX_DATALEN) throw StreamException("Bad ChanPacket"); in.read(data,len);}// -----------------------------------
// Copies packets from `buf` into this buffer.
// A packet is copied when its type passes the `accept` mask and its pos is
// at or beyond reqPos. firstPos, lastPos, safePos and readPos are reset to 0
// first; both buffers' locks are held for the whole copy.
// Returns lastPos - firstPos.
//
// NOTE(review): writePos is NOT reset here, so this presumably expects to be
// called on a freshly initialised buffer - confirm against callers. The
// destination slot is indexed modulo MAX_PACKETS, like every other access to
// packets[] in this class, so a non-zero writePos can no longer write past
// the end of the array.
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
    lock.on();
    buf.lock.on();

    firstPos = 0;
    lastPos = 0;
    safePos = 0;
    readPos = 0;

    for (unsigned int i = buf.firstPos; i <= buf.lastPos; i++)
    {
        ChanPacket *src = &buf.packets[i % MAX_PACKETS];
        if (src->type & accept)
        {
            if (src->pos >= reqPos)
            {
                lastPos = writePos;
                packets[writePos % MAX_PACKETS] = *src;
                writePos++;
            }
        }
    }

    buf.lock.off();
    lock.off();
    return lastPos - firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
if (writePos == 0)
return false;
lock.on();
unsigned int fpos = getStreamPos(firstPos);
if (spos < fpos)
spos = fpos;
for(unsigned int i=firstPos; i<=lastPos; i++)
{
ChanPacket &p = packets[i%MAX_PACKETS];
if (p.pos >= spos)
{
pack = p;
lock.off();
return true;
}
}
lock.off();
return false;
}
// -----------------------------------
// Stream position of the packet at lastPos, or 0 while writePos is still 0.
unsigned int ChanPacketBuffer::getLatestPos()
{
    return writePos ? getStreamPos(lastPos) : 0;
}
// -----------------------------------
// Stream position of the packet at firstPos, or 0 while writePos is still 0.
unsigned int ChanPacketBuffer::getOldestPos()
{
    return writePos ? getStreamPos(firstPos) : 0;
}
// -----------------------------------
// Limits spos to the range [pos of packet at safePos, pos of packet at
// lastPos]. The lower bound is tested first, so it wins if the two bounds
// are ever out of order.
unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
{
    unsigned int lowest  = getStreamPos(safePos);
    unsigned int highest = getStreamPos(lastPos);

    if (spos < lowest)
        return lowest;
    if (spos > highest)
        return highest;
    return spos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
{
return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
if (pack.len)
{
if (willSkip()) // too far behind
return false;
lock.on();
pack.sync = writePos; packets[writePos%MAX_PACKETS] = pack; lastPos = writePos; writePos++;
if (writePos >= MAX_PACKETS)
firstPos = writePos-MAX_PACKETS;
else
firstPos = 0;
if (writePos >= NUM_SAFEPACKETS)
safePos = writePos - NUM_SAFEPACKETS;
else
safePos = 0;
if (updateReadPos)
readPos = writePos;
lastWriteTime = sys->getTime();
lock.off();
return true;
}
return false;}// -----------------------------------
// Copies the packet at readPos into `pack` and advances readPos.
// Throws StreamException("Read too far behind") if readPos has fallen below
// firstPos. While no unread packet is available it idles via
// sys->sleepIdle(), and throws TimeoutException once more than 30 units of
// sys->getTime() have elapsed (presumably seconds - confirm).
void ChanPacketBuffer::readPacket(ChanPacket &pack)
{
    unsigned int startTime = sys->getTime();

    if (readPos < firstPos)
        throw StreamException("Read too far behind");

    // Wait until the writer has produced something past readPos.
    for (;;)
    {
        if (readPos < writePos)
            break;

        sys->sleepIdle();

        if ((sys->getTime() - startTime) > 30)
            throw TimeoutException();
    }

    lock.on();
    pack = packets[readPos % MAX_PACKETS];
    readPos++;
    lock.off();

    sys->sleepIdle();
}
// -----------------------------------
// True when the gap between writePos and readPos has reached MAX_PACKETS,
// i.e. another write would overwrite a packet that has not been read yet.
bool ChanPacketBuffer::willSkip()
{
    if ((writePos - readPos) >= MAX_PACKETS)
        return true;

    return false;
}
// -----------------------------------void Channel::getStreamPath(char *str){ char idStr[64]; getIDStr(idStr); sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}
// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){ searchInfo = info; clearHitLists(); numFinds = 0; lastHit = 0;// lastSearch = 0; searchActive = true;}// -----------------------------------
// Logs that the manager is quitting, then calls closeAll().
void ChanMgr::quit()
{
    LOG_DEBUG("ChanMgr is quitting..");

    closeAll();
}
// -----------------------------------
int ChanMgr::numIdleChannels()
{
int cnt=0;
Channel *ch = channel;
while (ch)
{
if (ch->isActive())
if (ch->thread.active)
if (ch->status == Channel::S_IDLE)
cnt++;
ch=ch->next;
}
return cnt;
}
// -----------------------------------
void ChanMgr::closeOldestIdle()
{
unsigned int idleTime = (unsigned int)-1;
Channel *ch = channel,*oldest=NULL;
while (ch)
{
if (ch->isActive())
if (ch->thread.active)
if (ch->status == Channel::S_IDLE)
if (ch->lastIdleTime < idleTime)
{
oldest = ch;
idleTime = ch->lastIdleTime;
}
ch=ch->next;
}
if (oldest)
oldest->thread.active = false;
}
// -----------------------------------
void ChanMgr::closeAll()
{
Channel *ch = channel;
while (ch)
{
if (ch->thread.active)
ch->thread.shutdown();
ch=ch->next;
}
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){ Channel *ch = channel;
while (ch)
{
if (ch->isActive())
if (ch->info.matchNameID(info))
return ch;
ch=ch->next;
}
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){ Channel *ch = channel;
while (ch)
{
if (ch->isActive()) if (stricmp(ch->info.name,n)==0) return ch; ch=ch->next;
}
return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
int cnt=0; Channel *ch = channel;
while (ch)
{
if (ch->isActive())
{
if (cnt == index)
return ch;
cnt++;
} ch=ch->next;
}
return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){ Channel *ch = channel;
while (ch)
{
if (ch->isActive()) if (strcmp(ch->mount,str)==0) return ch; ch=ch->next;
}
return NULL;} // -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){ Channel *ch = channel;
while (ch)
{
if (ch->isActive()) if (ch->info.id.isSame(id)) return ch; ch=ch->next;
} return NULL;} // -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **chlist, int max){ int cnt=0; Channel *ch = channel;
while (ch)
{
if (ch->isActive()) if (ch->info.match(info)) { chlist[cnt++] = ch; if (cnt >= max) break; } ch=ch->next;
}
return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **chlist, int max, Channel::STATUS status){ int cnt=0; Channel *ch = channel;
while (ch)
{
if (ch->isActive()) if (ch->status == status) { chlist[cnt++] = ch; if (cnt >= max) break; } ch=ch->next;
}
return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){ Channel *c = chanMgr->createChannel(info,NULL); if (c) { c->stayConnected = stayConnected;
c->startGet(); return c; } return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){ char idStr[64]; info.id.toStr(idStr); LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr()); peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
Channel *c = NULL; c = findChannelByNameID(info); if (!c) { c = chanMgr->createChannel(info,NULL); if (c)
{ c->setStatus(Channel::S_SEARCHING);
c->startGet();
} }
for(int i=0; i<600; i++) // search for 1 minute.
{
c = findChannelByNameID(info);
if (!c)
{ peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
return NULL;
}
if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
break;
sys->sleep(100); } return c;}// -----------------------------------ChanMgr::ChanMgr(){ channel = NULL;
hitlist = NULL;
currFindAndPlayChannel.clear();
broadcastMsg.clear(); broadcastMsgInterval=10;
broadcastID.generate(PCP_BROADCAST_FLAGS);
deadHitAge = 600;
icyIndex = 0; icyMetaInterval = 8192; maxRelaysPerChannel = 0; searchInfo.init(); minBroadcastTTL = 1; maxBroadcastTTL = 7; pushTimeout = 60; // 1 minute pushTries = 5; // 5 times maxPushHops = 8; // max 8 hops away maxUptime = 0; // 0 = none
prefetchTime = 10; // n seconds
hostUpdateInterval = 180; // 2 minutes
bufferTime = 5;
autoQuery = 0; lastQuery = 0;
lastYPConnect = 0;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){ char buf[1024]; if (var == "numHitLists") sprintf(buf,"%d",numHitLists());
else if (var == "numChannels") sprintf(buf,"%d",numChannels()); else if (var == "djMessage")
strcpy(buf,broadcastMsg.cstr());
else if (var == "icyMetaInterval")
sprintf(buf,"%d",icyMetaInterval);
else if (var == "maxRelaysPerChannel")
sprintf(buf,"%d",maxRelaysPerChannel);
else if (var == "hostUpdateInterval")
sprintf(buf,"%d",hostUpdateInterval);
else if (var == "broadcastID")
broadcastID.toStr(buf);
else return false; out.writeString(buf); return true;}
// -----------------------------------
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
char buf[1024];
buf[0]=0;
String utf8;
if (var == "name")
{
utf8 = info.name;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "bitrate")
{
sprintf(buf,"%d",info.bitrate);
}else if (var == "srcrate")
{
if (sourceData)
{
unsigned int tot = sourceData->getSourceRate();
sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
}else
strcpy(buf,"0");
}else if (var == "genre")
{
utf8 = info.genre;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "desc")
{
utf8 = info.desc;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "comment")
{
utf8 = info.comment;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "uptime")
{
String uptime;
if (info.lastPlayStart)
uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
else
uptime.set("-");
strcpy(buf,uptime.cstr());
}
else if (var == "type")
sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
else if (var == "ext")
sprintf(buf,"%s",ChanInfo::getTypeExt(info.contentType));
else if (var == "localRelays")
sprintf(buf,"%d",localRelays());
else if (var == "localListeners")
sprintf(buf,"%d",localListeners());
else if (var == "totalRelays")
sprintf(buf,"%d",totalRelays());
else if (var == "totalListeners")
sprintf(buf,"%d",totalListeners());
else if (var == "status")
sprintf(buf,"%s",getStatusStr());
else if (var == "keep")
sprintf(buf,"%s",stayConnected?"Yes":"No");
else if (var == "id")
info.id.toStr(buf);
else if (var.startsWith("track."))
{
if (var == "track.title")
utf8 = info.track.title;
else if (var == "track.artist")
utf8 = info.track.artist;
else if (var == "track.album")
utf8 = info.track.album;
else if (var == "track.genre")
utf8 = info.track.genre;
else if (var == "track.contactURL")
utf8 = info.track.contact;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "contactURL")
sprintf(buf,"%s",info.url.cstr());
else if (var == "streamPos")
sprintf(buf,"%d",streamPos);
else if (var == "sourceType")
strcpy(buf,getSrcTypeStr());
else if (var == "sourceProtocol")
strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
else if (var == "sourceURL")
{
if (sourceURL.isEmpty())
sourceHost.host.toStr(buf);
else
strcpy(buf,sourceURL.cstr());
}
else if (var == "headPos")
sprintf(buf,"%d",headPack.pos);
else if (var == "headLen")
sprintf(buf,"%d",headPack.len);
else if (var == "numHits")
{
ChanHitList *chl = chanMgr->findHitListByID(info.id);
int numHits = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -