⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 3 页
字号:
	{		try		{			sv->sock->writeLine(e.msg);			if (e.code == 401)				sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");			sv->sock->writeLine("");		}catch(StreamException &){}		LOG_ERROR("Incoming HTTP error: %s",e.msg);	}catch(StreamException &e)	{		LOG_ERROR("Incoming Connect error: %s",e.msg);	}	sv->endThread();	return 0;}// -----------------------------------void Servent::processServent(){	setStatus(S_HANDSHAKE);	handshakeIn();	if (!sock)		throw StreamException("Servent has no socket");	if (servMgr->isRoot && !servMgr->needConnections())		processRelay();	else		process();}// -----------------------------------void Servent::processStream(bool push, bool isRaw){			Channel *ch = NULL;	try 	{		setStatus(S_HANDSHAKE);		ch = chanMgr->findChannelByID(chanID);		if (!ch)			throw StreamException("Not found");		chanIndex = ch->index;		if (push)			handshakeGiv(ch);			else			handshakeStream(ch,isRaw);		servMgr->totalStreams++;		Host host = sock->host;		host.port = 0;	// force to 0 so we ignore the incoming port		GnuID noid;		noid.clear();		if (isRaw)		{			// we have agent string now, so check again.			if (!canPreview())				throw StreamException("Preview disallowed");			servMgr->addHost(host,ServHost::T_STREAM,0,noid);			if ((addMetadata) && (chanMgr->icyMetaInterval))				sendRawMetaChannel(ch,chanMgr->icyMetaInterval);			else				sendRawChannel(ch);		}else		{			servMgr->addHost(host,ServHost::T_CHANNEL,0,noid);			sendChannel(ch);		}		setStatus(S_CLOSING);	}catch(StreamException &e)	{		setStatus(S_ERROR);		LOG_ERROR("Stream Error: %s",e.msg);	}}// -----------------------------------------#if 0// debug		FileStream file;		file.openReadOnly("c://test.mp3");		LOG_DEBUG("raw file read");		char buf[4000];		int cnt=0;		while (!file.eof())		{			LOG_DEBUG("send %d",cnt++);			file.read(buf,sizeof(buf));			sock->write(buf,sizeof(buf));		}		file.close();		LOG_DEBUG("raw file sent");	return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(Channel *ch){	for(int i=0; i<60; i++)	{		if (ch->isPlaying() && ch->syncPos)			return true;		if (!thread.active || !sock->active())			break;		sys->sleep(1000);	}	return false;}// -----------------------------------bool Servent::checkPreview(unsigned int connectTime){	Host h = sock->host;	h.port = 0;			// ignore incoming port number	if (!servMgr->isFiltered(ServFilter::F_DIRECT,h))		return false;	if (isPrivate())	// always allow private clients		return true;	if (agent.contains("PeerCast"))		// allow connections from peercast clients (direct relays etc..)		return true;	if (servMgr->maxPreviewTime == 0)		return true;	if ((sys->getTime()-connectTime) <= servMgr->maxPreviewTime)		return true;	return false;}// -----------------------------------bool Servent::canPreview(){		// probably not needed, but just in case we bind to the actual IP address and not localhost	if (sock->host.ip == servMgr->serverHost.ip)		return true;	Host h = getHost();	h.port = 0;	if (!servMgr->isFiltered(ServFilter::F_DIRECT,h))		return false;	if (isPrivate())	// always allow private clients		return true;	if (agent.contains("PeerCast"))		// allow connections from peercast clients (direct relays etc..)		return true;	if (servMgr->seenHost(sock->host,ServHost::T_STREAM,servMgr->maxPreviewWait))		return false;	return true;}// -----------------------------------void Servent::sendRawChannel(Channel *ch){	ch->numListeners++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr());		if (!waitForChannelHeader(ch))			throw StreamException("Channel not ready");		if (ch->headMeta.len)		{			LOG_DEBUG("Send %d bytes header ",ch->headMeta.len);			sock->write(&ch->headMeta.data,ch->headMeta.len);		}		currPos=0;		unsigned int mySync=0,chanSync=0;		unsigned int syncTimeout=0;		unsigned int connectTime=sys->getTime();		ChanPacket pack;		while ((thread.active) && sock->active())		{			if (!checkPreview(connectTime))				throw StreamException("Preview time limit reached");			if (!ch->isActive())				throw StreamException("Channel closed");			currPos = ch->chanData.readPacket(currPos,pack);			//LOG_CHANNEL("Send POS: %d",currPos);			if (pack.type == 'SYNC')			{				chanSync = *(unsigned int *)pack.data;				//LOG_CHANNEL("Send SYNC: %d - %d",mySync,chanSync);				int rs = mySync-chanSync;				if (rs > ChanPacketBuffer::MAX_PACKETS)					mySync = chanSync-1;			}			if (pack.type == 'DATA')			{				// skip packets until we`re back in sync				if (chanSync > mySync)				{					//LOG_CHANNEL("Send DATA: %d",pack.len);					syncTimeout=0;					mySync = chanSync;					sock->write(pack.data,pack.len);				}else					LOG_CHANNEL("SYNC wait: %d - %d",mySync-chanSync,syncTimeout);			}			syncTimeout++;			if (syncTimeout > 30)				throw StreamException("Unable to sync");		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numListeners)		ch->numListeners--;}// -----------------------------------void Servent::sendRawMetaChannel(Channel *ch, int interval){	ch->numListeners++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw Meta stream: %s (metaint: %d)",ch->info.name.cstr(),interval);		if (!waitForChannelHeader(ch))			throw StreamException("Channel not ready");		if (ch->headMeta.len)		{			LOG_DEBUG("Send %d bytes header ",ch->headMeta.len);			sock->write(&ch->headMeta.data,ch->headMeta.len);		}		String lastTitle,lastURL;		currPos=0;		unsigned int mySync=0,chanSync=0;		int		lastMsgTime=sys->getTime();		bool	showMsg=true;		ChanPacket pack;		char buf[16384];		int bufPos=0;		if ((interval > sizeof(buf)) || (interval < 1))			throw StreamException("Bad ICY Meta Interval value");		unsigned int syncTimeout=0;		unsigned int connectTime = sys->getTime();		while ((thread.active) && sock->active())		{			if (!checkPreview(connectTime))				throw StreamException("Preview time limit reached");			if (!ch->isActive())				throw StreamException("Channel closed");			currPos = ch->chanData.readPacket(currPos,pack);			MemoryStream mem(pack.data,pack.len);			if (pack.type == 'SYNC')			{				chanSync = mem.readLong();				int rs = mySync-chanSync;				if (rs > ChanPacketBuffer::MAX_PACKETS)					mySync = chanSync-1;			}			if (pack.type == 'DATA')			{				// skip packets until we`re back in sync				if (chanSync > mySync)				{					syncTimeout = 0;					mySync = chanSync;					int len = pack.len;					char *p = pack.data;					while (len)					{						int rl = len;						if ((bufPos+rl) > interval)							rl = interval-bufPos;						memcpy(&buf[bufPos],p,rl);						bufPos+=rl;						p+=rl;						len-=rl;						if (bufPos >= interval)						{							bufPos = 0;								sock->write(buf,interval);							if (chanMgr->broadcastMsgInterval)								if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)								{									showMsg ^= true;									lastMsgTime = sys->getTime();								}							String *metaTitle = &ch->info.track.title;							if (!ch->info.comment.isEmpty() && (showMsg))								metaTitle = &ch->info.comment;							if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))							{								char tmp[1024];								String title,url;								title = *metaTitle;								url = ch->info.url;								title.convertTo(String::T_META);								url.convertTo(String::T_META);								sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());								int len = ((strlen(tmp) + 15+1) / 16);								sock->writeChar(len);								sock->write(tmp,len*16);								lastTitle = *metaTitle;								lastURL = ch->info.url;								LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());							}else							{								sock->writeChar(0);												}						}					}				}else				{					LOG_DEBUG("SYNC wait: %d - %d",mySync-chanSync,syncTimeout);				}			}			syncTimeout++;			if (syncTimeout > 30)				throw StreamException("Unable to sync");		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numListeners)		ch->numListeners--;}// -----------------------------------void Servent::sendChannel(Channel *ch){	ch->numRelays++;	outputBitrate = ch->getBitrate();	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());		currPos=0;		sock->writeTag("PCST");		ChanPacket pack;		pack.init('HEAD',ch->headMeta.data,ch->headMeta.len);		pack.write(*sock);		pack.init('META',ch->insertMeta.data,ch->insertMeta.len);		pack.write(*sock);				while ((thread.active) && sock->active())		{			if (!ch->isActive())				throw StreamException("Channel closed");			unsigned int np = ch->chanData.readPacket(currPos,pack);			if ((np-currPos) > 1)				LOG_DEBUG("sendChannel skip: %d",np-currPos);			currPos = np;			pack.write(*sock);		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numRelays)		ch->numRelays--;}// -----------------------------------int Servent::serverProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{		if (!sv->sock)			throw StreamException("Server has no socket");		sv->setStatus(S_LISTENING);		//LOG4("Listening on port %d",sv->sock->host.port);		char servIP[64];		sv->sock->host.toStr(servIP);		if (servMgr->isRoot)			LOG_DEBUG("Root Server started: %s",servIP);		else			LOG_DEBUG("Server started: %s",servIP);				while ((thread->active) && (sv->sock->active()))		{			ClientSocket *cs = sv->sock->accept();			if (cs)			{								// if global IP then we`re not firewalled				if (cs->host.globalIP())					servMgr->setFirewall(ServMgr::FW_OFF);				Servent *ns = servMgr->allocServent();				if (ns)					ns->initIncoming(cs,sv->allow);				else					LOG_ERROR("Out of servents");			}		}	}catch(StreamException &e)	{		LOG_ERROR("Server Error: %s:%d",e.msg,e.err);	}		LOG_DEBUG("Server stopped");	sv->endThread();	return 0;} 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -