⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 3 页
字号:
		}    }	// if this is a priority connection and all incoming connections 	// are full then kill an old connection to make room. Otherwise reject connection.	if (!priorityConnect)		if (servMgr->inFull())			throw HTTPException(HTTP_SC_UNAVAILABLE,503);	if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid))		throw HTTPException(HTTP_SC_NOTFOUND,404);	if (!versionValid)	{		throw HTTPException(HTTP_SC_UNAUTHORIZED,401);	}    sock->writeLine(GNU_OK);    sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);    sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr());	if (servMgr->isRoot)	{		sock->writeLine("%s %d",PCX_HS_BCTTL,9);		//sock->writeLine("%s %d",PCX_HS_FULLHIT,2);		if (diffRootVer)		{			sock->writeString(PCX_HS_DL);			switch(osType)			{				case 1:					sock->writeLine(PCX_DL_LINUXDYN);					break;				case 2:					sock->writeLine(PCX_DL_LINUXSTA);					break;				case 3:					sock->writeLine(PCX_DL_WIN32);					break;				case 4:					sock->writeLine(PCX_DL_MACOSX);					break;				case 5:					sock->writeLine(PCX_DL_WINAMP2);					break;				default:					sock->writeLine(PCX_DL_URL);					break;			}		}		sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());	}	char hostIP[64];	Host h = sock->host;	h.IPtoStr(hostIP);    sock->writeLine("Remote-IP: %s",hostIP);    sock->writeLine("");	while (http.nextHeader());}// -----------------------------------void Servent::handshakeStream(Channel *ch, bool isRaw){	sock->timeout = 10000;	HTTP http(*sock);	char idStr[64];	ch->info.id.toStr(idStr);	while (http.nextHeader())	{		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))			agent.set(arg);		if (ch->info.contentType == ChanInfo::T_MP3)			if (http.isHeader("icy-metadata"))				addMetadata = atoi(arg) > 0;		LOG_DEBUG("Stream: %s",http.cmdLine);	}	if (addMetadata && isRaw)		// winamp mp3 metadata check	{	    sock->writeLine(ICY_OK);		sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT);		sock->writeLine("icy-name:%s",ch->getName());		sock->writeLine("icy-br:%d",ch->getBitrate());		sock->writeLine("icy-genre:%s",ch->info.genre.cstr());		sock->writeLine("icy-url:%s",ch->info.url.cstr());		sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval);		sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);	    sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);	}else	{	    sock->writeLine(HTTP_SC_OK);		sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT);	//	sock->writeLine("icy-metaint:0");		sock->writeLine("x-audiocast-name: %s",ch->getName());		sock->writeLine("x-audiocast-bitrate: %d",ch->getBitrate());		sock->writeLine("x-audiocast-genre: %s",ch->info.genre.cstr());		sock->writeLine("x-audiocast-description: %s",ch->info.desc.cstr());		sock->writeLine("x-audiocast-url: %s",ch->info.url.cstr());		sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);		if (isRaw)		{			switch (ch->info.contentType)			{				case ChanInfo::T_OGG:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG);					break;				case ChanInfo::T_MP3:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);					break;				case ChanInfo::T_MOV:					sock->writeLine("Accept-Ranges: bytes");					sock->writeLine("Connection: close");					sock->writeLine("Content-Length: 10000000");					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV);					break;				case ChanInfo::T_MPG:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG);					break;				case ChanInfo::T_NSV:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_NSV);					break;			}				}else		{			sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);		}	}	    sock->writeLine("");}// -----------------------------------void Servent::handshakeGiv(Channel *ch){	sock->timeout = 10000;	char idstr[64];	pushID.toStr(idstr);    sock->writeLine("GIV %d:%s/%s",chanIndex,idstr,ch->info.name.cstr());	handshakeStream(ch,false);}// -----------------------------------void Servent::process(){	try 	{			gnuStream.init(sock);		setStatus(S_CONNECTED);		sock->timeout = 10000;		// 10 seconds		gnuStream.ping(2);		if (type != T_LOOKUP)			chanMgr->broadcastRelays(2,this);		lastPacket = lastPing = sys->getTime();		bool doneBigPing=false;		const unsigned int	abortTimeoutSecs = 60;		// abort connection after 60 secs of no activitiy		const unsigned int	packetTimeoutSecs = 30;		// ping connection after 30 secs of no activity		unsigned int currBytes=0,lastTotal=0;		unsigned int lastWait=0;		while (thread.active && sock->active())		{			if (sock->readPending())			{				lastPacket = sys->getTime();				if (gnuStream.readPacket(pack))				{					unsigned int ver = pack.id.getVersion();					char ipstr[64];					sock->host.toStr(ipstr);					GnuID routeID;					GnuStream::R_TYPE ret = GnuStream::R_PROCESS;					if ((ver < MIN_PACKETVER) || (ver >= MAX_PACKETVER))						ret = GnuStream::R_BADVERSION;					if (pack.func != GNU_FUNC_PONG)						if (servMgr->seenPacket(pack))							ret = GnuStream::R_DUPLICATE;					seenIDs.addGnuID(pack.id);					servMgr->addVersion(ver);					if (ret == GnuStream::R_PROCESS)					{						GnuID routeID;						ret = gnuStream.processPacket(pack,this,routeID);					}					switch(ret)					{						case GnuStream::R_BROADCAST:							if (servMgr->broadcast(pack,this))								stats.add(Stats::NUMBROADCASTED);							else								stats.add(Stats::NUMDROPPED);							break;						case GnuStream::R_ROUTE:							if (servMgr->route(pack,routeID,NULL))								stats.add(Stats::NUMROUTED);							else								stats.add(Stats::NUMDROPPED);							break;						case GnuStream::R_ACCEPTED:							stats.add(Stats::NUMACCEPTED);							break;						case GnuStream::R_DUPLICATE:							stats.add(Stats::NUMDUP);							break;						case GnuStream::R_DEAD:							stats.add(Stats::NUMDEAD);							break;						case GnuStream::R_DISCARD:							stats.add(Stats::NUMDISCARDED);							break;						case GnuStream::R_BADVERSION:							stats.add(Stats::NUMOLD);							break;					}					LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr);				}else{					LOG_ERROR("Bad packet");				}			}			GnuPacket *p;			if ((p=outPacketsPri.curr()))				// priority packet			{				gnuStream.sendPacket(*p);				seenIDs.addGnuID(p->id);				outPacketsPri.next();			} else if ((p=outPacketsNorm.curr())) 		// or.. normal packet			{				gnuStream.sendPacket(*p);				seenIDs.addGnuID(p->id);				outPacketsNorm.next();			}			int lpt =  sys->getTime()-lastPacket;			if (!doneBigPing)			{				if ((sys->getTime()-lastPing) > 15)				{					gnuStream.ping(7);					lastPing = sys->getTime();					doneBigPing = true;				}			}else{				if (lpt > packetTimeoutSecs)				{										if ((sys->getTime()-lastPing) > packetTimeoutSecs)					{						gnuStream.ping(1);						lastPing = sys->getTime();					}				}			}			if (lpt > abortTimeoutSecs)				throw TimeoutException();			unsigned int tot = sock->totalBytesIn+sock->totalBytesOut;			unsigned int bytes = (tot-lastTotal);			lastTotal = tot;			unsigned int delay = sys->idleSleepTime;			if ((bytes) && (servMgr->serventBandwidth >= 8))				delay = (bytes*1000)/(servMgr->serventBandwidth/8);			if (delay < sys->idleSleepTime)				delay = sys->idleSleepTime;			sys->sleep(delay);			// if this is a private servent (has networkID) then check to see			// that the channel is still connected. Otherwise, closedown.			if (networkID.isSet())			{				bool hasChan=true;				Channel *ch = chanMgr->findChannelByID(networkID);				if (ch)				{					if (ch->status == Channel::S_IDLE)						throw StreamException("NetID idle");				}			}					}	}catch(StreamException &e)	{		LOG_ERROR("Servent closed: %s",e.msg);	}}// -----------------------------------void Servent::processRelay(){	try 	{			sock->timeout = 10000;		// 10 seconds		gnuStream.init(sock);		setStatus(S_CONNECTED);		gnuStream.ping(2);		while (thread.active && sock->active())		{			if (gnuStream.readPacket(pack))			{				char ipstr[64];				sock->host.toStr(ipstr);				unsigned int ver = pack.id.getVersion();				servMgr->addVersion(ver);								LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr);				//if (pack.id.getVersion() < MIN_PACKETVER)				//	break;				if (pack.func == 0)		// if ping then pong back some hosts and close				{										Host hl[32];					int cnt = servMgr->getNewestServents(hl,32,sock->host);						if (cnt)					{						int start = sys->rnd() % cnt;						for(int i=0; i<8; i++)						{							GnuPacket pong;							pack.hops = 1;							pong.initPong(hl[start],false,pack);							gnuStream.sendPacket(pong);							char ipstr[64];							hl[start].toStr(ipstr);							//LOG_NETWORK("Pong %d: %s",start+1,ipstr);							start = (start+1) % cnt;						}						char str[64];						sock->host.toStr(str);						LOG_NETWORK("Sent 8 pongs to %s",str);					}else					{						LOG_NETWORK("No Pongs to send");						//return;					}				}else if (pack.func == 1)		// pong?				{					MemoryStream pong(pack.data,pack.len);					int ip,port;					port = pong.readShort();					ip = pong.readLong();					ip = SWAP4(ip);					LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);					if ((ip) && (port))					{						Host h(ip,port);						servMgr->addHost(h,ServHost::T_SERVENT,0,networkID);					}					return;				}				if (gnuStream.packetsIn > 5)	// die if we get too many packets					return;			}		}	}catch(StreamException &e)	{		LOG_ERROR("Relay: %s",e.msg);	}}// -----------------------------------int Servent::givProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{		sv->processStream(true,false);	}catch(StreamException &e)	{		LOG_ERROR("GIV: %s",e.msg);	}	sv->endThread();	return 0;}// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;			try 	{		sv->setStatus(S_CONNECTING);		sv->sock->timeout = 10000;		// 10 seconds handshake		sv->sock->connect();		sv->setStatus(S_HANDSHAKE);		sv->handshakeOut();		sv->process();	}catch(TimeoutException &e)	{		LOG_ERROR("Outgoing Timeout: %s",e.msg);		servMgr->deadHost(sv->sock->host,ServHost::T_SERVENT);		sv->setStatus(S_TIMEOUT);	}catch(StreamException &e)	{		servMgr->deadHost(sv->sock->host,ServHost::T_SERVENT);		LOG_ERROR("Outgoing: %s",e.msg);		sv->setStatus(S_REFUSED);		if (sv->type == T_LOOKUP)			sys->sleep(30000);	}	sv->endThread();	return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;		try 	{		sv->handshakeIncoming();	}catch(HTTPException &e)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -