⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
				if (!c->isActive())
					break;
				sys->sleep(100);
			}
			if (c->isActive())
				break;
			c->info = info;			c->info.lastPlayTime = 0;			c->info.status = ChanInfo::S_UNKNOWN;			if (mount)				c->mount.set(mount);			c->index = i+1;			c->setStatus(Channel::S_WAIT);			c->type = Channel::T_ALLOCATED;

			LOG_CHANNEL("New channel (%d) created",c->index);			lock.off();			return c;		}
	}	lock.off();	return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){
	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			if (hitlists[i].info.matchNameID(info))				return &hitlists[i];	return NULL;}// -----------------------------------
// Returns the first in-use hit list whose channel ID is the same as `id`,
// or NULL when none of the used slots carries that ID.
ChanHitList *ChanMgr::findHitListByID(GnuID &id)
{
	for (int slot = 0; slot < MAX_HITLISTS; slot++)
	{
		ChanHitList &candidate = hitlists[slot];
		if (!candidate.isUsed())
			continue;
		if (candidate.info.id.isSame(id))
			return &candidate;
	}
	return NULL;
}
// -----------------------------------int ChanMgr::numHitLists(){	int num=0;	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			num++;	return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){	ChanHitList *hl = NULL;	for(int i=0; i<MAX_HITLISTS; i++)		if (!hitlists[i].isUsed())		{
			char idstr[64];
			info.id.toStr(idstr);
			LOG_DEBUG("Created new hitlist: %s",idstr);			hl = &hitlists[i];			break;		}	if (hl)	{		hl->info = info;		peercastApp->addChannel(&hl->info);
	}	return hl;}// -----------------------------------void ChanMgr::clearDeadHits(bool clearTrackers){	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())
		{
			unsigned int interval;
			if (hitlists[i].numTrackers())
				interval = hostUpdateInterval+30;
			else
				interval = 1200;		// old YP

			if (hitlists[i].clearDeadHits(interval,clearTrackers) == 0)			{
				if (!isBroadcasting(hitlists[i].info.id))
				{
					if (!chanMgr->findChannelByID(hitlists[i].info.id))
					{						LOG_DEBUG("Deleting hitlist");
						peercastApp->delChannel(&hitlists[i].info);						hitlists[i].init();
					}
				}			}
		}}// -----------------------------------
// Reports whether the channel with the given ID is broadcasting.
// Returns false when findChannelByID() finds no such channel.
bool	ChanMgr::isBroadcasting(GnuID &id)
{
	Channel *found = findChannelByID(id);
	if (!found)
		return false;

	return found->isBroadcasting();
}
// -----------------------------------
// Reports whether at least one active channel is broadcasting.
bool	ChanMgr::isBroadcasting()
{
	for (int slot = 0; slot < MAX_CHANNELS; slot++)
	{
		if (channels[slot].isActive() && channels[slot].isBroadcasting())
			return true;
	}
	return false;
}
// -----------------------------------int ChanMgr::numConnected(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			tot++;	return tot;}// -----------------------------------int ChanMgr::numRelayed(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].isPlaying())				if (channels[i].numRelays()>0)					tot++;	return tot;}// -----------------------------------int ChanMgr::numListeners(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].isPlaying())				if (channels[i].numListeners()>0)					tot++;	return tot;}// -----------------------------------int ChanMgr::numIdle(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].status == Channel::S_IDLE)				tot++;	return tot;}// -----------------------------------unsigned int ChanMgr::totalInput(){	unsigned int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].isPlaying())				if (channels[i].sock)					tot+=channels[i].sock->bytesInPerSec;	return tot;}
// -----------------------------------
void ChanMgr::deadHit(GnuID &id, ChanHit &hit)
{
	ChanHitList *chl = findHitListByID(id);
	if (chl)
		chl->deadHit(hit);
}// -----------------------------------ChanHit	*ChanMgr::addHit(GnuID &id, ChanHit &h){	if (searchActive)		lastHit = sys->getTime();

	ChanHitList *hl=NULL;

	hl = findHitListByID(id);
	if (!hl)
	{
		ChanInfo info;
		info.id = id;
		hl = addHitList(info);
	}
	if (hl)	{
		return hl->addHit(h);	}else		return NULL;}// -----------------------------------class ChanFindInfo : public ThreadInfo{public:	ChanInfo	info;	bool	keep;};// -----------------------------------THREAD_PROC findAndPlayChannelProc(ThreadInfo *th){	ChanFindInfo *cfi = (ChanFindInfo *)th;	ChanInfo info;	info = cfi->info;	Channel *ch = chanMgr->findChannelByNameID(info);	if (!ch)		ch = chanMgr->findAndRelay(info);	if (ch)	{
		chanMgr->playChannel(ch->info);		if (cfi->keep)			ch->stayConnected = cfi->keep;	}	delete cfi;	return 0;}// -----------------------------------void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep){	ChanFindInfo *cfi = new ChanFindInfo;	cfi->info = info;	cfi->keep = keep;	cfi->func = findAndPlayChannelProc;	sys->startThread(cfi);}
// -----------------------------------
// Writes a one-entry playlist that points at the local server and asks the
// system to execute (open) the playlist file.
//   info - channel to play; its content type selects the playlist format
//          (ASX for WMA/WMV, SCPLS for everything else).
void ChanMgr::playChannel(ChanInfo &info)
{
	char str[128],fname[128],idStr[128];

	sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
	info.id.toStr(idStr);

	PlayList::TYPE type;

	if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
	{
		type = PlayList::T_ASX;
		// WMP seems to have a bug where it doesn't re-read asx files if they have the same name
		// so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
		sprintf(fname,"%s.asx",idStr);
	}else
	{
		type = PlayList::T_SCPLS;
		sprintf(fname,"play.pls");
	}

	// Scoped object instead of new/delete: the playlist is released on every
	// exit path, including an exception thrown by the stream or system calls
	// below.
	PlayList pls(type,1);
	pls.addChannel(str,info);

	FileStream file;
	file.openWriteReplace(fname);
	pls.write(file);
	file.close();

	LOG_DEBUG("Executing: %s",fname);
	sys->executeFile(fname);
}
// -----------------------------------void ChanHitList::init(){	info.init();	memset(hits,0,sizeof(ChanHit)*MAX_HITS);	lastHitTime = 0;}
// -----------------------------------
// Returns a copy of the first occupied hit whose session ID is the same as
// `sid` and whose tracker flag equals `tracker`.
// When nothing matches, the returned hit is a freshly init()ed one.
ChanHit ChanHitList::findHit(GnuID &sid, bool tracker)
{
	ChanHit result;
	result.init();

	for (int slot = 0; slot < MAX_HITS; slot++)
	{
		ChanHit &candidate = hits[slot];
		if (candidate.host.ip
			&& candidate.sessionID.isSame(sid)
			&& (candidate.tracker == tracker))
		{
			result = candidate;
			break;
		}
	}

	return result;
}
// -----------------------------------
// Sets `host` to the first of the two rhost entries whose classType()
// equals that of `h`. Leaves `host` untouched when neither entry matches.
void ChanHit::pickNearestIP(Host &h)
{
	int which = 0;
	while (which < 2)
	{
		if (h.classType() == rhost[which].classType())
		{
			host = rhost[which];
			return;
		}
		which++;
	}
}

// -----------------------------------
// Puts the hit back into its empty state: addresses re-initialised,
// numeric fields zeroed, all flags false except `recv`, which starts true.
void ChanHit::init()
{
	// addresses
	host.init();
	rhost[0].init();
	rhost[1].init();

	// numeric fields
	numListeners = 0;
	numRelays = 0;
	numSkips = 0;
	hops = 0;
	index = 0;
	upTime = 0;
	time = 0;
	lastContact = 0;
	maxPreviewTime = 0;

	// flags
	stable = false;
	busy = false;
	firewalled = false;
	tracker = false;
	yp = false;
	recv = true;

	// identification
	agentStr[0] = 0;
	packetID.clear();
}

// -----------------------------------
// Fills this hit with the description of the local servent.
//   numl      - stored as numListeners
//   numr      - stored as numRelays
//   nums      - stored as numSkips
//   uptm      - stored as upTime
//   connected - stored as recv
//   b         - stored as busy
// Everything else is taken from servMgr; see the comments below.
void ChanHit::initLocal(int numl,int numr,int nums,int uptm,bool connected,bool b)
{
	init();

	// values supplied by the caller
	busy = b;
	recv = connected;
	numListeners = numl;
	numRelays = numr;
	numSkips = nums;
	upTime = uptm;

	// values taken from the server manager
	firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
	stable = servMgr->totalStreams>0;
	sessionID = servMgr->sessionID;
	strcpy(agentStr,PCX_AGENT);
	host = servMgr->serverHost;

	// rhost[0] repeats the server host, rhost[1] carries the address
	// returned by ClientSocket::getIP(NULL) with the same port
	rhost[0] = Host(host.ip,host.port);
	rhost[1] = Host(ClientSocket::getIP(NULL),host.port);

	// a firewalled servent advertises port 0 on rhost[0]
	if (firewalled)
		rhost[0].port = 0;
}

// -----------------------------------
// Writes this hit to `atom` as a PCP_HOST parent atom.
//   tryHost - true: short form (addresses and tracker flag only);
//             false: full form (adds session ID, state flags, counters,
//             agent string and uptime).
//   chanID  - when set, a PCP_HOST_CHANID atom is written first.
// The count given to writeParent() equals the number of atoms written after
// it (5 or 13, plus one for the channel ID) - presumably it is the child
// count, so keep the two in sync when adding atoms.
void ChanHit::writeAtoms(AtomStream &atom,bool tryHost, GnuID &chanID)
{
	const bool addChan = chanID.isSet();
	const int baseAtoms = tryHost ? 5 : 13;

	atom.writeParent(PCP_HOST,baseAtoms + (addChan ? 1 : 0));

	if (addChan)
		atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);

	if (!tryHost)
		atom.writeBytes(PCP_HOST_ID,sessionID.id,16);

	// both address/port pairs, rhost[0] first
	for (int which = 0; which < 2; which++)
	{
		atom.writeInt(PCP_HOST_IP,rhost[which].ip);
		atom.writeShort(PCP_HOST_PORT,rhost[which].port);
	}

	if (!tryHost)
	{
		atom.writeChar(PCP_HOST_BUSY,busy);
		atom.writeChar(PCP_HOST_PUSH,firewalled);
		atom.writeChar(PCP_HOST_RECV,recv);
		atom.writeInt(PCP_HOST_NUML,numListeners);
		atom.writeInt(PCP_HOST_NUMR,numRelays);
		atom.writeString(PCP_HOST_AGENT,agentStr);
		atom.writeInt(PCP_HOST_UPTIME,upTime);
	}

	// the tracker flag closes both forms
	atom.writeChar(PCP_HOST_TRACKER,tracker);
}
// -----------------------------------
int ChanHitList::getTotalListeners()
{
	int cnt=0;
	for(int i=0; i<MAX_HITS; i++)
		if (hits[i].host.ip)
			cnt+=hits[i].numListeners;
	return cnt;
}
// -----------------------------------
int ChanHitList::getTotalRelays()
{
	int cnt=0;
	for(int i=0; i<MAX_HITS; i++)
		if (hits[i].host.ip)
			cnt+=hits[i].numRelays;
	return cnt;
}
// -----------------------------------
int ChanHitList::getTotalFirewalled()
{
	int cnt=0;
	for(int i=0; i<MAX_HITS; i++)
		if (hits[i].host.ip)
			if (hits[i].firewalled)
				cnt++;
	return cnt;
}

// -----------------------------------
// Stub: performs no work, ignores every argument and always returns 0.
int ChanHitList::contactTrackers(bool connected, int numl, int nums, int uptm)
{
	// parameters are intentionally unused in this implementation
	(void)connected;
	(void)numl;
	(void)nums;
	(void)uptm;

	return 0;
}
// -----------------------------------bool	ChanHitList::isAvailable() {	return (numHits()-numBusy())>0;}// -----------------------------------ChanHit *ChanHitList::addHit(ChanHit &h){	int i;
	// dont add our own hits
	if (servMgr->sessionID.isSame(h.sessionID))
		return NULL;

	lastHitTime = sys->getTime();	h.time = lastHitTime;	for(i=0; i<MAX_HITS; i++)
	{
		ChanHit &ch = hits[i];
		if ((ch.rhost[0].ip == h.rhost[0].ip) && (ch.rhost[0].port == h.rhost[0].port))			if (((ch.rhost[1].ip == h.rhost[1].ip) && (ch.rhost[1].port == h.rhost[1].port)) || (!ch.rhost[1].isValid()))
			{				hits[i] = h;				return &hits[i];			}
	}	for(i=0; i<MAX_HITS; i++)		if (hits[i].host.ip == 0)		{
			hits[i] = h;			return &hits[i];		}	return NULL;}// -----------------------------------int	ChanHitList::clearDeadHits(unsigned int timeout, bool clearTrackers){	int cnt=0;	unsigned int ctime = sys->getTime();	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)		{
			if (((ctime-hits[i].time) > timeout) && (clearTrackers || (!clearTrackers & !hits[i].tracker)))
				deadHit(hits[i]);
			else
				cnt++;		}	return cnt;}// -----------------------------------void	ChanHitList::deadHit(ChanHit &h){	for(int i=0; i<MAX_HITS; i++)		if (hits[i].rhost[0].isSame(h.rhost[0]) && hits[i].rhost[1].isSame(h.rhost[1]))
		{			hits[i].init();
		}}// -----------------------------------int	ChanHitList::numHits(){	int cnt=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)
			cnt++;	return cnt;}
// -----------------------------------int	ChanHitList::numListeners(){	int cnt=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			cnt += hits[i].numListeners;	return cnt;}// -----------------------------------
int	ChanHitList::numTrackers()
{
	int cnt=0;
	for(int i=0; i<MAX_HITS; i++)
		if ((hits[i].host.ip) && (hits[i].tracker))
			cnt++;

	return cnt;
}
// -----------------------------------int	ChanHitList::numBusy(){	int cnt=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)
			cnt += hits[i].busy?1:0;	return cnt;}// -----------------------------------int	ChanHitList::numStable(){	int cnt=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)
			cnt += hits[i].stable?1:0;	return cnt;}// -----------------------------------int	ChanHitList::numFirewalled(){	int cnt=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)
			cnt += hits[i].firewalled?1:0;	return cnt;}// -----------------------------------int	ChanHitList::closestHit(){	int hop=10000;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			if (hits[i].hops < hop)				hop = hits[i].hops;	return hop;}// -----------------------------------int	ChanHitList::furthestHit(){	int hop=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			if (hits[i].hops > hop)				hop = hits[i].hops;	return hop;}// -----------------------------------unsigned int	ChanHitList::newestHit(){	unsigned int time=0;	for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			if (hits[i].time > time)				time = hits[i].time;	return time;}// -----------------------------------ChanHit	ChanHitList::getHit(Host *sh, unsigned int wait, bool useFirewalled, bool trackersOnly, bool useBusy){
	ChanHit best,*bestP=NULL;
	best.init();
	best.hops = 255;

	unsigned int ctime = sys->getTime();

	for(int i=0; i<MAX_HITS; i++)		{		ChanHit *c = &hits[i];		if (c->host.ip)
		{
			if ((wait==0) || ((ctime-c->lastContact) >= wait))
			if ((!c->busy || (c->busy && useBusy)) && (c->hops<best.hops))
			{

				if (trackersOnly && c->tracker)
				{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -