⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 5 页
字号:
			utf8 = info.track.contact;

		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());

	}else if (var == "contactURL")
		sprintf(buf,"%s",info.url.cstr());
	else if (var == "streamPos")
		sprintf(buf,"%d",streamPos);
	else if (var == "sourceType")
		strcpy(buf,getSrcTypeStr());
	else if (var == "sourceProtocol")
		strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
	else if (var == "sourceURL")
	{
		if (sourceURL.isEmpty())
			sourceHost.host.toStr(buf);
		else
			strcpy(buf,sourceURL.cstr());
	}
	else if (var == "headPos")
		sprintf(buf,"%d",headPack.pos);
	else if (var == "headLen")
		sprintf(buf,"%d",headPack.len);
	else if (var == "numHits")
	{
		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		int numHits = 0;
		if (chl)
			numHits = chl->numHits();
		sprintf(buf,"%d",numHits);
	}else
		return false;

	out.writeString(buf);
	return true;
}

// -----------------------------------
// Forwards a tracker update to every channel slot that is both active and broadcasting.
//   svID  - passed through unchanged to Channel::broadcastTrackerUpdate
//   force - passed through unchanged to Channel::broadcastTrackerUpdate
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	for (int idx = 0; idx < MAX_CHANNELS; ++idx)
	{
		Channel *chan = &channels[idx];

		// isBroadcasting() is only consulted for active channels.
		if (!chan->isActive())
			continue;
		if (!chan->isBroadcasting())
			continue;

		chan->broadcastTrackerUpdate(svID, force);
	}
}

// -----------------------------------
// Offers the packet to every channel slot via Channel::sendPacketUp.
// Returns the number of slots whose sendPacketUp() call returned a true value.
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
	int accepted = 0;
	int idx = 0;

	while (idx < MAX_CHANNELS)
	{
		if (channels[idx].sendPacketUp(pack, chanID, srcID, destID))
			++accepted;
		++idx;
	}

	return accepted;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){	//if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())	{		Host sh = servMgr->serverHost;		bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);		bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull();		bool stable = servMgr->totalStreams>0;
		GnuPacket hit;		int numChans=0;		for(int i=0; i<MAX_CHANNELS; i++)		{			Channel *c = &channels[i];			if (c->isPlaying())			{

				bool tracker = c->isBroadcasting();
				int ttl = (c->info.getUptime() / servMgr->relayBroadcast);	// 1 hop per N seconds				if (ttl < minTTL)					ttl = minTTL;				if (ttl > maxTTL)					ttl = maxTTL;				if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl))				{					int numOut=0;					numChans++;					if (serv)					{						serv->outputPacket(hit,false);						numOut++;					}					LOG_NETWORK("Sent ch.%d to %d servents, TTL %d",c->index,numOut,ttl);						}			}		}		//if (numChans)		//	LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut);			}}// -----------------------------------
void ChanMgr::setUpdateInterval(unsigned int v)
{
	hostUpdateInterval = v;
}


// -----------------------------------
// message check
// Disabled (#if 0) fragment; it is never compiled. It builds a PCP_BCST atom
// (group ALL, sender = servMgr->sessionID) wrapping a PCP_MESG atom that carries
// the message text, sets the packet length from the stream position, rewinds the
// stream and feeds it through PCPStream::readAtom.
// NOTE(review): `msg` is not declared inside this fragment, so it presumably came
// from an enclosing function — confirm before re-enabling.
#if 0
				ChanPacket pack;
				MemoryStream mem(pack.data,sizeof(pack.data));
				AtomStream atom(mem);
				atom.writeParent(PCP_BCST,3);
					atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
					atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
					atom.writeParent(PCP_MESG,1);
						atom.writeString(PCP_MESG_DATA,msg.cstr());

				mem.len = mem.pos;
				mem.rewind();
				pack.len = mem.len;

				GnuID noID;
				noID.clear();

				BroadcastState bcs;
				PCPStream::readAtom(atom,bcs);
				//int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
				//int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
				//int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
				//LOG_DEBUG("Sent message to %d clients",cnt);
#endif
// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){
	if (!msg.isSame(broadcastMsg))	{		broadcastMsg = msg;		for(int i=0; i<MAX_CHANNELS; i++)		{			Channel *c = &channels[i];			if (c->isActive() && c->isBroadcasting())
			{
				ChanInfo newInfo = c->info;				newInfo.comment = broadcastMsg;				c->updateInfo(newInfo);
			}		}	}}

// -----------------------------------void ChanMgr::clearHitLists(){	for(int i=0; i<MAX_HITLISTS; i++)	{		peercastApp->delChannel(&hitlists[i].info);		hitlists[i].init();	}}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){	lock.on();

	int i;
	Channel *nc=NULL;	for(i=0; i<MAX_CHANNELS; i++)	{		Channel *c = &channels[i];		if (!c->isActive())
		{
			nc=c;
			break;
		}
	}


	if (!nc)
	{
		for(i=0; i<MAX_CHANNELS; i++)
		{
			Channel *c = &channels[i];
			if (!c->isPlaying())
			{
				c->close();
				for(int j=0; j<100; j++)	// wait 10 seconds
				{
					if (!c->isActive())
					{
						nc = c;
						break;
					}
					sys->sleep(100);
				}
				if (nc)
					break;
			}
		}
	}



	if (nc)
	{		nc->info = info;		nc->info.lastPlayStart = 0;		nc->info.lastPlayEnd = 0;
		nc->info.status = ChanInfo::S_UNKNOWN;		if (mount)			nc->mount.set(mount);		nc->index = i+1;		nc->setStatus(Channel::S_WAIT);		nc->type = Channel::T_ALLOCATED;
		nc->info.createdTime = sys->getTime();

		LOG_CHANNEL("New channel (%d) created",nc->index);
		lock.off();		return nc;	}else
	{		LOG_ERROR("Unable to create channel");
		lock.off();		return NULL;
	}}
// -----------------------------------
// Asks each in-use hit list, in slot order, to pick hits for the given search.
//   chs - search state passed to ChanHitList::pickHits
// Returns 1 as soon as one list's pickHits() returns a true value, otherwise 0.
int ChanMgr::pickHits(ChanHitSearch &chs)
{
	for(int i=0; i<MAX_HITLISTS; i++)
		if (hitlists[i].isUsed())
			if (hitlists[i].pickHits(chs))
				return 1;	// a stray no-effect `hitlists[i].info.id;` statement was removed here
	return 0;
}
// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){
	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			if (hitlists[i].info.matchNameID(info))				return &hitlists[i];	return NULL;}// -----------------------------------
// Returns the first in-use hit list whose channel ID isSame() as `id`,
// or NULL when there is none.
ChanHitList *ChanMgr::findHitListByID(GnuID &id)
{
	for (int idx = 0; idx < MAX_HITLISTS; ++idx)
	{
		ChanHitList *candidate = &hitlists[idx];
		if (candidate->isUsed() && candidate->info.id.isSame(id))
			return candidate;
	}
	return NULL;
}
// -----------------------------------int ChanMgr::numHitLists(){	int num=0;	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			num++;	return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){	ChanHitList *hl = NULL;	for(int i=0; i<MAX_HITLISTS; i++)		if (!hitlists[i].isUsed())		{
			char idstr[64];
			info.id.toStr(idstr);
			LOG_DEBUG("Created new hitlist: %s",idstr);			hl = &hitlists[i];
			break;		}	if (hl)	{
		hl->used = true;
		hl->info = info;		hl->info.createdTime = sys->getTime();
		peercastApp->addChannel(&hl->info);
	}	return hl;}// -----------------------------------void ChanMgr::clearDeadHits(bool clearTrackers){	unsigned int interval;
	
	if (servMgr->isRoot)
		interval = 1200;		// mainly for old 0.119 clients
	else
		interval = hostUpdateInterval+30;

	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())
		{
			if (hitlists[i].clearDeadHits(interval,clearTrackers) == 0)			{
				if (!isBroadcasting(hitlists[i].info.id))
				{
					if (!chanMgr->findChannelByID(hitlists[i].info.id))
					{						LOG_DEBUG("Deleting hitlist");
						peercastApp->delChannel(&hitlists[i].info);						hitlists[i].init();
					}
				}			}
		}}// -----------------------------------
// Returns the isBroadcasting() state of the channel found by findChannelByID(id),
// or false when no such channel is found.
bool	ChanMgr::isBroadcasting(GnuID &id)
{
	Channel *found = findChannelByID(id);
	if (!found)
		return false;

	return found->isBroadcasting();
}
// -----------------------------------
bool	ChanMgr::isBroadcasting()
{
	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			if (channels[i].isBroadcasting())
				return true;
	return false;
}
// -----------------------------------int ChanMgr::numChannels(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			tot++;	return tot;}// -----------------------------------
// Forwards the hit to ChanHitList::deadHit on the hit list for hit.chanID.
// Does nothing when no hit list exists for that channel ID.
void ChanMgr::deadHit(ChanHit &hit)
{
	if (ChanHitList *list = findHitListByID(hit.chanID))
		list->deadHit(hit);
}
// -----------------------------------
// Forwards the hit to ChanHitList::delHit on the hit list for hit.chanID.
// Does nothing when no hit list exists for that channel ID.
void ChanMgr::delHit(ChanHit &hit)
{
	ChanHitList *list = findHitListByID(hit.chanID);
	if (!list)
		return;

	list->delHit(hit);
}

// -----------------------------------ChanHit *ChanMgr::addHit(ChanHit &h){	if (searchActive)		lastHit = sys->getTime();

	ChanHitList *hl=NULL;

	hl = findHitListByID(h.chanID);
	if (!hl)
	{
		ChanInfo info;
		info.id = h.chanID;
		hl = addHitList(info);
	}
	if (hl)
	{		return hl->addHit(h);
	}else
		return NULL;}// -----------------------------------class ChanFindInfo : public ThreadInfo{public:	ChanInfo	info;	bool	keep;};// -----------------------------------THREAD_PROC findAndPlayChannelProc(ThreadInfo *th){	ChanFindInfo *cfi = (ChanFindInfo *)th;	ChanInfo info;	info = cfi->info;
	Channel *ch = chanMgr->findChannelByNameID(info);
	chanMgr->currFindAndPlayChannel = info.id;
	if (!ch)		ch = chanMgr->findAndRelay(info);	if (ch)	{
		// check that a different channel hasn`t be selected already.
		if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
			chanMgr->playChannel(ch->info);
		if (cfi->keep)			ch->stayConnected = cfi->keep;	}	delete cfi;	return 0;}// -----------------------------------void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep){	ChanFindInfo *cfi = new ChanFindInfo;	cfi->info = info;	cfi->keep = keep;	cfi->func = findAndPlayChannelProc;

	sys->startThread(cfi);}
// -----------------------------------
// Writes a one-entry playlist file for the channel and asks the system to open it.
//   info - channel to play; its contentType selects the playlist format and its
//          id is used in the file name for WMA/WMV content.
// WMA/WMV channels get an ASX playlist named "<path>/<channel id>.asx"; everything
// else gets an SCPLS playlist at "<path>/play.pls", where <path> is
// peercastApp->getPath().
void ChanMgr::playChannel(ChanInfo &info)
{

	char str[128],fname[256],idStr[128];

	sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
	info.id.toStr(idStr);

	PlayList::TYPE type;

	// NOTE(review): fname is 256 bytes and the length of getPath() is not bounded
	// here, so a very long application path could overflow it — consider a bounded
	// formatter once the toolchain baseline is confirmed.
	if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
	{
		type = PlayList::T_ASX;
		// WMP seems to have a bug where it doesn`t re-read asx files if they have the same name
		// so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
		sprintf(fname,"%s/%s.asx",peercastApp->getPath(),idStr);
	}else
	{
		type = PlayList::T_SCPLS;
		sprintf(fname,"%s/play.pls",peercastApp->getPath());
	}

	// Automatic storage instead of new/delete: the playlist is released on every
	// exit path, including an early exit from any of the calls below.
	PlayList pls(type,1);
	pls.addChannel(str,info);


	LOG_DEBUG("Writing %s",fname);
	FileStream file;
	file.openWriteReplace(fname);
	pls.write(file);
	file.close();


	LOG_DEBUG("Executing: %s",fname);
	sys->executeFile(fname);

}
// -----------------------------------void ChanHitList::init(){	info.init();	memset(hits,0,sizeof(ChanHit)*MAX_HITS);	lastHitTime = 0;
	used = false;
}
// -----------------------------------
// Sets `host` to the first of the two rhost entries whose classType() equals
// that of `h`. Leaves `host` untouched when neither entry matches.
void ChanHit::pickNearestIP(Host &h)
{
	for (int idx = 0; idx < 2; ++idx)
	{
		const bool sameClass = (h.classType() == rhost[idx].classType());
		if (!sameClass)
			continue;

		host = rhost[idx];
		return;
	}
}

// -----------------------------------
// Resets every field of the hit to its default state: hosts and IDs cleared,
// counters and times zeroed, status flags false, and recv/cin/relay true.
void ChanHit::init()
{
	host.init();

	rhost[0].init();
	rhost[1].init();

	numListeners = 0;
	numRelays = 0;

	dead = tracker = firewalled = stable = yp = false;
	recv = cin = relay = true;

	// NOTE(review): the previous code set `direct` to true in the chain above and
	// then immediately overwrote it with 0. The dead store is removed and the
	// effective value (false) is kept; confirm false is the intended default.
	direct = false;

	numHops = 0;
	time = upTime = 0;
	agentStr[0]=0;
	lastContact = 0;

	version = 0;

	sessionID.clear();
	chanID.clear();
}

// -----------------------------------
// Initialises this hit to describe the local node.
//   numl      - stored as numListeners
//   numr      - stored as numRelays
//   (3rd int) - unnamed and unused
//   uptm      - stored as upTime
//   connected - stored as recv
// All other fields are reset by init() and then filled from servMgr.
void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected)
{
	init();

	// Caller-supplied values.
	numListeners = numl;
	numRelays = numr;
	upTime = uptm;
	recv = connected;

	// Identity and state taken from the server manager.
	firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
	stable = servMgr->totalStreams>0;
	strcpy(agentStr,PCX_AGENT);
	sessionID = servMgr->sessionID;

	// Capacity flags: each one is true while the matching pool is not full.
	direct = !servMgr->directFull();
	relay = !servMgr->relaysFull();
	cin = !servMgr->controlInFull();

	host = servMgr->serverHost;

	// rhost[0] mirrors the server host; rhost[1] uses ClientSocket::getIP(NULL)
	// with the same port.
	rhost[0] = Host(host.ip,host.port);
	rhost[1] = Host(ClientSocket::getIP(NULL),host.port);

	// A firewalled node reports port 0 in rhost[0].
	if (firewalled)
		rhost[0].port = 0;
}

// -----------------------------------
// Serialises this hit as a PCP_HOST parent atom.
//   atom    - destination atom stream
//   tryHost - true: write the short form (addresses + tracker flag, 5 children);
//             false: write the full form (15 children)
//   chanID  - when set, a PCP_HOST_CHANID child is written first and the child
//             count grows by one
// The child count passed to writeParent must equal the number of atoms written
// after it, so keep the two in step when changing either.
void ChanHit::writeAtoms(AtomStream &atom,bool tryHost, GnuID &chanID)
{
	const bool withChanID = chanID.isSet();
	const int chanAtoms = withChanID ? 1 : 0;

	if (tryHost)
	{
		atom.writeParent(PCP_HOST,5 + chanAtoms);
			if (withChanID)
				atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
			for (int k = 0; k < 2; ++k)
			{
				atom.writeInt(PCP_HOST_IP,rhost[k].ip);
				atom.writeShort(PCP_HOST_PORT,rhost[k].port);
			}
			atom.writeChar(PCP_HOST_TRACKER,tracker);
		return;
	}

	atom.writeParent(PCP_HOST,15 + chanAtoms);
		if (withChanID)
			atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
		atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
		for (int k = 0; k < 2; ++k)
		{
			atom.writeInt(PCP_HOST_IP,rhost[k].ip);
			atom.writeShort(PCP_HOST_PORT,rhost[k].port);
		}
		atom.writeInt(PCP_HOST_NUML,numListeners);
		atom.writeInt(PCP_HOST_NUMR,numRelays);
		atom.writeInt(PCP_HOST_UPTIME,upTime);
		atom.writeInt(PCP_HOST_VERSION,PCP_CLIENT_VERSION);

		// deprecated atoms, still written
		atom.writeString(PCP_HOST_AGENT,agentStr);
		atom.writeChar(PCP_HOST_BUSY,!relay);
		atom.writeChar(PCP_HOST_PUSH,firewalled);
		atom.writeChar(PCP_HOST_RECV,recv);
		atom.writeChar(PCP_HOST_TRACKER,tracker);

		// The same state again, packed into a single flags atom.
		int flags = 0;
		if (recv)		flags |= PCP_HOST_FLAGS1_RECV;
		if (relay)		flags |= PCP_HOST_FLAGS1_RELAY;
		if (direct)		flags |= PCP_HOST_FLAGS1_DIRECT;
		if (cin)		flags |= PCP_HOST_FLAGS1_CIN;
		if (tracker)	flags |= PCP_HOST_FLAGS1_TRACKER;
		if (firewalled)	flags |= PCP_HOST_FLAGS1_PUSH;

		atom.writeChar(PCP_HOST_FLAGS1,flags);
}
// -----------------------------------
bool	ChanHit::writeVariable(Stream &out, const String &var)
{
	char buf[1024];

	if (var == "rhost0")

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -