📄 servent.cpp.svn-base
字号:
break; case GnuStream::R_ACCEPTED: stats.add(Stats::NUMACCEPTED); break; case GnuStream::R_DUPLICATE: stats.add(Stats::NUMDUP); break; case GnuStream::R_DEAD: stats.add(Stats::NUMDEAD); break; case GnuStream::R_DISCARD: stats.add(Stats::NUMDISCARDED); break; case GnuStream::R_BADVERSION: stats.add(Stats::NUMOLD); break; case GnuStream::R_DROP: stats.add(Stats::NUMDROPPED); break; } LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr); }else{ LOG_ERROR("Bad packet"); } } GnuPacket *p; if ((p=outPacketsPri.curr())) // priority packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsPri.next(); } else if ((p=outPacketsNorm.curr())) // or.. normal packet { gnuStream.sendPacket(*p); seenIDs.addGnuID(p->id); outPacketsNorm.next(); } int lpt = sys->getTime()-lastPacket; if (!doneBigPing) { if ((sys->getTime()-lastPing) > 15) { gnuStream.ping(7); lastPing = sys->getTime(); doneBigPing = true; } }else{ if (lpt > packetTimeoutSecs) { if ((sys->getTime()-lastPing) > packetTimeoutSecs) { gnuStream.ping(1); lastPing = sys->getTime(); } } } if (lpt > abortTimeoutSecs) throw TimeoutException(); unsigned int totIn = sock->totalBytesIn-lastTotalIn; unsigned int totOut = sock->totalBytesOut-lastTotalOut; unsigned int bytes = totIn+totOut; lastTotalIn = sock->totalBytesIn; lastTotalOut = sock->totalBytesOut; const int serventBandwidth = 1000;
int delay = sys->idleSleepTime; if ((bytes) && (serventBandwidth >= 8)) delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize if (delay < (int)sys->idleSleepTime) delay = sys->idleSleepTime; //LOG("delay %d, in %d, out %d",delay,totIn,totOut); sys->sleep(delay); }}
// -----------------------------------void Servent::processRoot(){ try { gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2); while (thread.active && sock->active()) { if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); unsigned int ver = pack.id.getVersion(); servMgr->addVersion(ver); LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr); //if (pack.id.getVersion() < MIN_PACKETVER) // break; if (pack.func == 0) // if ping then pong back some hosts and close { Host hl[32]; int cnt = servMgr->getNewestServents(hl,32,sock->host); if (cnt) { int start = sys->rnd() % cnt; for(int i=0; i<8; i++) { GnuPacket pong; pack.hops = 1; pong.initPong(hl[start],false,pack); gnuStream.sendPacket(pong); char ipstr[64]; hl[start].toStr(ipstr); //LOG_NETWORK("Pong %d: %s",start+1,ipstr); start = (start+1) % cnt; } char str[64]; sock->host.toStr(str); LOG_NETWORK("Sent 8 pongs to %s",str); }else { LOG_NETWORK("No Pongs to send"); //return; } }else if (pack.func == 1) // pong? { MemoryStream pong(pack.data,pack.len); int ip,port; port = pong.readShort(); ip = pong.readLong(); ip = SWAP4(ip); LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port); if ((ip) && (port)) { Host h(ip,port); servMgr->addHost(h,ServHost::T_SERVENT,0); } return; } if (gnuStream.packetsIn > 5) // die if we get too many packets return; } } }catch(StreamException &e) { LOG_ERROR("Relay: %s",e.msg); }}// -----------------------------------int Servent::givProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try {
sv->handshakeGiv(sv->givID);
sv->handshakeIncoming();
}catch(StreamException &e) { LOG_ERROR("GIV: %s",e.msg); } sv->kill(); return 0;}
// -----------------------------------
void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{
bool nonFW = servMgr->canConnectToMe(rhost);
bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
// say helo
atom.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0));
atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
if (nonFW)
atom.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
if (testFW)
atom.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
int numc,numd;
ID4 id = atom.read(numc,numd);
if (id != PCP_OLEH)
{
LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
throw StreamException("Got unexpected PCP response");
}
char arg[64];
GnuID clientID;
clientID.clear();
rid.clear();
int version=0;
Host thisHost;
// read OLEH response
for(int i=0; i<numc; i++)
{
int c,dlen;
ID4 id = atom.read(c,dlen);
if (id == PCP_HELO_AGENT)
{
atom.readString(arg,sizeof(arg),dlen);
agent.set(arg);
}else if (id == PCP_HELO_REMOTEIP)
{
thisHost.ip = atom.readInt();
}else if (id == PCP_HELO_PORT)
{
thisHost.port = atom.readShort();
}else if (id == PCP_HELO_VERSION)
{
version = atom.readInt();
}else if (id == PCP_HELO_SESSIONID)
{
atom.readBytes(rid.id,16);
if (rid.isSame(servMgr->sessionID))
throw StreamException("Servent loopback");
}else
{
LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
atom.skip(c,dlen);
}
}
// update server ip/firewall status
if (thisHost.isValid())
{
if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
{
//if (thisHost.globalIP())
{
servMgr->serverHost.ip = thisHost.ip;
char ipstr[64];
servMgr->serverHost.IPtoStr(ipstr);
LOG_DEBUG("Got remote ip: %s",ipstr);
if (thisHost.port && thisHost.globalIP())
servMgr->setFirewall(ServMgr::FW_OFF);
else
servMgr->setFirewall(ServMgr::FW_ON);
}
}
}
if (!rid.isSet())
{
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
throw StreamException("Remote host not identified");
}
}
// -----------------------------------
void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{
int numc,numd;
ID4 id = atom.read(numc,numd);
if (id != PCP_HELO)
{
LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
throw StreamException("Got unexpected PCP response");
}
char arg[64];
ID4 osType;
int version=0;
int pingPort=0;
GnuID clientID;
clientID.clear();
rhost.port = 0;
for(int i=0; i<numc; i++)
{
int c,dlen;
ID4 id = atom.read(c,dlen);
if (id == PCP_HELO_AGENT)
{
atom.readString(arg,sizeof(arg),dlen);
agent.set(arg);
}else if (id == PCP_HELO_VERSION)
{
version = atom.readInt();
}else if (id == PCP_HELO_SESSIONID)
{
atom.readBytes(rid.id,16);
if (rid.isSame(servMgr->sessionID))
throw StreamException("Servent loopback");
}else if (id == PCP_HELO_OSTYPE)
{
osType = atom.readInt();
}else if (id == PCP_HELO_PORT)
{
rhost.port = atom.readShort();
}else if (id == PCP_HELO_PING)
{
pingPort = atom.readShort();
}else
{
LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
atom.skip(c,dlen);
}
}
if (version)
LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
if (!rhost.globalIP() && servMgr->serverHost.globalIP())
{
rhost.ip = servMgr->serverHost.ip;
rhost.port = 0;
}else if (pingPort)
{
rhost.port = pingPort;
if (!rhost.globalIP() || !pingHost(rhost,rid))
rhost.port = 0;
}
atom.writeParent(PCP_OLEH,5);
atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
atom.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
atom.writeShort(PCP_HELO_PORT,rhost.port);
if (version)
{
if (version < PCP_CLIENT_MINVERSION)
{
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
throw StreamException("Agent is not valid");
}
}
if (!rid.isSet())
{
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
throw StreamException("Remote host not identified");
}
if (servMgr->isRoot)
servMgr->writeRootAtoms(atom,false);
}
// -----------------------------------
void Servent::processIncomingPCP(bool suggestOthers)
{
pcpStream.readVersion(*sock);
AtomStream atom(*sock);
Host rhost = sock->host;
handshakeIncomingPCP(atom,rhost,remoteID,agent);
bool alreadyConnected = servMgr->findConnection(remoteID)!=NULL;
bool unavailable = servMgr->controlInFull();
if (unavailable || alreadyConnected)
{
if (suggestOthers)
{
ChanHit best;
if (!rhost.globalIP())
best = chanMgr->getTracker(&servMgr->serverHost,0,false); // find best hit on this network
if (!best.host.ip)
best = chanMgr->getTracker(&rhost,0,false); // find best hit on same network
if (!best.host.ip)
best = chanMgr->getTracker(NULL,0,false); // else find best hit on other networks
// force push
//best.host.ip = 0;
//
if (best.host.ip)
{
GnuID noID;
noID.clear();
best.writeAtoms(atom,true,noID);
}else if (rhost.port)
{
// send push request to best firewalled tracker on other network
best = chanMgr->getTracker(NULL,0,true); // else find push host
if (best.host.ip)
{
GnuID noID;
noID.clear();
servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
}
}
}
int r = 0;
if (alreadyConnected)
r = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
else if (unavailable)
r = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
LOG_ERROR("Sending QUIT to incoming: %d",r);
atom.writeInt(PCP_QUIT,r);
return;
}
type = T_CIN;
setStatus(S_CONNECTED);
atom.writeInt(PCP_OK,0);
// ask for update
atom.writeParent(PCP_ROOT,1);
atom.writeParent(PCP_ROOT_UPDATE,0);
pcpStream.process(*sock,remoteID);
}
// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data;
while (sv->thread.active)
{
char ipStr[64];
sv->setStatus(S_WAIT);
ChanHit best;
do
{
if (servMgr->rootHost.isEmpty())
break;
if (sv->pushSock)
{
sv->sock = sv->pushSock;
sv->pushSock = NULL;
best.host = sv->sock->host;
break;
}
// find local tracker
best = chanMgr->getTracker(&servMgr->serverHost,MIN_TRACKER_RETRY,false);
// else find global tracker
if (!best.host.ip)
best = chanMgr->getTracker(NULL,MIN_TRACKER_RETRY,false);
unsigned int ctime = sys->getTime();
if ((!best.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
{
best.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
best.yp = true;
chanMgr->lastYPConnect = ctime;
}
sys->sleepIdle();
}while ((!best.host.ip) && (sv->thread.active));
if (!best.host.ip) // give up
break;
best.host.toStr(ipStr);
try {
LOG_DEBUG("Outgoing to %s: Connecting..",ipStr);
if (!sv->sock)
{
sv->setStatus(S_CONNECTING);
sv->sock = sys->createSocket();
if (!sv->sock)
throw StreamException("Unable to create socket");
sv->sock->open(best.host);
sv->sock->connect();
}
sv->sock->setReadTimeout(30000);
AtomStream atom(*sv->sock);
sv->setStatus(S_HANDSHAKE);
Host rhost = sv->sock->host;
atom.writeInt(PCP_CONNECT,1);
handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent);
sv->setStatus(S_CONNECTED);
LOG_DEBUG("Outgoing to %s: OK",ipStr);
BroadcastState bcs;
bcs.srcID = sv->remoteID;
while (!sv->sock->eof() && !peercastInst->isQuitting && (chanMgr->isBroadcasting()))
{
sv->pcpStream.readPacket(*sv->sock,bcs);
sys->sleepIdle();
}
sv->setStatus(S_CLOSING);
sv->pcpStream.flush(*sv->sock);
atom.writeInt(PCP_QUIT,0);
LOG_DEBUG("Outgoing to %s: closed",ipStr);
}catch(TimeoutException &e) { LOG_ERROR("Outgoing to %s: timeout (%s)",ipStr,e.msg); sv->setStatus(S_TIMEOUT); }catch(StreamException &e) { LOG_ERROR("Outgoing to %s: %s",ipStr,e.msg); sv->setStatus(S_REFUSED); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -