⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
					if (ret == GnuStream::R_PROCESS)					{						GnuID routeID;						GnuPacket out;						out.func = 255;						ret = gnuStream.processPacket(pack,out,this,routeID);						if (out.func != 255)						{							LOG_NETWORK("Sent return packet: %s",GNU_FUNC_STR(out.func));							outputPacket(out,false);						}					}					switch(ret)					{						case GnuStream::R_BROADCAST:							if (!servMgr->broadcast(pack,this))								stats.add(Stats::NUMDROPPED);							else								stats.add(Stats::NUMBROADCASTED);							break;						case GnuStream::R_ROUTE:							if (!servMgr->route(pack,routeID,NULL))								stats.add(Stats::NUMDROPPED);							else								stats.add(Stats::NUMROUTED);							break;						case GnuStream::R_ACCEPTED:							stats.add(Stats::NUMACCEPTED);							break;						case GnuStream::R_DUPLICATE:							stats.add(Stats::NUMDUP);							break;						case GnuStream::R_DEAD:							stats.add(Stats::NUMDEAD);							break;						case GnuStream::R_DISCARD:							stats.add(Stats::NUMDISCARDED);							break;						case GnuStream::R_BADVERSION:							stats.add(Stats::NUMOLD);							break;					}					LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, v%05x from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,pack.id.getVersion(),ipstr);				}else{					LOG_ERROR("Bad packet");				}			}			GnuPacket *p;			if ((p=outPacketsPri.curr()))				// priority packet			{				gnuStream.sendPacket(*p);				seenIDs.addGnuID(p->id);				outPacketsPri.next();			} else if ((p=outPacketsNorm.curr())) 		// or.. normal packet			{				gnuStream.sendPacket(*p);				seenIDs.addGnuID(p->id);				outPacketsNorm.next();			}			int lpt =  sys->getTime()-lastPacket;			if (!doneBigPing)			{				if ((sys->getTime()-lastPing) > 15)				{					gnuStream.ping(7);					lastPing = sys->getTime();					doneBigPing = true;				}			}else{				if (lpt > packetTimeoutSecs)				{										if ((sys->getTime()-lastPing) > packetTimeoutSecs)					{						gnuStream.ping(1);						lastPing = sys->getTime();					}				}			}			if (lpt > abortTimeoutSecs)				throw TimeoutException();			unsigned int tot = sock->totalBytesIn+sock->totalBytesOut;			unsigned int bytes = (tot-lastTotal);			lastTotal = tot;			unsigned int delay = sys->idleSleepTime;			if ((bytes) && (servMgr->serventBandwidth >= 8))				delay = (bytes*1000)/(servMgr->serventBandwidth/8);			if (delay < sys->idleSleepTime)				delay = sys->idleSleepTime;			sys->sleep(delay);		}	}catch(StreamException &e)	{		LOG_ERROR("Process Error: %s",e.msg);	}}// -----------------------------------void Servent::processRelay(){	try 	{			sock->timeout = 10000;		// 10 seconds		gnuStream.init(sock);		setStatus(S_CONNECTED);		gnuStream.ping(2);		while (thread.active && sock->active())		{			if (gnuStream.readPacket(pack))			{				char ipstr[64];				sock->host.toStr(ipstr);				unsigned int ver = pack.id.getVersion();				servMgr->addVersion(ver);								LOG_NETWORK("packet in: %d v%05x from %s",pack.func,pack.getVersion(),ipstr);				//if (pack.id.getVersion() < MIN_PACKETVER)				//	break;				if (pack.func == 0)		// if ping then pong back some hosts and close				{										Host hl[64];					int cnt = servMgr->getNewestHosts(hl,64,sock->host);						if (cnt)					{						int start = sys->rnd() % cnt;						for(int i=0; i<8; i++)						{							GnuPacket pong;							pack.hops = 1;							pong.initPong(hl[start],false,pack);							gnuStream.sendPacket(pong);							start = (start+1) % cnt;						}						char str[64];						sock->host.toStr(str);						LOG_NETWORK("Sent 8 pongs to %s",str);					}else					{						LOG_NETWORK("No Pongs to send");						return;					}				}else if (pack.func == 1)		// pong?				{					MemoryStream pong(pack.data,pack.len);					int ip,port;					port = pong.readShort();					ip = pong.readLong();					ip = SWAP4(ip);					LOG_NETWORK("pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);					if ((ip) && (port))					{						Host h(ip,port);						servMgr->addHost(h,true);					}					return;				}				if (gnuStream.packetsIn > 5)	// die if we get too many packets					return;			}		}	}catch(StreamException &e)	{		LOG_ERROR("Relay: %s",e.msg);	}}// -----------------------------------int Servent::givProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{		sv->processStream(true,false);	}catch(StreamException &e)	{		LOG_ERROR("GIV: %s",e.msg);	}	sv->endThread();	return 0;}// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;			try 	{		sv->setStatus(S_CONNECTING);		sv->sock->timeout = 10000;		// 10 seconds handshake		sv->sock->connect();		sv->setStatus(S_HANDSHAKE);		sv->handshakeOut();		sv->process();	}catch(TimeoutException &e)	{		LOG_ERROR("Outgoing Timeout: %s",e.msg);		servMgr->deadHost(sv->sock->host);		sv->setStatus(S_TIMEOUT);	}catch(StreamException &e)	{		servMgr->deadHost(sv->sock->host);		LOG_ERROR("Outgoing: %s",e.msg);		sv->setStatus(S_REFUSED);		if (sv->type == T_LOOKUP)			sys->sleep(30000);	}	sv->endThread();	return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;		try 	{		sv->handshakeIncoming();	}catch(HTTPException &e)	{		try		{			sv->sock->writeLine(e.msg);			if (e.code == 401)				sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");			sv->sock->writeLine("");		}catch(StreamException &){}		LOG_ERROR("Incoming HTTP error: %s",e.msg);	}catch(StreamException &e)	{		LOG_ERROR("Incoming Connect error: %s",e.msg);	}	sv->endThread();	return 0;}// -----------------------------------void Servent::processServent(){	setStatus(S_HANDSHAKE);	handshakeIn();	if (!sock)		throw StreamException("Servent has no socket");	if (servMgr->isRoot && !servMgr->needConnections())		processRelay();	else		process();}// -----------------------------------void Servent::processStream(bool push, bool isRaw){			Channel *ch = NULL;	try 	{		setStatus(S_HANDSHAKE);		ch = chanMgr->findChannelByID(chanID);		if (!ch)			throw StreamException("Not found");		chanIndex = ch->index;		if (push)			handshakeGiv(ch);			else			handshakeStream(ch,isRaw);		servMgr->totalStreams++;		if (isRaw)		{			if (addMetadata)				sendRawMetaChannel(ch,chanMgr->icyMetaInterval);			else				sendRawChannel(ch);		}else			sendChannel(ch);		setStatus(S_CLOSING);	}catch(StreamException &e)	{		setStatus(S_ERROR);		LOG_ERROR("Stream Error: %s",e.msg);	}}// -----------------------------------------#if 0// debug		FileStream file;		file.openReadOnly("c://test.mp3");		LOG_DEBUG("raw file read");		char buf[4000];		int cnt=0;		while (!file.eof())		{			LOG_DEBUG("send %d",cnt++);			file.read(buf,sizeof(buf));			sock->write(buf,sizeof(buf));		}		file.close();		LOG_DEBUG("raw file sent");	return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(Channel *ch){	for(int i=0; i<60; i++)	{		if (ch->isPlaying() && ch->syncPos)			return true;		if (!thread.active || !sock->active())			break;		sys->sleep(1000);	}	return false;}// -----------------------------------void Servent::sendRawChannel(Channel *ch){	ch->numListeners++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr());		if (!waitForChannelHeader(ch))			throw StreamException("Channel not ready");		if (ch->headMeta.len)		{			LOG_DEBUG("Send %d bytes header ",ch->headMeta.len);			sock->write(&ch->headMeta.data,ch->headMeta.len);		}		currPos=0;		unsigned int mySync=0,chanSync=0;		unsigned int syncTimeout=0;		ChanPacket pack;		while ((thread.active) && sock->active())		{			if (!ch->isActive())				throw StreamException("Channel closed");			currPos = ch->chanData.readPacket(currPos,pack);			//LOG_CHANNEL("Send POS: %d",currPos);			if (pack.type == 'SYNC')			{				chanSync = *(unsigned int *)pack.data;				//LOG_CHANNEL("Send SYNC: %d",chanSync);				int rs = mySync-chanSync;				if (rs > ChanPacketBuffer::MAX_PACKETS)					mySync = chanSync-1;			}			if (pack.type == 'DATA')			{				// skip packets until we`re back in sync				if (chanSync <= mySync)				{					syncTimeout++;					if (syncTimeout > 30)						throw StreamException("Unable to sync");					LOG_CHANNEL("SYNC wait: %d - %d",mySync-chanSync,syncTimeout);				}else				{					//LOG_CHANNEL("Send DATA: %d",chanSync);					syncTimeout=0;					mySync = chanSync;					sock->write(pack.data,pack.len);				}			}		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numListeners)		ch->numListeners--;}// -----------------------------------void Servent::sendRawMetaChannel(Channel *ch, int interval){	ch->numListeners++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw Meta stream: %s",ch->info.name.cstr());		if (!waitForChannelHeader(ch))			throw StreamException("Channel not ready");		if (ch->headMeta.len)		{			LOG_DEBUG("Send %d bytes header ",ch->headMeta.len);			sock->write(&ch->headMeta.data,ch->headMeta.len);		}		String lastTitle,lastURL;		currPos=0;		unsigned int mySync=0,chanSync=0;		int		lastMsgTime=sys->getTime();		bool	showMsg=true;		ChanPacket pack;		char buf[16384];		int bufPos=0;		if ((interval > sizeof(buf)) || (interval < 1))			throw StreamException("Bad ICY Meta Interval value");		unsigned int syncTimeout=0;		while ((thread.active) && sock->active())		{			if (!ch->isActive())				throw StreamException("Channel closed");			currPos = ch->chanData.readPacket(currPos,pack);			MemoryStream mem(pack.data,pack.len);			if (pack.type == 'SYNC')			{				chanSync = mem.readLong();				int rs = mySync-chanSync;				if (rs > ChanPacketBuffer::MAX_PACKETS)					mySync = chanSync-1;			}			if (pack.type == 'DATA')			{				// skip packets until we`re back in sync				if (chanSync <= mySync)				{					syncTimeout++;					if (syncTimeout > 30)						throw StreamException("Unable to sync");					LOG_DEBUG("SYNC wait: %d - %d",mySync-chanSync,syncTimeout);				}else				{					syncTimeout = 0;					mySync = chanSync;					int len = pack.len;					char *p = pack.data;					while (len)					{						int rl = len;						if ((bufPos+rl) > interval)							rl = interval-bufPos;						memcpy(&buf[bufPos],p,rl);						bufPos+=rl;						p+=rl;						len-=rl;						if (bufPos >= interval)						{							bufPos = 0;								sock->write(buf,interval);							if (chanMgr->broadcastMsgInterval)								if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)								{									showMsg ^= true;									lastMsgTime = sys->getTime();								}							String *metaTitle = &ch->info.track.title;							if (!ch->info.comment.isEmpty() && (showMsg))								metaTitle = &ch->info.comment;							if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))							{								char tmp[1024];								String title,url;								title = *metaTitle;								url = ch->info.url;								title.convertTo(String::T_META);								url.convertTo(String::T_META);								sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());								int len = ((strlen(tmp) + 15+1) / 16);								sock->writeChar(len);								sock->write(tmp,len*16);								lastTitle = *metaTitle;								lastURL = ch->info.url;								LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());							}else								sock->writeChar(0);											}					}				}			}		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numListeners)		ch->numListeners--;}// -----------------------------------void Servent::sendChannel(Channel *ch){	ch->numRelays++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());		currPos=0;		sock->writeTag("PCST");		ChanPacket pack;		pack.init('HEAD',ch->headMeta.data,ch->headMeta.len);		pack.write(*sock);		pack.init('META',ch->insertMeta.data,ch->insertMeta.len);		pack.write(*sock);				while ((thread.active) && sock->active())		{			if (!ch->isActive())				throw StreamException("Channel closed");			unsigned int np = ch->chanData.readPacket(currPos,pack);			if ((np-currPos) > 1)				LOG_DEBUG("sendChannel skip: %d",np-currPos);			currPos = np;			pack.write(*sock);		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numRelays)		ch->numRelays--;}// -----------------------------------int Servent::serverProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{		if (!sv->sock)			throw StreamException("Server has no socket");		sv->setStatus(S_LISTENING);		//LOG4("Listening on port %d",sv->sock->host.port);		char servIP[64];		sv->sock->host.toStr(servIP);		if (servMgr->isRoot)			LOG_DEBUG("Root Server started: %s",servIP);		else			LOG_DEBUG("Server started: %s",servIP);				while ((thread->active) && (sv->sock->active()))		{			ClientSocket *cs = sv->sock->accept();			if (cs)			{								// if global IP then we`re not firewalled				if (cs->host.globalIP())					servMgr->setFirewall(ServMgr::FW_OFF);				Servent *ns = servMgr->allocServent();				if (ns)					ns->initIncoming(cs,sv->allow);				else					LOG_ERROR("Out of servents");			}		}	}catch(StreamException &e)	{		LOG_ERROR("Server Error: %s:%d",e.msg,e.err);	}		LOG_DEBUG("Server stopped");	sv->endThread();	return 0;} 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -