⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 4 页
字号:
						break;					case GnuStream::R_ACCEPTED:						stats.add(Stats::NUMACCEPTED);						break;					case GnuStream::R_DUPLICATE:						stats.add(Stats::NUMDUP);						break;					case GnuStream::R_DEAD:						stats.add(Stats::NUMDEAD);						break;					case GnuStream::R_DISCARD:						stats.add(Stats::NUMDISCARDED);						break;					case GnuStream::R_BADVERSION:						stats.add(Stats::NUMOLD);						break;					case GnuStream::R_DROP:						stats.add(Stats::NUMDROPPED);						break;				}				LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr);			}else{				LOG_ERROR("Bad packet");			}		}		GnuPacket *p;		if ((p=outPacketsPri.curr()))				// priority packet		{			gnuStream.sendPacket(*p);			seenIDs.addGnuID(p->id);			outPacketsPri.next();		} else if ((p=outPacketsNorm.curr())) 		// or.. normal packet		{			gnuStream.sendPacket(*p);			seenIDs.addGnuID(p->id);			outPacketsNorm.next();		}		int lpt =  sys->getTime()-lastPacket;		if (!doneBigPing)		{			if ((sys->getTime()-lastPing) > 15)			{				gnuStream.ping(7);				lastPing = sys->getTime();				doneBigPing = true;			}		}else{			if (lpt > packetTimeoutSecs)			{								if ((sys->getTime()-lastPing) > packetTimeoutSecs)				{					gnuStream.ping(1);					lastPing = sys->getTime();				}			}		}		if (lpt > abortTimeoutSecs)			throw TimeoutException();		unsigned int totIn = sock->totalBytesIn-lastTotalIn;		unsigned int totOut = sock->totalBytesOut-lastTotalOut;		unsigned int bytes = totIn+totOut;		lastTotalIn = sock->totalBytesIn;		lastTotalOut = sock->totalBytesOut;		const int serventBandwidth = 1000;
		int delay = sys->idleSleepTime;		if ((bytes) && (serventBandwidth >= 8))			delay = (bytes*1000)/(serventBandwidth/8);	// set delay relative packetsize		if (delay < (int)sys->idleSleepTime)			delay = sys->idleSleepTime;		//LOG("delay %d, in %d, out %d",delay,totIn,totOut);		sys->sleep(delay);	}}


// -----------------------------------void Servent::processRoot(){	try 	{			gnuStream.init(sock);		setStatus(S_CONNECTED);		gnuStream.ping(2);		while (thread.active && sock->active())		{			if (gnuStream.readPacket(pack))			{				char ipstr[64];				sock->host.toStr(ipstr);				unsigned int ver = pack.id.getVersion();				servMgr->addVersion(ver);								LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr);				//if (pack.id.getVersion() < MIN_PACKETVER)				//	break;				if (pack.func == 0)		// if ping then pong back some hosts and close				{										Host hl[32];					int cnt = servMgr->getNewestServents(hl,32,sock->host);						if (cnt)					{						int start = sys->rnd() % cnt;						for(int i=0; i<8; i++)						{							GnuPacket pong;							pack.hops = 1;							pong.initPong(hl[start],false,pack);							gnuStream.sendPacket(pong);							char ipstr[64];							hl[start].toStr(ipstr);							//LOG_NETWORK("Pong %d: %s",start+1,ipstr);							start = (start+1) % cnt;						}						char str[64];						sock->host.toStr(str);						LOG_NETWORK("Sent 8 pongs to %s",str);					}else					{						LOG_NETWORK("No Pongs to send");						//return;					}				}else if (pack.func == 1)		// pong?				{					MemoryStream pong(pack.data,pack.len);					int ip,port;					port = pong.readShort();					ip = pong.readLong();					ip = SWAP4(ip);					LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);					if ((ip) && (port))					{						Host h(ip,port);						servMgr->addHost(h,ServHost::T_SERVENT,0);					}					return;				}				if (gnuStream.packetsIn > 5)	// die if we get too many packets					return;			}		}	}catch(StreamException &e)	{		LOG_ERROR("Relay: %s",e.msg);	}}// -----------------------------------int Servent::givProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{
		sv->handshakeGiv(sv->givID);
		sv->handshakeIncoming();
	}catch(StreamException &e)	{		LOG_ERROR("GIV: %s",e.msg);	}	sv->kill();	return 0;}

// -----------------------------------
void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{


	bool nonFW = servMgr->canConnectToMe(rhost);
	bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);

	// say helo
	atom.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0));
		atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
		atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
		atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
		if (nonFW)
			atom.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
		if (testFW)
			atom.writeShort(PCP_HELO_PING,servMgr->serverHost.port);


	int numc,numd;
	ID4 id = atom.read(numc,numd);
	if (id != PCP_OLEH)
	{
		LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
		throw StreamException("Got unexpected PCP response");
	}


	char arg[64];

	GnuID clientID;
	clientID.clear();
	rid.clear();
	int version=0;

	Host thisHost;

	// read OLEH response
	for(int i=0; i<numc; i++)
	{
		int c,dlen;
		ID4 id = atom.read(c,dlen);

		if (id == PCP_HELO_AGENT)
		{
			atom.readString(arg,sizeof(arg),dlen);
			agent.set(arg);

		}else if (id == PCP_HELO_REMOTEIP)
		{
			thisHost.ip = atom.readInt();

		}else if (id == PCP_HELO_PORT)
		{
			thisHost.port = atom.readShort();

		}else if (id == PCP_HELO_VERSION)
		{
			version = atom.readInt();

		}else if (id == PCP_HELO_SESSIONID)
		{
			atom.readBytes(rid.id,16);
			if (rid.isSame(servMgr->sessionID))
				throw StreamException("Servent loopback");

		}else
		{
			LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
			atom.skip(c,dlen);
		}

    }


	// update server ip/firewall status
	if (thisHost.isValid())
	{
		if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
		{
			//if (thisHost.globalIP())
			{
				servMgr->serverHost.ip = thisHost.ip;

				char ipstr[64];
				servMgr->serverHost.IPtoStr(ipstr);
				LOG_DEBUG("Got remote ip: %s",ipstr);

				if (thisHost.port && thisHost.globalIP())
					servMgr->setFirewall(ServMgr::FW_OFF);
				else
					servMgr->setFirewall(ServMgr::FW_ON);
			}
		}
	}



	if (!rid.isSet())
	{
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
		throw StreamException("Remote host not identified");
	}

}
// -----------------------------------
void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{
	int numc,numd;
	ID4 id = atom.read(numc,numd);

	if (id != PCP_HELO)
	{
		LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
		throw StreamException("Got unexpected PCP response");
	}

	char arg[64];

	ID4 osType;

	int version=0;

	int pingPort=0;

	GnuID clientID;
	clientID.clear();

	rhost.port = 0;

	for(int i=0; i<numc; i++)
	{
		int c,dlen;
		ID4 id = atom.read(c,dlen);

		if (id == PCP_HELO_AGENT)
		{
			atom.readString(arg,sizeof(arg),dlen);
			agent.set(arg);

		}else if (id == PCP_HELO_VERSION)
		{
			version = atom.readInt();

		}else if (id == PCP_HELO_SESSIONID)
		{
			atom.readBytes(rid.id,16);
			if (rid.isSame(servMgr->sessionID))
				throw StreamException("Servent loopback");

		}else if (id == PCP_HELO_OSTYPE)
		{
			osType = atom.readInt();
		}else if (id == PCP_HELO_PORT)
		{
			rhost.port = atom.readShort();
		}else if (id == PCP_HELO_PING)
		{
			pingPort = atom.readShort();
		}else
		{
			LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
			atom.skip(c,dlen);
		}

    }

	if (version)
		LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);


	if (!rhost.globalIP() && servMgr->serverHost.globalIP())
	{
		rhost.ip = servMgr->serverHost.ip;
		rhost.port = 0;
	}else if (pingPort)
	{
		rhost.port = pingPort;
		if (!rhost.globalIP() || !pingHost(rhost,rid))
			rhost.port = 0;
	}

	atom.writeParent(PCP_OLEH,5);
		atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
		atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
		atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
		atom.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
		atom.writeShort(PCP_HELO_PORT,rhost.port);

	if (version)
	{
		if (version < PCP_CLIENT_MINVERSION)
		{
			atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
			throw StreamException("Agent is not valid");
		}
	}

	if (!rid.isSet())
	{
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
		throw StreamException("Remote host not identified");
	}



	if (servMgr->isRoot)
		servMgr->writeRootAtoms(atom,false);

}

// -----------------------------------
void Servent::processIncomingPCP(bool suggestOthers)
{
	pcpStream.readVersion(*sock);

	AtomStream atom(*sock);
	Host rhost = sock->host;

	handshakeIncomingPCP(atom,rhost,remoteID,agent);

	bool alreadyConnected = servMgr->findConnection(remoteID)!=NULL;
	bool unavailable = servMgr->controlInFull();

	if (unavailable || alreadyConnected)
	{
		if (suggestOthers)
		{

			ChanHit best;

			if (!rhost.globalIP())
				best = chanMgr->getTracker(&servMgr->serverHost,0,false); // find best hit on this network			
			if (!best.host.ip)
				best = chanMgr->getTracker(&rhost,0,false); // find best hit on same network			
			if (!best.host.ip)
				best = chanMgr->getTracker(NULL,0,false); // else find best hit on other networks

			// force push
			//best.host.ip = 0;
			// 

			if (best.host.ip)
			{
				GnuID noID;
				noID.clear();
				best.writeAtoms(atom,true,noID);

			}else if (rhost.port)
			{
				// send push request to best firewalled tracker on other network
				best = chanMgr->getTracker(NULL,0,true); // else find push host
				if (best.host.ip)
				{
					GnuID noID;
					noID.clear();
					servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
				}
			}
		}

		int r = 0;
		if (alreadyConnected)
			r = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
		else if (unavailable)
			r = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;

		LOG_ERROR("Sending QUIT to incoming: %d",r);

		atom.writeInt(PCP_QUIT,r);
		return;		
	}
	

	type = T_CIN;
	setStatus(S_CONNECTED);

	atom.writeInt(PCP_OK,0);

	// ask for update
	atom.writeParent(PCP_ROOT,1);
		atom.writeParent(PCP_ROOT_UPDATE,0);


	pcpStream.process(*sock,remoteID);
}

// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;		


	while (sv->thread.active)
	{

		char ipStr[64];

		sv->setStatus(S_WAIT);

		ChanHit best;
		do
		{
			if (servMgr->rootHost.isEmpty())
				break;

			if (sv->pushSock)
			{
				sv->sock = sv->pushSock;
				sv->pushSock = NULL;
				best.host = sv->sock->host;
				break;
			}

			// find local tracker
			best = chanMgr->getTracker(&servMgr->serverHost,MIN_TRACKER_RETRY,false); 

			// else find global tracker
			if (!best.host.ip)
				best = chanMgr->getTracker(NULL,MIN_TRACKER_RETRY,false); 

			unsigned int ctime = sys->getTime();

			if ((!best.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
			{
				best.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
				best.yp = true;
				chanMgr->lastYPConnect = ctime;
			}
			sys->sleepIdle();

		}while ((!best.host.ip) && (sv->thread.active));


		if (!best.host.ip)		// give up
			break;


		best.host.toStr(ipStr);

		try 		{

			LOG_DEBUG("Outgoing to %s: Connecting..",ipStr);

			if (!sv->sock)
			{
				sv->setStatus(S_CONNECTING);
				sv->sock = sys->createSocket();
				if (!sv->sock)
					throw StreamException("Unable to create socket");
				sv->sock->open(best.host);
				sv->sock->connect();

			}

			sv->sock->setReadTimeout(30000);
			AtomStream atom(*sv->sock);

			sv->setStatus(S_HANDSHAKE);
			Host rhost = sv->sock->host;
			atom.writeInt(PCP_CONNECT,1);
			handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent);

			sv->setStatus(S_CONNECTED);

			LOG_DEBUG("Outgoing to %s: OK",ipStr);

			BroadcastState bcs;
			bcs.srcID = sv->remoteID;
			while (!sv->sock->eof() && !peercastInst->isQuitting && (chanMgr->isBroadcasting()))
			{
				sv->pcpStream.readPacket(*sv->sock,bcs);
				sys->sleepIdle();
			}
			sv->setStatus(S_CLOSING);

			sv->pcpStream.flush(*sv->sock);
			atom.writeInt(PCP_QUIT,0);


			LOG_DEBUG("Outgoing to %s: closed",ipStr);
		}catch(TimeoutException &e)		{			LOG_ERROR("Outgoing to %s: timeout (%s)",ipStr,e.msg);			sv->setStatus(S_TIMEOUT);		}catch(StreamException &e)		{			LOG_ERROR("Outgoing to %s: %s",ipStr,e.msg);			sv->setStatus(S_REFUSED);		}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -