📄 channel.cpp
字号:
if (!c->isActive())
break;
sys->sleep(100);
}
if (c->isActive())
break;
c->info = info; c->info.lastPlayTime = 0; c->info.status = ChanInfo::S_UNKNOWN; if (mount) c->mount.set(mount); c->index = i+1; c->setStatus(Channel::S_WAIT); c->type = Channel::T_ALLOCATED;
LOG_CHANNEL("New channel (%d) created",c->index); lock.off(); return c; }
} lock.off(); return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){
for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.matchNameID(info)) return &hitlists[i]; return NULL;}// -----------------------------------
ChanHitList *ChanMgr::findHitListByID(GnuID &id)
{
for(int i=0; i<MAX_HITLISTS; i++)
if (hitlists[i].isUsed())
if (hitlists[i].info.id.isSame(id))
return &hitlists[i];
return NULL;
}
// -----------------------------------int ChanMgr::numHitLists(){ int num=0; for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) num++; return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){ ChanHitList *hl = NULL; for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].isUsed()) {
char idstr[64];
info.id.toStr(idstr);
LOG_DEBUG("Created new hitlist: %s",idstr); hl = &hitlists[i]; break; } if (hl) { hl->info = info; peercastApp->addChannel(&hl->info);
} return hl;}// -----------------------------------void ChanMgr::clearDeadHits(bool clearTrackers){ for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed())
{
unsigned int interval;
if (hitlists[i].numTrackers())
interval = hostUpdateInterval+30;
else
interval = 1200; // old YP
if (hitlists[i].clearDeadHits(interval,clearTrackers) == 0) {
if (!isBroadcasting(hitlists[i].info.id))
{
if (!chanMgr->findChannelByID(hitlists[i].info.id))
{ LOG_DEBUG("Deleting hitlist");
peercastApp->delChannel(&hitlists[i].info); hitlists[i].init();
}
} }
}}// -----------------------------------
bool ChanMgr::isBroadcasting(GnuID &id)
{
Channel *ch = findChannelByID(id);
if (ch)
return ch->isBroadcasting();
return false;
}
// -----------------------------------
bool ChanMgr::isBroadcasting()
{
for(int i=0; i<MAX_CHANNELS; i++)
if (channels[i].isActive())
if (channels[i].isBroadcasting())
return true;
return false;
}
// -----------------------------------int ChanMgr::numConnected(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) tot++; return tot;}// -----------------------------------int ChanMgr::numRelayed(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numRelays()>0) tot++; return tot;}// -----------------------------------int ChanMgr::numListeners(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].numListeners()>0) tot++; return tot;}// -----------------------------------int ChanMgr::numIdle(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].status == Channel::S_IDLE) tot++; return tot;}// -----------------------------------unsigned int ChanMgr::totalInput(){ unsigned int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) if (channels[i].isPlaying()) if (channels[i].sock) tot+=channels[i].sock->bytesInPerSec; return tot;}
// -----------------------------------
void ChanMgr::deadHit(GnuID &id, ChanHit &hit)
{
ChanHitList *chl = findHitListByID(id);
if (chl)
chl->deadHit(hit);
}// -----------------------------------ChanHit *ChanMgr::addHit(GnuID &id, ChanHit &h){ if (searchActive) lastHit = sys->getTime();
ChanHitList *hl=NULL;
hl = findHitListByID(id);
if (!hl)
{
ChanInfo info;
info.id = id;
hl = addHitList(info);
}
if (hl) {
return hl->addHit(h); }else return NULL;}// -----------------------------------class ChanFindInfo : public ThreadInfo{public: ChanInfo info; bool keep;};// -----------------------------------THREAD_PROC findAndPlayChannelProc(ThreadInfo *th){ ChanFindInfo *cfi = (ChanFindInfo *)th; ChanInfo info; info = cfi->info; Channel *ch = chanMgr->findChannelByNameID(info); if (!ch) ch = chanMgr->findAndRelay(info); if (ch) {
chanMgr->playChannel(ch->info); if (cfi->keep) ch->stayConnected = cfi->keep; } delete cfi; return 0;}// -----------------------------------void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep){ ChanFindInfo *cfi = new ChanFindInfo; cfi->info = info; cfi->keep = keep; cfi->func = findAndPlayChannelProc; sys->startThread(cfi);}
// -----------------------------------
void ChanMgr::playChannel(ChanInfo &info)
{
char str[128],fname[128],idStr[128];
sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
info.id.toStr(idStr);
PlayList::TYPE type;
if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
{
type = PlayList::T_ASX;
// WMP seems to have a bug where it doesn`t re-read asx files if they have the same name
// so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
sprintf(fname,"%s.asx",idStr);
}else
{
type = PlayList::T_SCPLS;
sprintf(fname,"play.pls");
}
PlayList *pls = new PlayList(type,1);
pls->addChannel(str,info);
FileStream file;
file.openWriteReplace(fname);
pls->write(file);
file.close();
LOG_DEBUG("Executing: %s",fname);
sys->executeFile(fname);
delete pls;
}
// -----------------------------------void ChanHitList::init(){ info.init(); memset(hits,0,sizeof(ChanHit)*MAX_HITS); lastHitTime = 0;}
// -----------------------------------
ChanHit ChanHitList::findHit(GnuID &sid, bool tracker)
{
ChanHit hit;
hit.init();
for(int i=0; i<MAX_HITS; i++)
if (hits[i].host.ip)
if (hits[i].sessionID.isSame(sid))
if (hits[i].tracker == tracker)
{
hit = hits[i];
break;
}
return hit;
}
// -----------------------------------
void ChanHit::pickNearestIP(Host &h)
{
for(int i=0; i<2; i++)
{
if (h.classType() == rhost[i].classType())
{
host = rhost[i];
break;
}
}
}
// -----------------------------------
void ChanHit::init()
{
host.init();
rhost[0].init();
rhost[1].init();
numListeners = 0;
numRelays = 0;
firewalled = busy = stable = false;
hops = 0;
index = 0;
time = upTime = 0;
agentStr[0]=0;
numSkips=0;
packetID.clear();
maxPreviewTime=0;
tracker=false;
lastContact = 0;
recv = true;
yp=false;
}
// -----------------------------------
void ChanHit::initLocal(int numl,int numr,int nums,int uptm,bool connected,bool b)
{
init();
busy = b;
firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
numListeners = numl;
numRelays = numr;
numSkips = nums;
upTime = uptm;
stable = servMgr->totalStreams>0;
strcpy(agentStr,PCX_AGENT);
sessionID = servMgr->sessionID;
recv = connected;
host = servMgr->serverHost;
rhost[0] = Host(host.ip,host.port);
rhost[1] = Host(ClientSocket::getIP(NULL),host.port);
if (firewalled)
rhost[0].port = 0;
}
// -----------------------------------
void ChanHit::writeAtoms(AtomStream &atom,bool tryHost, GnuID &chanID)
{
bool addChan=chanID.isSet();
if (tryHost)
{
atom.writeParent(PCP_HOST,5 + (addChan?1:0));
if (addChan)
atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
atom.writeInt(PCP_HOST_IP,rhost[0].ip);
atom.writeShort(PCP_HOST_PORT,rhost[0].port);
atom.writeInt(PCP_HOST_IP,rhost[1].ip);
atom.writeShort(PCP_HOST_PORT,rhost[1].port);
atom.writeChar(PCP_HOST_TRACKER,tracker);
}else
{
atom.writeParent(PCP_HOST,13 + (addChan?1:0));
if (addChan)
atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
atom.writeInt(PCP_HOST_IP,rhost[0].ip);
atom.writeShort(PCP_HOST_PORT,rhost[0].port);
atom.writeInt(PCP_HOST_IP,rhost[1].ip);
atom.writeShort(PCP_HOST_PORT,rhost[1].port);
atom.writeChar(PCP_HOST_BUSY,busy);
atom.writeChar(PCP_HOST_PUSH,firewalled);
atom.writeChar(PCP_HOST_RECV,recv);
atom.writeInt(PCP_HOST_NUML,numListeners);
atom.writeInt(PCP_HOST_NUMR,numRelays);
atom.writeString(PCP_HOST_AGENT,agentStr);
atom.writeInt(PCP_HOST_UPTIME,upTime);
atom.writeChar(PCP_HOST_TRACKER,tracker);
}
}
// -----------------------------------
int ChanHitList::getTotalListeners()
{
int cnt=0;
for(int i=0; i<MAX_HITS; i++)
if (hits[i].host.ip)
cnt+=hits[i].numListeners;
return cnt;
}
// -----------------------------------
int ChanHitList::getTotalRelays()
{
int cnt=0;
for(int i=0; i<MAX_HITS; i++)
if (hits[i].host.ip)
cnt+=hits[i].numRelays;
return cnt;
}
// -----------------------------------
int ChanHitList::getTotalFirewalled()
{
int cnt=0;
for(int i=0; i<MAX_HITS; i++)
if (hits[i].host.ip)
if (hits[i].firewalled)
cnt++;
return cnt;
}
// -----------------------------------
int ChanHitList::contactTrackers(bool connected, int numl, int nums, int uptm)
{
return 0;
}
// -----------------------------------bool ChanHitList::isAvailable() { return (numHits()-numBusy())>0;}// -----------------------------------ChanHit *ChanHitList::addHit(ChanHit &h){ int i;
// dont add our own hits
if (servMgr->sessionID.isSame(h.sessionID))
return NULL;
lastHitTime = sys->getTime(); h.time = lastHitTime; for(i=0; i<MAX_HITS; i++)
{
ChanHit &ch = hits[i];
if ((ch.rhost[0].ip == h.rhost[0].ip) && (ch.rhost[0].port == h.rhost[0].port)) if (((ch.rhost[1].ip == h.rhost[1].ip) && (ch.rhost[1].port == h.rhost[1].port)) || (!ch.rhost[1].isValid()))
{ hits[i] = h; return &hits[i]; }
} for(i=0; i<MAX_HITS; i++) if (hits[i].host.ip == 0) {
hits[i] = h; return &hits[i]; } return NULL;}// -----------------------------------int ChanHitList::clearDeadHits(unsigned int timeout, bool clearTrackers){ int cnt=0; unsigned int ctime = sys->getTime(); for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) {
if (((ctime-hits[i].time) > timeout) && (clearTrackers || (!clearTrackers & !hits[i].tracker)))
deadHit(hits[i]);
else
cnt++; } return cnt;}// -----------------------------------void ChanHitList::deadHit(ChanHit &h){ for(int i=0; i<MAX_HITS; i++) if (hits[i].rhost[0].isSame(h.rhost[0]) && hits[i].rhost[1].isSame(h.rhost[1]))
{ hits[i].init();
}}// -----------------------------------int ChanHitList::numHits(){ int cnt=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip)
cnt++; return cnt;}
// -----------------------------------int ChanHitList::numListeners(){ int cnt=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) cnt += hits[i].numListeners; return cnt;}// -----------------------------------
int ChanHitList::numTrackers()
{
int cnt=0;
for(int i=0; i<MAX_HITS; i++)
if ((hits[i].host.ip) && (hits[i].tracker))
cnt++;
return cnt;
}
// -----------------------------------int ChanHitList::numBusy(){ int cnt=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip)
cnt += hits[i].busy?1:0; return cnt;}// -----------------------------------int ChanHitList::numStable(){ int cnt=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip)
cnt += hits[i].stable?1:0; return cnt;}// -----------------------------------int ChanHitList::numFirewalled(){ int cnt=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip)
cnt += hits[i].firewalled?1:0; return cnt;}// -----------------------------------int ChanHitList::closestHit(){ int hop=10000; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) if (hits[i].hops < hop) hop = hits[i].hops; return hop;}// -----------------------------------int ChanHitList::furthestHit(){ int hop=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) if (hits[i].hops > hop) hop = hits[i].hops; return hop;}// -----------------------------------unsigned int ChanHitList::newestHit(){ unsigned int time=0; for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) if (hits[i].time > time) time = hits[i].time; return time;}// -----------------------------------ChanHit ChanHitList::getHit(Host *sh, unsigned int wait, bool useFirewalled, bool trackersOnly, bool useBusy){
ChanHit best,*bestP=NULL;
best.init();
best.hops = 255;
unsigned int ctime = sys->getTime();
for(int i=0; i<MAX_HITS; i++) { ChanHit *c = &hits[i]; if (c->host.ip)
{
if ((wait==0) || ((ctime-c->lastContact) >= wait))
if ((!c->busy || (c->busy && useBusy)) && (c->hops<best.hops))
{
if (trackersOnly && c->tracker)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -