⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pcp.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 2 页
字号:
	hit.init();
	GnuID chanID = bcs.chanID;	//use default

	bool busy=false;

	unsigned int ipNum=0;

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_HOST_IP)
		{
			unsigned int ip = atom.readInt();
			hit.rhost[ipNum].ip = ip;
		}else if (id == PCP_HOST_PORT)
		{
			int port = atom.readShort();
			hit.rhost[ipNum++].port = port;

			if (ipNum > 1)
				ipNum = 1;
		}
		else if (id == PCP_HOST_NUML)
			hit.numListeners = atom.readInt();
		else if (id == PCP_HOST_NUMR)
			hit.numRelays = atom.readInt();
		else if (id == PCP_HOST_UPTIME)
			hit.upTime = atom.readInt();
		else if (id == PCP_HOST_OLDPOS)
			hit.oldestPos = atom.readInt();
		else if (id == PCP_HOST_NEWPOS)
			hit.newestPos = atom.readInt();
		else if (id == PCP_HOST_VERSION)
			hit.version = atom.readInt();
		else if (id == PCP_HOST_FLAGS1)
		{
			int fl1 = atom.readChar();

			hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
			hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
			hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
			hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
			hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
			hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;


		}else if (id == PCP_HOST_ID)
			atom.readBytes(hit.sessionID.id,16);
		else if (id == PCP_HOST_CHANID)
			atom.readBytes(chanID.id,16);
		else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}


	hit.host = hit.rhost[0];
	hit.chanID = chanID;

	hit.numHops = bcs.numHops;


	if (hit.recv)
		chanMgr->addHit(hit);
	else
		chanMgr->delHit(hit);
}

// ------------------------------------------
// Reads the `numc` child atoms of a PCP channel atom from `atom`, collects the
// channel description they carry into a ChanInfo, and applies it to the
// matching hit list and channel.
//
//   atom - stream the child atoms are read from
//   numc - number of child atoms to consume
//   bcs  - broadcast state; bcs.chanID selects the initial channel / hit list
//          and is passed on to readPktAtoms()
void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
	Channel *ch=NULL;
	ChanHitList *chl=NULL;
	ChanInfo newInfo;

	ch = chanMgr->findChannelByID(bcs.chanID);
	chl = chanMgr->findHitListByID(bcs.chanID);

	// Start from the info already known for this channel ID, if any.
	if (ch)
		newInfo = ch->info;
	else if (chl)
		newInfo = chl->info;


	for(int i=0; i<numc; i++)
	{

		int c,d;
		ID4 id = atom.read(c,d);

		// Packet atoms are only processed when a channel is known; otherwise
		// they fall through to the skip branch at the bottom.
		if ((id == PCP_CHAN_PKT) && (ch))
		{
			readPktAtoms(ch,atom,c,bcs);
		}else if (id == PCP_CHAN_INFO)
		{
			newInfo.readInfoAtoms(atom,c);

		}else if (id == PCP_CHAN_TRACK)
		{
			newInfo.readTrackAtoms(atom,c);

		}else if (id == PCP_CHAN_BCID)
		{
			atom.readBytes(newInfo.bcID.id,16);

		}else if (id == PCP_CHAN_KEY)			// deprecated
		{
			atom.readBytes(newInfo.bcID.id,16);
			newInfo.bcID.id[0] = 0;				// clear flags

		}else if (id == PCP_CHAN_ID)
		{
			atom.readBytes(newInfo.id.id,16);

			// The atom names its own channel ID, so look both up again.
			ch = chanMgr->findChannelByID(newInfo.id);
			chl = chanMgr->findHitListByID(newInfo.id);

		}else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}

	if (!chl)
		chl = chanMgr->addHitList(newInfo);

	if (chl)
	{
		chl->info.update(newInfo);

		// When a channel log file is configured, append an XML record of
		// this update to it.
		if (!servMgr->chanLog.isEmpty())
		{
			//if (chl->numListeners())

			// Declared outside the try block so the catch handler can free
			// the node tree if a stream call throws after it was allocated.
			XML::Node *rn = NULL;
			try
			{
				FileStream file;
				file.openWriteAppend(servMgr->chanLog.cstr());

				rn = new XML::Node("update time=\"%d\"",sys->getTime());
				XML::Node *n = chl->info.createChannelXML();
				n->add(chl->createXML(false));
				n->add(chl->info.createTrackXML());
				rn->add(n);

				rn->write(file,0);
				delete rn;
				rn = NULL;		// already freed; keeps the handler from deleting twice
				file.close();
			}catch(StreamException &e)
			{
				// NOTE(review): `file` is not explicitly closed on this path;
				// confirm FileStream releases its handle on destruction.
				delete rn;		// no-op when nothing was allocated
				LOG_ERROR("Unable to update channel log: %s",e.msg);
			}
		}

	}

	// A channel that is broadcasting keeps its own info untouched.
	if (ch && !ch->isBroadcasting())
		ch->updateInfo(newInfo);


}
// ------------------------------------------
// Reads the `numc` child atoms of a PCP broadcast atom, rebuilds the broadcast
// into a local packet (with the TTL decremented and the hop count incremented),
// processes every payload atom locally, and finally forwards the rebuilt packet
// when the TTL allows it and the broadcast is not addressed to this servent.
//
//   atom - stream the child atoms are read from
//   numc - number of child atoms to consume
//   bcs  - broadcast state; its packet settings are reset, then filled from
//          the header atoms (numHops, group, forMe, chanID)
//
// Returns PCP_ERROR_BCST+PCP_ERROR_LOOPBACK when the sender ID equals our own
// session ID, otherwise 0.
int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
	ChanPacket pack;
	int ttl=1;
	int ver=0;
	GnuID fromID,destID;

	fromID.clear();
	destID.clear();

	bcs.initPacketSettings();

	// The outgoing copy of the broadcast is written into pack.data.
	MemoryStream pmem(pack.data,sizeof(pack.data));
	AtomStream patom(pmem);

	patom.writeParent(PCP_BCST,numc);

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_BCST_TTL)
		{
			// One hop is consumed here, so the copy carries TTL-1.
			ttl = atom.readChar()-1;
			patom.writeChar(id,ttl);

		}else if (id == PCP_BCST_HOPS)
		{
			bcs.numHops = atom.readChar()+1;
			patom.writeChar(id,bcs.numHops);

		}else if (id == PCP_BCST_FROM)
		{
			atom.readBytes(fromID.id,16);
			patom.writeBytes(id,fromID.id,16);

			routeList.add(fromID);
		}else if (id == PCP_BCST_GROUP)
		{
			bcs.group = atom.readChar();
			patom.writeChar(id,bcs.group);
		}else if (id == PCP_BCST_DEST)
		{
			atom.readBytes(destID.id,16);
			patom.writeBytes(id,destID.id,16);
			bcs.forMe = destID.isSame(servMgr->sessionID);

		}else if (id == PCP_BCST_CHANID)
		{
			atom.readBytes(bcs.chanID.id,16);
			patom.writeBytes(id,bcs.chanID.id,16);
		}else if (id == PCP_BCST_VERSION)
		{
			ver = atom.readInt();
			patom.writeInt(id,ver);
		}else
		{
			// Payload atom: copy it into the outgoing packet, rewind the
			// packet position to the start of that copy and process it from
			// there. The result of readAtom() is intentionally not used,
			// matching the original behaviour.
			int oldPos = pmem.pos;
			patom.writeAtoms(id,atom.io,c,d);
			pmem.pos = oldPos;
			readAtom(patom,bcs);
		}
	}

	// Unset IDs are logged as empty strings.
	char fromStr[64];
	fromStr[0] = 0;
	if (fromID.isSet())
		fromID.toStr(fromStr);
	char destStr[64];
	destStr[0] = 0;
	if (destID.isSet())
		destID.toStr(destStr);


	LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s",bcs.group,bcs.numHops,ver,fromStr,destStr);

	// A broadcast carrying our own session ID as sender has looped back.
	if (fromID.isSet() && fromID.isSame(servMgr->sessionID))
	{
		LOG_ERROR("BCST loopback"); 
		return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
	}

	// broadcast back out if ttl > 0 
	if ((ttl>0) && (!bcs.forMe))
	{
		pack.len = pmem.pos;
		pack.type = ChanPacket::T_PCP;

		// Root, tracker and relay groups all go upstream and to T_COUT servents.
		if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
		{
			chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
			servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
		}

		if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
		{
			servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
		}

		if (bcs.group & (PCP_BCST_GROUP_RELAYS))
		{
			servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
		}
	}
	return 0;
}


// ------------------------------------------
// Dispatches one already-read atom header to the handler for its ID.
//
//   atom - stream positioned just after the atom header
//   id   - ID of the atom being processed
//   numc - first value produced by atom.read(); used as the child-atom count
//   dlen - second value produced by atom.read(); used as the data length
//   bcs  - broadcast state forwarded to the handlers
//
// Returns 0 for most atoms. PCP_BCST returns the result of
// readBroadcastAtoms(); PCP_QUIT returns the integer it carries (or
// PCP_ERROR_QUIT when that integer is 0); PCP_ATOM returns the last non-zero
// result among its children. Throws StreamException when a PCP_ROOT atom
// arrives while servMgr->isRoot is set.
int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
{
	if (id == PCP_CHAN)
	{
		readChanAtoms(atom,numc,bcs);
		return 0;
	}

	if (id == PCP_ROOT)
	{
		// A root servent does not accept root messages from others.
		if (servMgr->isRoot)
			throw StreamException("Unauthorized root message");

		readRootAtoms(atom,numc,bcs);
		return 0;
	}

	if (id == PCP_HOST)
	{
		readHostAtoms(atom,numc,bcs);
		return 0;
	}

	// PCP_MESG_ASCII is slated for removal; both IDs carry plain text.
	if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))
	{
		String text;
		atom.readString(text.data,sizeof(text.data),dlen);
		LOG_DEBUG("PCP got text: %s",text.cstr());
		return 0;
	}

	if (id == PCP_BCST)
		return readBroadcastAtoms(atom,numc,bcs);

	if (id == PCP_HELO)
	{
		// Discard the greeting's contents and reply with our session ID.
		atom.skip(numc,dlen);
		atom.writeParent(PCP_OLEH,1);
		atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
		return 0;
	}

	if (id == PCP_PUSH)
	{
		readPushAtoms(atom,numc,bcs);
		return 0;
	}

	if (id == PCP_OK)
	{
		// The value is consumed from the stream but not used.
		atom.readInt();
		return 0;
	}

	if (id == PCP_QUIT)
	{
		int quitCode = atom.readInt();
		if (quitCode == 0)
			quitCode = PCP_ERROR_QUIT;
		return quitCode;
	}

	if (id == PCP_ATOM)
	{
		// Container atom: process every child and keep the last non-zero result.
		int lastError = 0;
		for(int remaining = numc; remaining > 0; --remaining)
		{
			int childCount,childLen;
			ID4 childID = atom.read(childCount,childLen);
			int childResult = procAtom(atom,childID,childCount,childLen,bcs);
			if (childResult != 0)
				lastError = childResult;
		}
		return lastError;
	}

	// Unknown atom: log it and step over its contents.
	LOG_CHANNEL("PCP skip: %s",id.getString().str());
	atom.skip(numc,dlen);
	return 0;
}

// ------------------------------------------
// Reads the next atom header from `atom` and hands it to procAtom().
// Returns whatever procAtom() returns for that atom.
int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
{
	int childCount;
	int dataLen;
	ID4 atomID = atom.read(childCount,dataLen);

	return procAtom(atom,atomID,childCount,dataLen,bcs);
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -