⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pcp.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
字号:
// ------------------------------------------------
// File : pcp.cpp
// Date: 1-mar-2004
// Author: giles
//
// (c) 2002-4 peercast.org
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// ------------------------------------------------

#include "atom.h"
#include "pcp.h"
#include "peercast.h"

// ------------------------------------------
void	PCPStream::process(Stream &in, GnuID &srcID)
{
	BroadcastState bcs;
	bcs.srcID = srcID;
	while (!in.eof() && !peercastInst->isQuitting)
	{
		readPacket(in,bcs);
		sys->sleepIdle();
	}

	flush(in);
}
// ------------------------------------------
void PCPStream::init()
{
	error = 0;

	lastPacketTime = 0;

	inData.init();
	inData.accept = ChanPacket::T_PCP;

	outData.init();
	outData.accept = ChanPacket::T_PCP;
}
// ------------------------------------------
void PCPStream::readVersion(Stream &in)
{
	int len = in.readInt();

	if (len != 4)
		throw StreamException("Invalid PCP");

	int ver = in.readInt();

	LOG_DEBUG("PCP ver: %d",ver);
}
// ------------------------------------------
void PCPStream::readHeader(Stream &in,Channel *)
{
//	AtomStream atom(in);

//	if (in.readInt() != PCP_CONNECT)
//		throw StreamException("Not PCP");

//	readVersion(in);
}
// ------------------------------------------
bool PCPStream::sendPacket(ChanPacket &pack)
{
	return outData.writePacket(pack);
}
// ------------------------------------------
void PCPStream::flush(Stream &in)
{
	ChanPacket pack;
	// send outward packets
	while (outData.numPending())
	{
		outData.readPacket(pack);
		error = PCP_ERROR_WRITE;
		pack.writeRaw(in);
		error = 0;
	}
}
// ------------------------------------------
void PCPStream::readPacket(Stream &in,Channel *ch)
{
	BroadcastState bcs;

	if (ch)
		bcs.srcID = ch->remoteID;
	else
		bcs.srcID.clear();

	readPacket(in,bcs);
}
// ------------------------------------------
void PCPStream::readPacket(Stream &in,BroadcastState &bcs)
{
	AtomStream atom(in);

	ChanPacket pack;
	MemoryStream mem(pack.data,sizeof(pack.data));
	AtomStream patom(mem);


	// send outward packets
	if (outData.numPending())
	{
		outData.readPacket(pack);
		error = PCP_ERROR_WRITE;
		pack.writeRaw(in);
		error = 0;
	}

	if (outData.willSkip())
	{
		error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
		throw StreamException("Send too slow");
	}


	// poll for new downward packet
	if (in.readReady())
	{
		int numc,numd;
		ID4 id;

		error = PCP_ERROR_READ;
		id = atom.read(numc,numd);

		mem.rewind();
		pack.len = patom.writeAtoms(id, in, numc, numd);
		pack.type = ChanPacket::T_PCP;

		error = 0;

		inData.writePacket(pack);
	}

	// process downward packets
	if (inData.numPending())
	{
		inData.readPacket(pack);

		mem.rewind();

		int numc,numd;
		ID4 id = patom.read(numc,numd);

		error = PCPStream::procAtom(patom,id,numc,numd,bcs);
		
		if (error)
		{
			LOG_ERROR("PCP exception: %d",error);
			throw StreamException("PCP exception");
		}
	}
}

// ------------------------------------------
void PCPStream::readEnd(Stream &,Channel *)
{
}


// ------------------------------------------
void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
{
	Host host;
	GnuID	chanID;

	chanID.clear();

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_PUSH_IP)
			host.ip = atom.readInt();
		else if (id == PCP_PUSH_PORT)
			host.port = atom.readShort();
		else if (id == PCP_PUSH_CHANID)
			atom.readBytes(chanID.id,16);
		else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}


	if (bcs.forMe)
	{
		char ipstr[64];
		host.toStr(ipstr);

		Servent *s = NULL;

		if (chanID.isSet())
		{
			Channel *ch = chanMgr->findChannelByID(chanID);
			if (ch)
				if (ch->isBroadcasting() || !ch->isFull() && !servMgr->streamFull() && ch->info.id.isSame(chanID))
					s = servMgr->allocServent();
		}else{
			s = servMgr->allocServent();
		}

		if (s)
		{
			LOG_DEBUG("GIVing to %s",ipstr);
			s->initGIV(host,chanID);
		}
	}

}
// ------------------------------------------
void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
{
	String url;

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_ROOT_UPDINT)
		{
			int si = atom.readInt();

			chanMgr->setUpdateInterval(si);
			LOG_DEBUG("PCP got new host update interval: %d",si);
		}else if (id == PCP_ROOT_URL)
		{
			url = "http://www.peercast.org/";
			String loc;
			atom.readString(loc.data,sizeof(loc.data),d);
			url.append(loc);

		}else if (id == PCP_ROOT_CHECKVER)
		{
			unsigned int newVer = atom.readInt();
			if (newVer > PCP_CLIENT_VERSION)
			{
				strcpy(servMgr->downloadURL,url.cstr());
				peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
			}
			LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);

		}else if (id == PCP_ROOT_UPDATE)
		{
			atom.skip(c,d);

			chanMgr->broadcastTrackerUpdate(bcs.srcID,true);

		}else if (id == PCP_MESG_ASCII)
		{
			String newMsg;

			atom.readString(newMsg.data,sizeof(newMsg.data),d);
			if (!newMsg.isSame(servMgr->rootMsg.cstr()))
			{
				servMgr->rootMsg = newMsg;
				LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
				peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
			}
		}else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}
}

// ------------------------------------------
void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
{
	ChanPacket pack;
	ID4 type;


	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_CHAN_PKT_TYPE)
		{
			type = atom.readInt();

			if (type == PCP_CHAN_PKT_HEAD)
				pack.type = ChanPacket::T_HEAD;
			else if (type == PCP_CHAN_PKT_DATA)
				pack.type = ChanPacket::T_DATA;
			else
				pack.type = ChanPacket::T_UNKNOWN;

		}else if (id == PCP_CHAN_PKT_POS)
		{
			pack.pos = atom.readInt();

		}else if (id == PCP_CHAN_PKT_KEY)
		{
			// not done yet, but can be used to verify packet data
			atom.readBytes(pack.key.id,16);

		}else if (id == PCP_CHAN_PKT_DATA)
		{
			pack.len = d;
			atom.readBytes(pack.data,pack.len);
		}
		else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}

	if (ch)
	{

		int diff = pack.pos - ch->streamPos;
		if (diff)
			LOG_DEBUG("PCP skipping %s%d (%d -> %d)",(diff>0)?"+":"",diff,ch->streamPos,pack.pos);

		if (pack.type == ChanPacket::T_HEAD)
		{
			LOG_DEBUG("New head packet at %d",pack.pos);

			ch->headPack = pack;

			if (pack.pos == 0)
			{
				ch->streamIndex++;
				ch->rawData.init();
			}

			ch->rawData.writePacket(pack,true);
			ch->streamPos = pack.pos+pack.len;

		}else if (pack.type == ChanPacket::T_DATA)
		{
			ch->rawData.writePacket(pack,true);
			ch->streamPos = pack.pos+pack.len;
		}

	}

	// update this parent packet stream position
	if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
		bcs.streamPos = pack.pos;

}
// -----------------------------------
void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs)
{
	ChanHit hit;
	hit.init();
	GnuID chanID = bcs.chanID;	//use default

	unsigned int ipNum=0;

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);

		if (id == PCP_HOST_IP)
		{
			unsigned int ip = atom.readInt();
			hit.rhost[ipNum].ip = ip;
		}else if (id == PCP_HOST_PORT)
		{
			int port = atom.readShort();
			hit.rhost[ipNum++].port = port;

			if (ipNum > 1)
				ipNum = 1;
		}
		else if (id == PCP_HOST_BUSY)
			hit.busy = atom.readChar()!=0;
		else if (id == PCP_HOST_PUSH)
			hit.firewalled = atom.readChar()!=0;
		else if (id == PCP_HOST_NUML)
			hit.numListeners = atom.readInt();
		else if (id == PCP_HOST_NUMR)
			hit.numRelays = atom.readInt();
		else if (id == PCP_HOST_AGENT)
			atom.readString(hit.agentStr,sizeof(hit.agentStr)-1,d);
		else if (id == PCP_HOST_SKIP)
			hit.numSkips = atom.readInt();
		else if (id == PCP_HOST_UPTIME)
			hit.upTime = atom.readInt();
		else if (id == PCP_HOST_RECV)
			hit.recv = atom.readChar()!=0;
		else if (id == PCP_HOST_ID)
			atom.readBytes(hit.sessionID.id,16);
		else if (id == PCP_HOST_CHANID)
			atom.readBytes(chanID.id,16);
		else if (id == PCP_HOST_TRACKER)
			hit.tracker = atom.readChar()!=0;
		else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}

	hit.host = hit.rhost[0];


	char ip0str[64],ip1str[64];
	hit.rhost[0].toStr(ip0str);
	hit.rhost[1].toStr(ip1str);

	hit.hops = bcs.numHops;

	if (chanID.isSet())
	{
		ChanHitList *chl = chanMgr->findHitListByID(chanID);
		if (chl)
		{
			if (hit.recv)
			{
				chl->addHit(hit);
				LOG_DEBUG("Got hit (added): %s/%s",ip0str,ip1str);
			}else
			{
				chl->deadHit(hit);
				LOG_DEBUG("Got hit (removed): %s/%s",ip0str,ip1str);
			}

		}else
			LOG_DEBUG("Got hit (channel not found): %s/%s",ip0str,ip1str);
	}else
	{
		if (hit.tracker)
		{
			if (hit.recv)
				chanMgr->trackerHitList.addHit(hit);
			else
				chanMgr->trackerHitList.deadHit(hit);
		}

		LOG_DEBUG("Got hit (channel unknown): %s/%s",ip0str,ip1str);
	}

}

// ------------------------------------------
void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
	Channel *ch=NULL;
	ChanHitList *chl=NULL;
	ChanInfo info;

	ch = chanMgr->findChannelByID(bcs.chanID);
	chl = chanMgr->findHitListByID(bcs.chanID);

	bool gotInfo=false;
	bool gotTrack=false;

	for(int i=0; i<numc; i++)
	{

		int c,d;
		ID4 id = atom.read(c,d);

		if ((id == PCP_CHAN_PKT) && (ch))
		{
			readPktAtoms(ch,atom,c,bcs);
		}else if (id == PCP_CHAN_INFO)
		{
			info.readInfoAtoms(atom,c);

			if (ch && !ch->isBroadcasting())
				ch->info.update(info);

			if (chl)
				chl->info.update(info);

			gotInfo = true;

		}else if (id == PCP_CHAN_TRACK)
		{
			info.readTrackAtoms(atom,c);

			if (ch && !ch->isBroadcasting())
				ch->info.track = info.track;

			if (chl)
				chl->info.track = info.track;

			gotTrack = true;

		}else if (id == PCP_CHAN_KEY)
		{
			atom.readBytes(info.bcID.id,16);

		}else if (id == PCP_CHAN_ID)
		{
			atom.readBytes(info.id.id,16);

			ch = chanMgr->findChannelByID(info.id);
			chl = chanMgr->findHitListByID(info.id);

		}else
		{
			LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
			atom.skip(c,d);
		}
	}

	if (!chl)
	{
		chl = chanMgr->addHitList(info);
	}


	if (gotTrack || gotInfo)
	{
		if (ch)
			ch->updateMeta();
	}

}
// ------------------------------------------
int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
	ChanPacket pack;
	int ttl=1;		
	GnuID fromID,destID;

	fromID.clear();
	destID.clear();

	bcs.initPacketSettings();

	MemoryStream pmem(pack.data,sizeof(pack.data));
	AtomStream patom(pmem);

	patom.writeParent(PCP_BCST,numc);

	for(int i=0; i<numc; i++)
	{
		int c,d;
		ID4 id = atom.read(c,d);
		
		if (id == PCP_BCST_TTL)
		{
			ttl = atom.readChar()-1;
			patom.writeChar(id,ttl);
		}else if (id == PCP_BCST_HOPS)
		{
			bcs.numHops = atom.readChar()+1;
			patom.writeChar(id,bcs.numHops);

		}else if (id == PCP_BCST_FROM)
		{
			atom.readBytes(fromID.id,16);
			patom.writeBytes(id,fromID.id,16);
		}else if (id == PCP_BCST_GROUP)
		{
			bcs.group = atom.readChar();
			patom.writeChar(id,bcs.group);
		}else if (id == PCP_BCST_DEST)
		{
			atom.readBytes(destID.id,16);
			patom.writeBytes(id,destID.id,16);
			bcs.forMe = destID.isSame(servMgr->sessionID);

			char idstr1[64];
			char idstr2[64];

			destID.toStr(idstr1);
			servMgr->sessionID.toStr(idstr2);

		}else if (id == PCP_BCST_CHANID)
		{
			atom.readBytes(bcs.chanID.id,16);
			patom.writeBytes(id,bcs.chanID.id,16);
		}else
		{
			// copy and process atoms
			int oldPos = pmem.pos;
			patom.writeAtoms(id,atom.io,c,d);
			pmem.pos = oldPos;
			readAtom(patom,bcs);
		}
	}

	if (fromID.isSet())
		if (fromID.isSame(servMgr->sessionID))
		{
			LOG_ERROR("BCST loopback: hops=%d, group=%x",bcs.numHops,bcs.group); 
			return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
		}

	// broadcast back out if ttl > 0 
	if ((ttl>0) && (!bcs.forMe))
	{

		pack.len = pmem.pos;
		pack.type = ChanPacket::T_PCP;

		if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
		{
			chanMgr->broadcastPacketUp(pack,bcs.chanID,bcs.srcID,destID);
		}

		if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
		{
			servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_COUT);
		}

		if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
		{
			servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_CIN);
		}

		if (bcs.group & (PCP_BCST_GROUP_RELAYS))
		{
			servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_STREAM);
		}



	}
	return 0;
}


// ------------------------------------------
int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
{
	int r=0;

	if (id == PCP_CHAN)
	{
		readChanAtoms(atom,numc,bcs);
	}else if (id == PCP_ROOT)
	{
		readRootAtoms(atom,numc,bcs);

	}else if (id == PCP_HOST)
	{
		readHostAtoms(atom,numc,bcs);

	}else if (id == PCP_MESG_ASCII)
	{
		String msg;
		atom.readString(msg.data,sizeof(msg.data),dlen);
		LOG_DEBUG("PCP got text: %s",msg.cstr());
	}else if (id == PCP_BCST)
	{
		r = readBroadcastAtoms(atom,numc,bcs);
	}else if (id == PCP_HELO)
	{
		atom.skip(numc,dlen);
		atom.writeParent(PCP_OLEH,1);
			atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
	}else if (id == PCP_PUSH)
	{

		readPushAtoms(atom,numc,bcs);
	}else if (id == PCP_OK)
	{
		atom.readInt();

	}else if (id == PCP_QUIT)
	{
		r = atom.readInt();
		if (!r)					// for pre v01202 clients
			r = PCP_ERROR_QUIT;		
	}else
	{
		LOG_CHANNEL("PCP skip: %s",id.getString().str());
		atom.skip(numc,dlen);
	}

	return r;

}

// ------------------------------------------
int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
{
	int numc,dlen;
	ID4 id = atom.read(numc,dlen);

	return	procAtom(atom,id,numc,dlen,bcs);
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -