⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
		default:		type = T_UNKNOWN;
	}
	len = in.readShort();	in.readShort();	if (len > MAX_DATALEN)		throw StreamException("Bad ChanPacket");	in.read(data,len);}// -----------------------------------
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
	lock.on();
	buf.lock.on();

	firstPos = 0;
	lastPos = 0;
	safePos = 0;
	readPos = 0;

	for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
	{
		ChanPacket *src = &buf.packets[i%MAX_PACKETS];
		if (src->type & accept)
		{
			if (src->pos >= reqPos)
			{
				lastPos = writePos;
				packets[writePos++] = *src;
			}
		}

	}


	buf.lock.off();
	lock.off();
	return lastPos-firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
	if (writePos == 0)
		return false;

	unsigned int fpos = getStreamPos(firstPos);
	if (spos < fpos)
		spos = fpos;

	for(unsigned int i=firstPos; i<=lastPos; i++)
	{
		ChanPacket &p = packets[i%MAX_PACKETS];
		if (p.pos >= spos)
		{
			pack = p;
			return true;
		}
	}
	return false;
}
// -----------------------------------
unsigned int	ChanPacketBuffer::getLatestPos()
{
	if (!writePos)
		return 0;
	else
		return getStreamPos(lastPos);
}

// -----------------------------------
unsigned int	ChanPacketBuffer::findOldestPos(unsigned int spos)
{
	unsigned int min = getStreamPos(safePos);
	unsigned int max = getStreamPos(lastPos);

	if (min > spos)
		return min;

	if (max < spos)
		return max;

	return spos;
}

// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPos(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
	if (pack.len)
	{
		if (willSkip())	// too far behind
			return false;
		lock.on();
		packets[writePos%MAX_PACKETS] = pack;		lastPos = writePos;		writePos++;
		if (writePos >= MAX_PACKETS)
			firstPos = writePos-MAX_PACKETS;
		else
			firstPos = 0;

		if (writePos >= NUM_SAFEPACKETS)
			safePos = writePos - NUM_SAFEPACKETS;
		else
			safePos = 0;

		if (updateReadPos)
			readPos = writePos;

		lastWriteTime = sys->getTime();
		lock.off();
		return true;
	}

	return false;}// -----------------------------------
void	ChanPacketBuffer::readPacket(ChanPacket &pack)
{

	unsigned int tim = sys->getTime();

	if (readPos < firstPos)	
		throw StreamException("Read too far behind");

	while (readPos >= writePos)
	{
		sys->sleepIdle();
		if ((sys->getTime() - tim) > 30)
			throw TimeoutException();
	}
	lock.on();

	pack = 	packets[readPos%MAX_PACKETS];
	readPos++;

	sys->sleepIdle();

	lock.off();
}
// -----------------------------------
bool	ChanPacketBuffer::willSkip()
{
	return ((writePos-readPos) >= MAX_PACKETS);
}

// -----------------------------------void Channel::getStreamPath(char *str){	char idStr[64];	getIDStr(idStr);	sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){	searchInfo = info;	clearHitLists();	numFinds = 0;	lastHit = 0;//	lastSearch = 0;	searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
	closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			if (channels[i].info.matchNameID(info))
				return &channels[i];
	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (stricmp(channels[i].info.name,n)==0)				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].numListeners())				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].index == index)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (strcmp(channels[i].mount,str)==0)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.id.isSame(id))				return &channels[i];	return NULL;}	// -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.match(info))			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].status == status)			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){	Channel *c = chanMgr->createChannel(info,NULL);	if (c)	{		c->stayConnected = stayConnected;
		c->startGet();		return c;	}	return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){	char idStr[64];	info.id.toStr(idStr);	LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());	peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");


	Channel *c = NULL;	c = findChannelByNameID(info);	if (!c)	{		c = chanMgr->createChannel(info,NULL);		if (c)
		{			c->setStatus(Channel::S_SEARCHING);			
			c->startGet();
		}	}
	for(int i=0; i<600; i++)	// search for 1 minute.
	{
		c = findChannelByNameID(info);

		if (!c)
		{			peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
			return NULL;
		}

		
		if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
			break;
		sys->sleep(100);	}	return c;}// -----------------------------------ChanMgr::ChanMgr(){	int i;	for(i=0; i<MAX_CHANNELS; i++)		channels[i].init();	for(i=0; i<MAX_HITLISTS; i++)	{		hitlists[i].index = i;		hitlists[i].init();	}	broadcastMsg.clear();	broadcastMsgInterval=10;	broadcastID.generate();	deadHitAge = 600;
	icyIndex = 0;	icyMetaInterval = 8192;		maxStreamsPerChannel = 0;	searchInfo.init();	minBroadcastTTL = 1;	maxBroadcastTTL = 7;	pushTimeout = 60;	// 1 minute	pushTries = 5;		// 5 times	maxPushHops = 8;	// max 8 hops away	maxUptime = 0;		// 0 = none
	prefetchTime = 10;	// n seconds

	hostUpdateInterval = 180; // 2 minutes

	bufferTime = 5;

	trackerHitList.init();
	autoQuery = 0;		lastQuery = 0;

	lastYPConnect = 0;

}
// -----------------------------------
ChanHit	ChanMgr::getTracker(Host *sh,unsigned int wait,bool useFirewalled)
{
	ChanHit hit;
	hit.init();

	if (!hit.host.ip)
		hit = trackerHitList.getHit(sh,wait,useFirewalled,true);


	if (!hit.host.ip)
		for(int i=0; i<MAX_HITLISTS; i++)
			if (hitlists[i].isUsed())
			{
				hit = hitlists[i].getHit(sh,wait,useFirewalled,true);
				if (hit.host.ip)
					break;
			}

	return hit;
}
// -----------------------------------
ChanHit		ChanMgr::findHit(GnuID &sid,bool tracker)
{
	ChanHit hit;
	hit.init();

	for(int i=0; i<MAX_HITLISTS; i++)
		if (hitlists[i].isUsed())
		{
			hit = hitlists[i].findHit(sid,tracker);
			if (hit.host.ip)
				break;
		}
		
	return hit;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var){	char buf[128];	if (var == "numHitLists")		sprintf(buf,"%d",numHitLists());	else if (var == "numRelayed")		sprintf(buf,"%d",numRelayed());	else if (var == "numConnected")		sprintf(buf,"%d",numConnected());	else if (var == "totalListeners")		sprintf(buf,"%d",numListeners());	else if (var == "totalInPerSec")		sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput()));	else if (var == "totalOutPerSec")		sprintf(buf,"%.1f",BYTES_TO_KBPS(servMgr->totalOutput(true)));	else if (var == "totalPerSec")		sprintf(buf,"%.1f",BYTES_TO_KBPS(totalInput()+servMgr->totalOutput(true)));	else		return false;	out.writeString(buf);	return true;}
// -----------------------------------
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	for(int i=0; i<MAX_CHANNELS; i++)
	{
		Channel *c = &channels[i];
		if ( c->isActive() && c->isBroadcasting() )
			c->broadcastTrackerUpdate(svID,force);
	}
}

// -----------------------------------
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
	int cnt=0;

	if (destID.isSet())
	{
		for(int i=0; i<MAX_CHANNELS; i++)
			if (channels[i].sendPacketUp(pack,chanID,srcID,destID))
				return 1;
	}

	GnuID noID;
	noID.clear();

	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].sendPacketUp(pack,chanID,srcID,noID))
			cnt++;

	return cnt;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){	if (!servMgr->relayBroadcast)		return;	//if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())	{		Host sh = servMgr->serverHost;		bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);		bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->streamFull();		bool stable = servMgr->totalStreams>0;
		GnuPacket hit;		int numChans=0;		for(int i=0; i<MAX_CHANNELS; i++)		{			Channel *c = &channels[i];			if (c->isPlaying())			{

				bool tracker = c->isBroadcasting();
				int ttl = (c->info.getUptime() / servMgr->relayBroadcast);	// 1 hop per N seconds				if (ttl < minTTL)					ttl = minTTL;				if (ttl > maxTTL)					ttl = maxTTL;				if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl))				{					int numOut=0;					numChans++;					if (serv)					{						serv->outputPacket(hit,false);						numOut++;					}					else						numOut+=servMgr->broadcast(hit,NULL);					LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl);						}			}		}		//if (numChans)		//	LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut);			}}// -----------------------------------
void ChanMgr::setUpdateInterval(unsigned int v)
{
	hostUpdateInterval = v;
}


// -----------------------------------
// message check
#if 0
				ChanPacket pack;
				MemoryStream mem(pack.data,sizeof(pack.data));
				AtomStream atom(mem);
				atom.writeParent(PCP_BCST,3);
					atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
					atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
					atom.writeParent(PCP_MESG,1);
						atom.writeString(PCP_MESG_DATA,msg.cstr());

				mem.len = mem.pos;
				mem.rewind();
				pack.len = mem.len;

				GnuID noID;
				noID.clear();

				BroadcastState bcs;
				PCPStream::readAtom(atom,bcs);
				//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
				//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
				//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
				//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
	if (!msg.isSame(broadcastMsg))	{		broadcastMsg = msg;		for(int i=0; i<MAX_CHANNELS; i++)		{			Channel *c = &channels[i];			if (c->isActive())
			{

				GnuID id = servMgr->sessionID;				if (c->status == Channel::S_BROADCASTING)				{					c->info.comment = broadcastMsg;					c->updateMeta();
					id = c->info.id;
				}
			}		}	}}

// -----------------------------------void ChanMgr::clearHitLists(){	for(int i=0; i<MAX_HITLISTS; i++)	{		peercastApp->delChannel(&hitlists[i].info);		hitlists[i].init();	}}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){	lock.on();	for(int i=0; i<MAX_CHANNELS; i++)	{		Channel *c = &channels[i];		if (!c->isPlaying())
		{
			c->close();
			for(int j=0; j<100; j++)	// wait 10 seconds
			{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -