📄 channel.cpp.svn-base
字号:
{
ChanHitList *chl = chanMgr->findHitListByID(info.id);
int numHits = 0;
if (chl)
numHits = chl->numHits();
sprintf(buf,"%d",numHits);
}else
return false;
out.writeString(buf);
return true;
}
// -----------------------------------
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
Channel *c = channel;
while(c)
{
if ( c->isActive() && c->isBroadcasting() )
c->broadcastTrackerUpdate(svID,force);
c=c->next;
}
}
// -----------------------------------
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
int cnt=0;
Channel *c = channel;
while(c)
{
if (c->sendPacketUp(pack,chanID,srcID,destID))
cnt++;
c=c->next;
}
return cnt;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull(); bool stable = servMgr->totalStreams>0;
GnuPacket hit; int numChans=0; Channel *c = channel;
while(c)
{ if (c->isPlaying()) {
bool tracker = c->isBroadcasting();
int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds if (ttl < minTTL) ttl = minTTL; if (ttl > maxTTL) ttl = maxTTL; if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl)) { int numOut=0; numChans++; if (serv) { serv->outputPacket(hit,false); numOut++; } LOG_NETWORK("Sent channel to %d servents, TTL %d",numOut,ttl); } }
c=c->next; } //if (numChans) // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut); }}// -----------------------------------
void ChanMgr::setUpdateInterval(unsigned int v)
{
hostUpdateInterval = v;
}
// -----------------------------------
// message check
#if 0
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_MESG,1);
atom.writeString(PCP_MESG_DATA,msg.cstr());
mem.len = mem.pos;
mem.rewind();
pack.len = mem.len;
GnuID noID;
noID.clear();
BroadcastState bcs;
PCPStream::readAtom(atom,bcs);
//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; Channel *c = channel;
while(c)
{
if (c->isActive() && c->isBroadcasting())
{
ChanInfo newInfo = c->info; newInfo.comment = broadcastMsg; c->updateInfo(newInfo);
}
c=c->next; } }}
// -----------------------------------void ChanMgr::clearHitLists(){
while (hitlist)
{
peercastApp->delChannel(&hitlist->info);
ChanHitList *next = hitlist->next;
delete hitlist;
hitlist = next;
}
}
// -----------------------------------
Channel *ChanMgr::deleteChannel(Channel *delchan)
{
lock.on();
Channel *ch = channel,*prev=NULL,*next=NULL;
while (ch)
{
if (ch == delchan)
{
Channel *next = ch->next;
if (prev)
prev->next = next;
else
channel = next;
delete delchan;
break;
}
prev = ch;
ch=ch->next;
}
lock.off();
return next;
}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on();
Channel *nc=NULL;
nc = new Channel();
nc->next = channel;
channel = nc;
nc->info = info; nc->info.lastPlayStart = 0; nc->info.lastPlayEnd = 0;
nc->info.status = ChanInfo::S_UNKNOWN; if (mount) nc->mount.set(mount); nc->setStatus(Channel::S_WAIT); nc->type = Channel::T_ALLOCATED;
nc->info.createdTime = sys->getTime();
LOG_CHANNEL("New channel created");
lock.off(); return nc;}
// -----------------------------------
int ChanMgr::pickHits(ChanHitSearch &chs)
{
ChanHitList *chl = hitlist;
while(chl)
{
if (chl->isUsed())
if (chl->pickHits(chs))
{
chl->info.id;
return 1;
}
chl = chl->next;
}
return 0;
}
// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){
ChanHitList *chl = hitlist;
while(chl)
{
if (chl->isUsed()) if (chl->info.matchNameID(info)) return chl;
chl = chl->next;
} return NULL;}// -----------------------------------
ChanHitList *ChanMgr::findHitListByID(GnuID &id)
{
ChanHitList *chl = hitlist;
while(chl)
{
if (chl->isUsed())
if (chl->info.id.isSame(id))
return chl;
chl = chl->next;
}
return NULL;
}
// -----------------------------------int ChanMgr::numHitLists(){ int num=0; ChanHitList *chl = hitlist;
while(chl)
{
if (chl->isUsed()) num++;
chl = chl->next;
} return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){
ChanHitList *chl = new ChanHitList();
chl->next = hitlist;
hitlist = chl;
chl->used = true;
chl->info = info; chl->info.createdTime = sys->getTime();
peercastApp->addChannel(&chl->info);
return chl;}// -----------------------------------void ChanMgr::clearDeadHits(bool clearTrackers){ unsigned int interval;
if (servMgr->isRoot)
interval = 1200; // mainly for old 0.119 clients
else
interval = hostUpdateInterval+30;
ChanHitList *chl = hitlist,*prev = NULL;
while (chl)
{ if (chl->isUsed())
{
if (chl->clearDeadHits(interval,clearTrackers) == 0) {
if (!isBroadcasting(chl->info.id))
{
if (!chanMgr->findChannelByID(chl->info.id))
{ peercastApp->delChannel(&chl->info);
ChanHitList *next = chl->next;
if (prev)
prev->next = next;
else
hitlist = next;
delete chl;
chl = next;
continue; }
} }
}
prev = chl;
chl = chl->next;
}}// -----------------------------------
bool ChanMgr::isBroadcasting(GnuID &id)
{
Channel *ch = findChannelByID(id);
if (ch)
return ch->isBroadcasting();
return false;
}
// -----------------------------------
bool ChanMgr::isBroadcasting()
{
Channel *ch = channel;
while (ch)
{
if (ch->isActive())
if (ch->isBroadcasting())
return true;
ch = ch->next;
}
return false;
}
// -----------------------------------int ChanMgr::numChannels(){ int tot = 0; Channel *ch = channel;
while (ch)
{
if (ch->isActive()) tot++; ch = ch->next;
}
return tot;}// -----------------------------------
void ChanMgr::deadHit(ChanHit &hit)
{
ChanHitList *chl = findHitListByID(hit.chanID);
if (chl)
chl->deadHit(hit);
}// -----------------------------------
void ChanMgr::delHit(ChanHit &hit)
{
ChanHitList *chl = findHitListByID(hit.chanID);
if (chl)
chl->delHit(hit);
}
// -----------------------------------
void ChanMgr::addHit(Host &h,GnuID &id,bool tracker)
{
ChanHit hit;
hit.init();
hit.host = h;
hit.rhost[0] = h;
hit.rhost[1].init();
hit.tracker = tracker;
hit.recv = true;
hit.chanID = id;
addHit(hit);
}
// -----------------------------------ChanHit *ChanMgr::addHit(ChanHit &h){ if (searchActive) lastHit = sys->getTime();
ChanHitList *hl=NULL;
hl = findHitListByID(h.chanID);
if (!hl)
{
ChanInfo info;
info.id = h.chanID;
hl = addHitList(info);
}
if (hl)
{ return hl->addHit(h);
}else
return NULL;}// -----------------------------------class ChanFindInfo : public ThreadInfo{public: ChanInfo info; bool keep;};// -----------------------------------THREAD_PROC findAndPlayChannelProc(ThreadInfo *th){ ChanFindInfo *cfi = (ChanFindInfo *)th; ChanInfo info; info = cfi->info;
Channel *ch = chanMgr->findChannelByNameID(info);
chanMgr->currFindAndPlayChannel = info.id;
if (!ch) ch = chanMgr->findAndRelay(info); if (ch) {
// check that a different channel hasn`t be selected already.
if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
chanMgr->playChannel(ch->info);
if (cfi->keep) ch->stayConnected = cfi->keep; } delete cfi; return 0;}// -----------------------------------void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep){ ChanFindInfo *cfi = new ChanFindInfo; cfi->info = info; cfi->keep = keep; cfi->func = findAndPlayChannelProc;
sys->startThread(cfi);}
// -----------------------------------
void ChanMgr::playChannel(ChanInfo &info)
{
char str[128],fname[256],idStr[128];
sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
info.id.toStr(idStr);
PlayList::TYPE type;
if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
{
type = PlayList::T_ASX;
// WMP seems to have a bug where it doesn`t re-read asx files if they have the same name
// so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
sprintf(fname,"%s/%s.asx",peercastApp->getPath(),idStr);
}else if (info.contentType == ChanInfo::T_OGM)
{
type = PlayList::T_RAM;
sprintf(fname,"%s/play.ram",peercastApp->getPath());
}else
{
type = PlayList::T_SCPLS;
sprintf(fname,"%s/play.pls",peercastApp->getPath());
}
PlayList *pls = new PlayList(type,1);
pls->addChannel(str,info);
LOG_DEBUG("Writing %s",fname);
FileStream file;
file.openWriteReplace(fname);
pls->write(file);
file.close();
LOG_DEBUG("Executing: %s",fname);
sys->executeFile(fname);
delete pls;
}
// -----------------------------------ChanHitList::ChanHitList(){ info.init(); lastHitTime = 0;
used = false;
hit = NULL;
}
// -----------------------------------
ChanHitList::~ChanHitList()
{
while (hit)
hit = deleteHit(hit);
}
// -----------------------------------
void ChanHit::pickNearestIP(Host &h)
{
for(int i=0; i<2; i++)
{
if (h.classType() == rhost[i].classType())
{
host = rhost[i];
break;
}
}
}
// -----------------------------------
void ChanHit::init()
{
host.init();
rhost[0].init();
rhost[1].init();
next = NULL;
numListeners = 0;
numRelays = 0;
dead = tracker = firewalled = stable = yp = false;
recv = cin = direct = relay = true;
direct = 0;
numHops = 0;
time = upTime = 0;
lastContact = 0;
version = 0;
sessionID.clear();
chanID.clear();
oldestPos = newestPos = 0;
}
// -----------------------------------
void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,unsigned int oldp,unsigned int newp)
{
init();
firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
numListeners = numl;
numRelays = numr;
upTime = uptm;
stable = servMgr->totalStreams>0;
sessionID = servMgr->sessionID;
recv = connected;
direct = !servMgr->directFull();
relay = !servMgr->relaysFull();
cin = !servMgr->controlInFull();
host = servMgr->serverHost;
version = PCP_CLIENT_VERSION;
rhost[0] = Host(host.ip,host.port);
rhost[1] = Host(ClientSocket::getIP(NULL),host.port);
if (firewalled)
rhost[0].port = 0;
oldestPos = oldp;
newestPos = newp;
}
// -----------------------------------
void ChanHit::writeAtoms(AtomStream &atom,GnuID &chanID)
{
bool addChan=chanID.isSet();
int fl1 = 0;
if (recv) fl1 |= PCP_HOST_FLAGS1_RECV;
if (relay) fl1 |= PCP_HOST_FLAGS1_RELAY;
if (direct) fl1 |= PCP_HOST_FLAGS1_DIRECT;
if (cin) fl1 |= PCP_HOST_FLAGS1_CIN;
if (tracker) fl1 |= PCP_HOST_FLAGS1_TRACKER;
if (firewalled) fl1 |= PCP_HOST_FLAGS1_PUSH;
atom.writeParent(PCP_HOST,12 + (addChan?1:0));
if (addChan)
atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
atom.writeInt(PCP_HOST_IP,rhost[0].ip);
atom.writeShort(PCP_HOST_PORT,rhost[0].port);
atom.writeInt(PCP_HOST_IP,rhost[1].ip);
atom.writeShort(PCP_HOST_PORT,rhost[1].port);
atom.writeInt(PCP_HOST_NUML,numListeners);
atom.writeInt(PCP_HOST_NUMR,numRelays);
atom.writeInt(PCP_HOST_UPTIME,upTime);
atom.writeInt(PCP_HOST_VERSION,version);
atom.writeChar(PCP_HOST_FLAGS1,fl1);
atom.writeInt(PCP_HOST_OLDPOS,oldestPos);
atom.writeInt(PCP_HOST_NEWPOS,newestPos);
}
// -----------------------------------
bool ChanHit::writeVariable(Stream &out, const String &var)
{
char buf[1024];
if (var == "rhost0")
rhost[0].toStr(buf);
else if (var == "rhost1")
rhost[1].toStr(buf);
else if (var == "numHops")
sprintf(buf,"%d",numHops);
else if (var == "numListeners")
sprintf(buf,"%d",numListeners);
else if (var == "numRelays")
sprintf(buf,"%d",numRelays);
else if (var == "uptime")
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -