📄 channel.cpp.svn-base
字号:
utf8 = info.track.contact;
utf8.convertTo(String::T_UNICODESAFE);
strcpy(buf,utf8.cstr());
}else if (var == "contactURL")
sprintf(buf,"%s",info.url.cstr());
else if (var == "streamPos")
sprintf(buf,"%d",streamPos);
else if (var == "sourceType")
strcpy(buf,getSrcTypeStr());
else if (var == "sourceProtocol")
strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
else if (var == "sourceURL")
{
if (sourceURL.isEmpty())
sourceHost.host.toStr(buf);
else
strcpy(buf,sourceURL.cstr());
}
else if (var == "headPos")
sprintf(buf,"%d",headPack.pos);
else if (var == "headLen")
sprintf(buf,"%d",headPack.len);
else if (var == "numHits")
{
ChanHitList *chl = chanMgr->findHitListByID(info.id);
int numHits = 0;
if (chl)
numHits = chl->numHits();
sprintf(buf,"%d",numHits);
}else
return false;
out.writeString(buf);
return true;
}
// -----------------------------------
// Forwards a tracker update request to every channel slot that is both
// active and broadcasting.
//   svID  - passed through unchanged to Channel::broadcastTrackerUpdate
//   force - passed through unchanged to Channel::broadcastTrackerUpdate
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
    for (int slot = 0; slot < MAX_CHANNELS; ++slot)
    {
        Channel &chan = channels[slot];

        // Skip slots that are not in use or are not the broadcast source.
        if (!chan.isActive())
            continue;
        if (!chan.isBroadcasting())
            continue;

        chan.broadcastTrackerUpdate(svID, force);
    }
}
// -----------------------------------
// Offers the packet to every channel slot via Channel::sendPacketUp and
// returns how many of those calls reported success.
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
    int accepted = 0;

    int slot = 0;
    while (slot < MAX_CHANNELS)
    {
        if (channels[slot].sendPacketUp(pack, chanID, srcID, destID))
            ++accepted;
        ++slot;
    }

    return accepted;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){ //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP()) { Host sh = servMgr->serverHost; bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull(); bool stable = servMgr->totalStreams>0;
GnuPacket hit; int numChans=0; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isPlaying()) {
bool tracker = c->isBroadcasting();
int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds if (ttl < minTTL) ttl = minTTL; if (ttl > maxTTL) ttl = maxTTL; if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl)) { int numOut=0; numChans++; if (serv) { serv->outputPacket(hit,false); numOut++; } LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl); } } } //if (numChans) // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut); }}// -----------------------------------
void ChanMgr::setUpdateInterval(unsigned int v)
{
hostUpdateInterval = v;
}
// -----------------------------------
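// Message check (disabled): builds a PCP broadcast message packet in memory
// and reads it back through PCPStream::readAtom.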
#if 0
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_MESG,1);
atom.writeString(PCP_MESG_DATA,msg.cstr());
mem.len = mem.pos;
mem.rewind();
pack.len = mem.len;
GnuID noID;
noID.clear();
BroadcastState bcs;
PCPStream::readAtom(atom,bcs);
//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
if (!msg.isSame(broadcastMsg)) { broadcastMsg = msg; for(int i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (c->isActive() && c->isBroadcasting())
{
ChanInfo newInfo = c->info; newInfo.comment = broadcastMsg; c->updateInfo(newInfo);
} } }}
// -----------------------------------void ChanMgr::clearHitLists(){ for(int i=0; i<MAX_HITLISTS; i++) { peercastApp->delChannel(&hitlists[i].info); hitlists[i].init(); }}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){ lock.on();
int i;
Channel *nc=NULL; for(i=0; i<MAX_CHANNELS; i++) { Channel *c = &channels[i]; if (!c->isActive())
{
nc=c;
break;
}
}
if (!nc)
{
for(i=0; i<MAX_CHANNELS; i++)
{
Channel *c = &channels[i];
if (!c->isPlaying())
{
c->close();
for(int j=0; j<100; j++) // wait 10 seconds
{
if (!c->isActive())
{
nc = c;
break;
}
sys->sleep(100);
}
if (nc)
break;
}
}
}
if (nc)
{ nc->info = info; nc->info.lastPlayStart = 0; nc->info.lastPlayEnd = 0;
nc->info.status = ChanInfo::S_UNKNOWN; if (mount) nc->mount.set(mount); nc->index = i+1; nc->setStatus(Channel::S_WAIT); nc->type = Channel::T_ALLOCATED;
nc->info.createdTime = sys->getTime();
LOG_CHANNEL("New channel (%d) created",nc->index);
lock.off(); return nc; }else
{ LOG_ERROR("Unable to create channel");
lock.off(); return NULL;
}}
// -----------------------------------
// Asks each used hit list in turn to pick hits for the given search.
// Returns 1 as soon as one list's pickHits() reports success, otherwise 0.
int ChanMgr::pickHits(ChanHitSearch &chs)
{
    for(int i=0; i<MAX_HITLISTS; i++)
    {
        if (!hitlists[i].isUsed())
            continue;

        // The original body also contained the bare expression statement
        // `hitlists[i].info.id;`, which had no effect and has been removed.
        if (hitlists[i].pickHits(chs))
            return 1;
    }
    return 0;
}
// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){
for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) if (hitlists[i].info.matchNameID(info)) return &hitlists[i]; return NULL;}// -----------------------------------
// Returns the first used hit list whose channel id is the same as `id`
// (GnuID::isSame), or NULL when there is none.
ChanHitList *ChanMgr::findHitListByID(GnuID &id)
{
    for (int slot = 0; slot < MAX_HITLISTS; ++slot)
    {
        ChanHitList &list = hitlists[slot];
        if (list.isUsed() && list.info.id.isSame(id))
            return &list;
    }
    return NULL;
}
// -----------------------------------int ChanMgr::numHitLists(){ int num=0; for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed()) num++; return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){ ChanHitList *hl = NULL; for(int i=0; i<MAX_HITLISTS; i++) if (!hitlists[i].isUsed()) {
char idstr[64];
info.id.toStr(idstr);
LOG_DEBUG("Created new hitlist: %s",idstr); hl = &hitlists[i];
break; } if (hl) {
hl->used = true;
hl->info = info; hl->info.createdTime = sys->getTime();
peercastApp->addChannel(&hl->info);
} return hl;}// -----------------------------------void ChanMgr::clearDeadHits(bool clearTrackers){ unsigned int interval;
if (servMgr->isRoot)
interval = 1200; // mainly for old 0.119 clients
else
interval = hostUpdateInterval+30;
for(int i=0; i<MAX_HITLISTS; i++) if (hitlists[i].isUsed())
{
if (hitlists[i].clearDeadHits(interval,clearTrackers) == 0) {
if (!isBroadcasting(hitlists[i].info.id))
{
if (!chanMgr->findChannelByID(hitlists[i].info.id))
{ LOG_DEBUG("Deleting hitlist");
peercastApp->delChannel(&hitlists[i].info); hitlists[i].init();
}
} }
}}// -----------------------------------
// Reports whether the channel with the given id is broadcasting.
// Returns false when findChannelByID() finds no such channel.
bool ChanMgr::isBroadcasting(GnuID &id)
{
    Channel *found = findChannelByID(id);
    return found ? found->isBroadcasting() : false;
}
// -----------------------------------
// Reports whether at least one channel slot is both active and broadcasting.
bool ChanMgr::isBroadcasting()
{
    for (int slot = 0; slot < MAX_CHANNELS; ++slot)
    {
        if (channels[slot].isActive() && channels[slot].isBroadcasting())
            return true;
    }
    return false;
}
// -----------------------------------int ChanMgr::numChannels(){ int tot = 0; for(int i=0; i<MAX_CHANNELS; i++) if (channels[i].isActive()) tot++; return tot;}// -----------------------------------
// Forwards the hit to ChanHitList::deadHit on the hit list of its channel.
// Does nothing when no hit list exists for hit.chanID.
void ChanMgr::deadHit(ChanHit &hit)
{
    if (ChanHitList *list = findHitListByID(hit.chanID))
    {
        list->deadHit(hit);
    }
}
// -----------------------------------
// Forwards the hit to ChanHitList::delHit on the hit list of its channel.
// Does nothing when no hit list exists for hit.chanID.
void ChanMgr::delHit(ChanHit &hit)
{
    if (ChanHitList *list = findHitListByID(hit.chanID))
    {
        list->delHit(hit);
    }
}
// -----------------------------------ChanHit *ChanMgr::addHit(ChanHit &h){ if (searchActive) lastHit = sys->getTime();
ChanHitList *hl=NULL;
hl = findHitListByID(h.chanID);
if (!hl)
{
ChanInfo info;
info.id = h.chanID;
hl = addHitList(info);
}
if (hl)
{ return hl->addHit(h);
}else
return NULL;}// -----------------------------------class ChanFindInfo : public ThreadInfo{public: ChanInfo info; bool keep;};// -----------------------------------THREAD_PROC findAndPlayChannelProc(ThreadInfo *th){ ChanFindInfo *cfi = (ChanFindInfo *)th; ChanInfo info; info = cfi->info;
Channel *ch = chanMgr->findChannelByNameID(info);
chanMgr->currFindAndPlayChannel = info.id;
if (!ch) ch = chanMgr->findAndRelay(info); if (ch) {
// check that a different channel hasn`t be selected already.
if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
chanMgr->playChannel(ch->info);
if (cfi->keep) ch->stayConnected = cfi->keep; } delete cfi; return 0;}// -----------------------------------void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep){ ChanFindInfo *cfi = new ChanFindInfo; cfi->info = info; cfi->keep = keep; cfi->func = findAndPlayChannelProc;
sys->startThread(cfi);}
// -----------------------------------
// Writes a one-entry playlist for the given channel into the application
// path and asks the system to execute that file.
//   info - channel to play; its content type selects the playlist format
//          (ASX for WMA/WMV, SCPLS otherwise)
// All names are built with bounded formatting. If the playlist file name
// would not fit into its buffer, an error is logged and nothing is written.
void ChanMgr::playChannel(ChanInfo &info)
{
    char str[128],fname[256],idStr[128];

    snprintf(str,sizeof(str),"http://localhost:%d",servMgr->serverHost.port);
    info.id.toStr(idStr);

    PlayList::TYPE type;
    int written;

    if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
    {
        type = PlayList::T_ASX;
        // WMP seems to have a bug where it doesn't re-read asx files if they have the same name
        // so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
        written = snprintf(fname,sizeof(fname),"%s/%s.asx",peercastApp->getPath(),idStr);
    }else
    {
        type = PlayList::T_SCPLS;
        written = snprintf(fname,sizeof(fname),"%s/play.pls",peercastApp->getPath());
    }

    // snprintf reports the untruncated length; reject names that did not fit
    // rather than writing to and executing a truncated path.
    if (written < 0 || written >= static_cast<int>(sizeof(fname)))
    {
        LOG_ERROR("Playlist file name too long");
        return;
    }

    PlayList *pls = new PlayList(type,1);
    pls->addChannel(str,info);

    LOG_DEBUG("Writing %s",fname);

    FileStream file;
    file.openWriteReplace(fname);
    pls->write(file);
    file.close();

    LOG_DEBUG("Executing: %s",fname);
    sys->executeFile(fname);

    delete pls;
}
// -----------------------------------void ChanHitList::init(){ info.init(); memset(hits,0,sizeof(ChanHit)*MAX_HITS); lastHitTime = 0;
used = false;
}
// -----------------------------------
// Sets `host` to the first of the two rhost entries whose classType()
// equals that of `h`. Leaves `host` untouched when neither entry matches.
void ChanHit::pickNearestIP(Host &h)
{
    int idx = 0;
    while (idx < 2 && !(h.classType() == rhost[idx].classType()))
        ++idx;

    if (idx < 2)
        host = rhost[idx];
}
// -----------------------------------
// Resets every field of the hit to its default value.
void ChanHit::init()
{
    // Addresses
    host.init();
    for (int k = 0; k < 2; ++k)
        rhost[k].init();

    // Counters
    numListeners = 0;
    numRelays = 0;

    // Flags that default to off
    dead = false;
    tracker = false;
    firewalled = false;
    stable = false;
    yp = false;

    // Flags that default to on
    recv = true;
    cin = true;
    relay = true;
    // NOTE(review): the original set `direct` to true together with the flags
    // above and then immediately reset it to 0; only the final value is kept.
    direct = false;

    numHops = 0;
    time = 0;
    upTime = 0;
    agentStr[0] = 0;
    lastContact = 0;
    version = 0;

    // Identifiers
    sessionID.clear();
    chanID.clear();
}
// -----------------------------------
// Fills the hit with a description of the local node, taken from servMgr.
//   numl      - listener count to report
//   numr      - relay count to report
//   (unnamed) - third argument is not used
//   uptm      - uptime to report
//   connected - stored as the `recv` flag
void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected)
{
    init();

    const bool behindFirewall = (servMgr->getFirewall() != ServMgr::FW_OFF);
    firewalled = behindFirewall;

    numListeners = numl;
    numRelays = numr;
    upTime = uptm;

    stable = (servMgr->totalStreams > 0);

    strcpy(agentStr, PCX_AGENT);
    sessionID = servMgr->sessionID;

    // Capability flags: a capability is offered while its pool is not full.
    recv = connected;
    direct = !servMgr->directFull();
    relay = !servMgr->relaysFull();
    cin = !servMgr->controlInFull();

    // rhost[0] mirrors the server host; rhost[1] uses the address returned
    // by ClientSocket::getIP(NULL) with the same port.
    host = servMgr->serverHost;
    rhost[0] = Host(host.ip, host.port);
    rhost[1] = Host(ClientSocket::getIP(NULL), host.port);

    // A firewalled node reports port 0 in rhost[0].
    if (behindFirewall)
        rhost[0].port = 0;
}
// -----------------------------------
// Serialises this hit as a PCP_HOST parent atom.
//   atom    - stream the atoms are written to
//   tryHost - true: short form (both address pairs plus the tracker flag);
//             false: full form (session id, addresses, counters, version,
//             agent string and all status flags)
//   chanID  - when set, written as the first child atom
// The child count passed to writeParent must equal the number of atoms
// written afterwards: 5 for the short form, 15 for the full form, plus one
// when the channel id is included.
void ChanHit::writeAtoms(AtomStream &atom,bool tryHost, GnuID &chanID)
{
    const bool withChanID = chanID.isSet();
    const int baseChildren = tryHost ? 5 : 15;

    atom.writeParent(PCP_HOST, baseChildren + (withChanID ? 1 : 0));

    if (withChanID)
        atom.writeBytes(PCP_HOST_CHANID, chanID.id, 16);

    // Only the full form carries the session id, ahead of the addresses.
    if (!tryHost)
        atom.writeBytes(PCP_HOST_ID, sessionID.id, 16);

    // Both address/port pairs, in order.
    for (int k = 0; k < 2; ++k)
    {
        atom.writeInt(PCP_HOST_IP, rhost[k].ip);
        atom.writeShort(PCP_HOST_PORT, rhost[k].port);
    }

    if (tryHost)
    {
        atom.writeChar(PCP_HOST_TRACKER, tracker);
        return;
    }

    atom.writeInt(PCP_HOST_NUML, numListeners);
    atom.writeInt(PCP_HOST_NUMR, numRelays);
    atom.writeInt(PCP_HOST_UPTIME, upTime);
    atom.writeInt(PCP_HOST_VERSION, PCP_CLIENT_VERSION);

    // deprecated
    atom.writeString(PCP_HOST_AGENT, agentStr);
    atom.writeChar(PCP_HOST_BUSY, !relay);
    atom.writeChar(PCP_HOST_PUSH, firewalled);
    atom.writeChar(PCP_HOST_RECV, recv);
    atom.writeChar(PCP_HOST_TRACKER, tracker);

    // Same status information packed into one flags byte.
    int flags = 0;
    flags |= recv       ? PCP_HOST_FLAGS1_RECV    : 0;
    flags |= relay      ? PCP_HOST_FLAGS1_RELAY   : 0;
    flags |= direct     ? PCP_HOST_FLAGS1_DIRECT  : 0;
    flags |= cin        ? PCP_HOST_FLAGS1_CIN     : 0;
    flags |= tracker    ? PCP_HOST_FLAGS1_TRACKER : 0;
    flags |= firewalled ? PCP_HOST_FLAGS1_PUSH    : 0;
    atom.writeChar(PCP_HOST_FLAGS1, flags);
}
// -----------------------------------
bool ChanHit::writeVariable(Stream &out, const String &var)
{
char buf[1024];
if (var == "rhost0")
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -