📄 servmgr.cpp
字号:
if (sv->type == Servent::T_STREAM)
if (sv->chanID.isSame(cid))
if (sv->outputProtocol == proto)
cnt++;
sv=sv->next;
}
return cnt;
}
// --------------------------------------------------
// Counts the servents in the `servents` list that are connected, are of
// type Servent::T_STREAM and whose outputProtocol equals `proto`.
unsigned int ServMgr::numStreams(ChanInfo::PROTOCOL proto)
{
    unsigned int matches = 0;

    for (Servent *cur = servents; cur; cur = cur->next)
    {
        const bool isMatchingStream =
            cur->isConnected() &&
            (cur->type == Servent::T_STREAM) &&
            (cur->outputProtocol == proto);

        if (isMatchingStream)
            ++matches;
    }

    return matches;
}
// --------------------------------------------------
bool ServMgr::getChannel(char *str,ChanInfo &info, bool relay)
{
// remove file extension (only added for winamp)
//char *ext = strstr(str,".");
//if (ext) *ext = 0;
procConnectArgs(str,info);
Channel *ch;
ch = chanMgr->findChannelByNameID(info);
if (ch)
{
if (!ch->isPlaying())
{
if (relay)
ch->info.lastPlayTime = 0; // force reconnect
else
return false;
}
info = ch->info; // get updated channel info
return true;
}else
{
if (relay)
{
ch = chanMgr->findAndRelay(info);
if (ch)
{
info = ch->info; //get updated channel info
return true;
}
}
}
return false;
}
// --------------------------------------------------int ServMgr::findChannel(ChanInfo &info){
#if 0 char idStr[64]; info.id.toStr(idStr); if (info.id.isSet()) { // if we have an ID then try and connect to known hosts carrying channel. ServHost sh = getOutgoingServent(info.id); addOutgoing(sh.host,info.id,true); } GnuPacket pack; XML xml; XML::Node *n = info.createQueryXML(); xml.setRoot(n); pack.initFind(NULL,&xml,servMgr->queryTTL); addReplyID(pack.id); int cnt = broadcast(pack,NULL); LOG_NETWORK("Querying network: %s %s - %d servents",info.name.cstr(),idStr,cnt); return cnt;
#endif
return 0;}// --------------------------------------------------// add outgoing network connection from string (ip:port format)bool ServMgr::addOutgoing(Host h, GnuID &netid, bool pri){
#if 0 if (h.ip) { if (!findServent(h.ip,h.port,netid)) { Servent *sv = allocServent(); if (sv) { if (pri) sv->priorityConnect = true; sv->networkID = netid; sv->initOutgoing(h,Servent::T_OUTGOING); return true; } } }#endif
return false;}// --------------------------------------------------
// Returns the first connected servent in the `servents` list whose remoteID
// is the same as `sid`, or NULL when there is none.
Servent *ServMgr::findConnection(GnuID &sid)
{
    for (Servent *cur = servents; cur; cur = cur->next)
    {
        if (!cur->isConnected())
            continue;

        if (cur->remoteID.isSame(sid))
            return cur;
    }

    return NULL;
}
// --------------------------------------------------void ServMgr::procConnectArgs(char *str,ChanInfo &info){ char arg[512]; char curr[256];
char *args = strstr(str,"?");
if (args)
*args++=0;
info.initNameID(str);
if (args) { while (args=nextCGIarg(args,curr,arg)) {
LOG_DEBUG("cmd: %s, arg: %s",curr,arg);
if (strcmp(curr,"sip")==0) // sip - add network connection to client with channel
{ Host h; h.fromStrName(arg,DEFAULT_PORT); if (addOutgoing(h,servMgr->networkID,true)) LOG_NETWORK("Added connection: %s",arg); }else if (strcmp(curr,"pip")==0) // pip - add private network connection to client with channel
{ Host h; h.fromStrName(arg,DEFAULT_PORT); if (addOutgoing(h,info.id,true)) LOG_NETWORK("Added private connection: %s",arg); }else if (strcmp(curr,"ip")==0)
// ip - add hit
{
Host h;
h.fromStrName(arg,DEFAULT_PORT);
ChanHit hit;
hit.init();
hit.host = h;
hit.rhost[0] = h;
hit.rhost[1].init();
chanMgr->addHit(info.id,hit);
}else if (strcmp(curr,"tip")==0)
// tip - add tracker hit
{
Host h;
h.fromStrName(arg,DEFAULT_PORT);
ChanHit hit;
hit.init();
hit.host = h;
hit.rhost[0] = h;
hit.rhost[1].init();
hit.tracker = true;
chanMgr->addHit(info.id,hit);
}
} }}// --------------------------------------------------bool ServMgr::start(){ static ThreadInfo serverThread,clientThread,idleThread,trackerThread; serverThread.func = ServMgr::serverProc; if (!sys->startThread(&serverThread)) return false;// clientThread.func = ServMgr::clientProc;// if (!sys->startThread(&clientThread))// return false; idleThread.func = ServMgr::idleProc; if (!sys->startThread(&idleThread)) return false;
// trackerThread.func = ServMgr::trackerProc;
// if (!sys->startThread(&trackerThread))
// return false;
return true;}// --------------------------------------------------int ServMgr::clientProc(ThreadInfo *thread){
#if 0 thread->lock(); GnuID netID; netID = servMgr->networkID; while(thread->active) { if (servMgr->autoConnect) { if (servMgr->needConnections() || servMgr->forceLookup) { if (servMgr->needHosts() || servMgr->forceLookup) { // do lookup to find some hosts Host lh; lh.fromStrName(servMgr->connectHost,DEFAULT_PORT); if (!servMgr->findServent(lh.ip,lh.port,netID)) { Servent *sv = servMgr->allocServent(); if (sv) { LOG_DEBUG("Lookup: %s",servMgr->connectHost); sv->networkID = netID; sv->initOutgoing(lh,Servent::T_LOOKUP); servMgr->forceLookup = false; } } } for(int i=0; i<MAX_TRYOUT; i++) { if (servMgr->outUsedFull()) break; if (servMgr->tryFull()) break; ServHost sh = servMgr->getOutgoingServent(netID); if (!servMgr->addOutgoing(sh.host,netID,false)) servMgr->deadHost(sh.host,ServHost::T_SERVENT); sys->sleep(servMgr->tryoutDelay); break; } } }else{
#if 0 Servent *s = servMgr->servents; while (s) { if (s->type == Servent::T_OUTGOING) s->thread.active = false; s=s->next; }
#endif } sys->sleepIdle(); } thread->unlock();
#endif return 0;}
// -----------------------------------
// Offers `sock` to each servent of type Servent::T_COUT in list order and
// returns true as soon as one of them accepts it via Servent::acceptGIV().
// Returns false when no such servent accepts the socket.
bool ServMgr::acceptGIV(ClientSocket *sock)
{
    for (Servent *cur = servents; cur; cur = cur->next)
    {
        if (cur->type != Servent::T_COUT)
            continue;

        if (cur->acceptGIV(sock))
            return true;
    }

    return false;
}
// -----------------------------------
// Builds a PCP broadcast packet carrying a push request and sends it through
// broadcastPacket().
//
//   hit    - its sessionID is written as the broadcast destination and is
//            passed as the destination ID to broadcastPacket()
//   to     - host whose ip/port are written into the push atoms
//   chanID - channel ID written into the push atoms
//   type   - servent type passed on to broadcastPacket()
void ServMgr::broadcastPushRequest(ChanHit &hit, Host &to, GnuID &chanID, Servent::TYPE type)
{
    // Cleared ID passed as the channel ID argument of broadcastPacket().
    GnuID unsetChanID;
    unsetChanID.clear();

    ChanPacket packet;
    MemoryStream packetMem(packet.data,sizeof(packet.data));
    AtomStream out(packetMem);

    // Broadcast header: 4 children (group, dest, from, push).
    out.writeParent(PCP_BCST,4);
        out.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
        out.writeBytes(PCP_BCST_DEST,hit.sessionID.id,16);
        out.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
        // Push request: 3 children (ip, port, channel ID).
        out.writeParent(PCP_PUSH,3);
            out.writeInt(PCP_PUSH_IP,to.ip);
            out.writeShort(PCP_PUSH_PORT,to.port);
            out.writeBytes(PCP_PUSH_CHANID,chanID.id,16);

    packet.type = ChanPacket::T_PCP;
    packet.len = packetMem.pos;

    const int sentCount = servMgr->broadcastPacket(packet,unsetChanID,servMgr->sessionID,hit.sessionID,type);
    LOG_DEBUG("Broadcasted push request to %d %s clients",sentCount,Servent::getTypeStr(type));
}
// --------------------------------------------------
// Writes a PCP_ROOT parent atom to `atom` with the update interval, URL,
// client version to check against and the root message. When `getUpdate` is
// true an empty PCP_ROOT_UPDATE atom is appended as a fifth child.
void ServMgr::writeRootAtoms(AtomStream &atom, bool getUpdate)
{
    // Four fixed children, plus the optional update atom.
    int childCount = 4;
    if (getUpdate)
        childCount += 1;

    atom.writeParent(PCP_ROOT,childCount);
        atom.writeInt(PCP_ROOT_UPDINT,chanMgr->hostUpdateInterval);
        atom.writeString(PCP_ROOT_URL,"forum/viewforum.php?f=9");
        atom.writeInt(PCP_ROOT_CHECKVER,PCP_CLIENT_VERSION);
        atom.writeString(PCP_MESG_ASCII,rootMsg.cstr());

        if (getUpdate)
        {
            atom.writeParent(PCP_ROOT_UPDATE,0);
        }
}
// --------------------------------------------------
void ServMgr::broadcastRootSettings(bool getUpdate)
{
if (isRoot)
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
atom.writeBytes(PCP_BCST_FROM,sessionID.id,16);
writeRootAtoms(atom,getUpdate);
mem.len = mem.pos;
mem.rewind();
pack.len = mem.len;
GnuID noID;
noID.clear();
broadcastPacket(pack,noID,servMgr->sessionID,noID,Servent::T_CIN);
}
}
// --------------------------------------------------
// Sends `pack` to the servents in the `servents` list.
//
// If `destID` is set, each servent is first offered the packet with that
// destination; as soon as one sendPacket() call succeeds the function
// returns 1. If `destID` is not set, or no servent accepted the targeted
// send, the packet is offered to every servent with a cleared destination ID.
//
// Returns 1 for a successful targeted send, otherwise the number of servents
// whose sendPacket() succeeded in the send-to-everyone pass.
int ServMgr::broadcastPacket(ChanPacket &pack,GnuID &chanID,GnuID &srcID, GnuID &destID, Servent::TYPE type)
{
    // find destination servent
    if (destID.isSet())
    {
        for (Servent *target = servents; target; target = target->next)
        {
            if (target->sendPacket(pack,chanID,srcID,destID,type))
                return 1;
        }
    }

    // else send to everyone
    GnuID unsetDest;
    unsetDest.clear();

    int delivered = 0;
    for (Servent *peer = servents; peer; peer = peer->next)
    {
        if (peer->sendPacket(pack,chanID,srcID,unsetDest,type))
            ++delivered;
    }

    return delivered;
}
// --------------------------------------------------int ServMgr::idleProc(ThreadInfo *thread){ thread->lock(); unsigned int lastPasvFind=0; unsigned int lastBroadcast=0; // nothing much to do for the first couple of seconds, so just hang around. sys->sleep(2000); unsigned int lastBWcheck=0; unsigned int bytesIn=0,bytesOut=0;
unsigned int lastBroadcastConnect = 0;
while(thread->active) { stats.update();
//if (servMgr->relayBroadcast) // if ((sys->getTime()-lastBroadcast) > servMgr->relayBroadcast) // { // chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL); // lastBroadcast = sys->getTime(); // } // auto query network for relays. if (chanMgr->autoQuery) { unsigned int tim = sys->getTime(); if ((tim-chanMgr->lastQuery) > chanMgr->autoQuery) { chanMgr->lastQuery = tim; ChanInfo info; info.init(); // find all channels servMgr->findChannel(info); } }
if (chanMgr->isBroadcasting())
{
unsigned int ctime = sys->getTime();
if ((ctime-lastBroadcastConnect) > 30)
{
servMgr->connectBroadcaster();
lastBroadcastConnect = ctime;
}
}
// clear dead hits
chanMgr->clearDeadHits(servMgr->isRoot);#if 0 // check for too many connections and kill some if needed. //if (!servMgr->isRelay) { if (servMgr->pubInOver()) { Servent *s=servMgr->findOldestServent(Servent::T_INCOMING,false); if (s) s->thread.active = false; } if (servMgr->outOver()) { Servent *s=servMgr->findOldestServent(Servent::T_OUTGOING,false); if (s) s->thread.active = false; } }#endif if (servMgr->shutdownTimer) { if (--servMgr->shutdownTimer <= 0) { debugtest = 1; peercastInst->saveSettings(); sys->exit(); } } sys->sleep(500); } thread->unlock(); return 0;}// --------------------------------------------------int ServMgr::serverProc(ThreadInfo *thread){ thread->lock(); Servent *serv = servMgr->allocServent(); Servent *serv2 = servMgr->allocServent(); unsigned int lastLookupTime=0; while (thread->active) { if (servMgr->restartServer) { serv->abort(); // force close serv2->abort(); // force close servMgr->restartServer = false; } if (servMgr->autoServe) { serv->allow = servMgr->allowServer1; serv2->allow = servMgr->allowServer2; // force lookup every 24 hours to check for new messages if ((sys->getTime()-lastLookupTime) > 60*60*24) { servMgr->forceLookup = true; lastLookupTime = sys->getTime(); } if ((!serv->sock) || (!serv2->sock)) { LOG_DEBUG("Starting servers"); servMgr->forceLookup = true; //if (servMgr->serverHost.ip != 0) { if (servMgr->forceNormal) servMgr->setFirewall(ServMgr::FW_OFF); else servMgr->setFirewall(ServMgr::FW_UNKNOWN); Host h = servMgr->serverHost; if (!serv->sock) serv->initServer(h); h.port++; if (!serv2->sock) serv2->initServer(h); } } }else{ // stop server serv->abort(); // force close serv2->abort(); // force close // cancel incoming connectuions Servent *s = servMgr->servents; while (s) { if (s->type == Servent::T_INCOMING) s->thread.active = false; s=s->next; } servMgr->setFirewall(ServMgr::FW_ON); } sys->sleepIdle(); } thread->unlock(); return 0;}// -----------------------------------XML::Node *ServMgr::createServentXML(){ return 
new XML::Node("servent agent=\"%s\" ",PCX_AGENT);}// --------------------------------------------------const char *ServHost::getTypeStr(TYPE t){ switch(t) { case T_NONE: return "NONE"; case T_STREAM: return "STREAM"; case T_CHANNEL: return "CHANNEL"; case T_SERVENT: return "SERVENT"; case T_TRACKER: return "TRACKER";
} return "UNKNOWN";}// --------------------------------------------------ServHost::TYPE ServHost::getTypeFromStr(const char *s){ if (stricmp(s,"NONE")==0) return T_NONE; else if (stricmp(s,"SERVENT")==0) return T_SERVENT; else if (stricmp(s,"STREAM")==0) return T_STREAM; else if (stricmp(s,"CHANNEL")==0) return T_CHANNEL; else if (stricmp(s,"TRACKER")==0)
return T_TRACKER;
return T_NONE;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -