📄 pcp.cpp.svn-base
字号:
// ------------------------------------------------
// File : pcp.cpp
// Date: 1-mar-2004
// Author: giles
//
// (c) 2002-4 peercast.org
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// ------------------------------------------------
#include "atom.h"
#include "pcp.h"
#include "peercast.h"
// ------------------------------------------
// Runs the PCP packet loop on `in`.
//   in    - stream that packets are read from and written to
//   srcID - copied into the BroadcastState handed to every readPacket() call
// Repeats readPacket() followed by sys->sleepIdle() until the stream reports
// EOF or peercastInst->isQuitting is set, then flushes whatever is still
// queued in outData.
void PCPStream::process(Stream &in, GnuID &srcID)
{
    BroadcastState state;
    state.srcID = srcID;

    for (;;)
    {
        // eof() is tested first; the quit flag is only consulted while the
        // stream is still open.
        if (in.eof() || peercastInst->isQuitting)
            break;

        readPacket(in, state);
        sys->sleepIdle();
    }

    flush(in);
}
// ------------------------------------------
// Resets this stream: clears the error code and the last-packet time, then
// re-initialises the inbound and outbound packet buffers and sets the
// `accept` field of both to ChanPacket::T_PCP.
void PCPStream::init()
{
    // scalar state
    error          = 0;
    lastPacketTime = 0;

    // inbound buffer
    inData.init();
    inData.accept  = ChanPacket::T_PCP;

    // outbound buffer
    outData.init();
    outData.accept = ChanPacket::T_PCP;
}
// ------------------------------------------
// Reads a version record from `in`: an integer length that must be 4,
// followed by the version number itself, which is only logged.
// Throws StreamException("Invalid PCP") when the length is anything else.
void PCPStream::readVersion(Stream &in)
{
    const int length = in.readInt();
    if (length != 4)
    {
        throw StreamException("Invalid PCP");
    }

    const int version = in.readInt();
    LOG_DEBUG("PCP ver: %d",version);
}
// ------------------------------------------
// Stream-header hook. The body contains no live code, so nothing is read
// from `in` and the (unnamed) Channel argument is ignored.
void PCPStream::readHeader(Stream &in,Channel *)
{
// Commented-out parsing of a PCP_CONNECT tag followed by readVersion():
// AtomStream atom(in);
// if (in.readInt() != PCP_CONNECT)
// throw StreamException("Not PCP");
// readVersion(in);
}
// ------------------------------------------
// Hands `pack` to the outbound buffer (outData) and returns the result of
// outData.writePacket(). The packet is not written to a stream here; that
// happens in flush() / readPacket().
bool PCPStream::sendPacket(ChanPacket &pack)
{
    const bool queued = outData.writePacket(pack);
    return queued;
}
// ------------------------------------------
// Drains the outbound buffer: every packet still pending in outData is
// taken off the queue and written raw to `in`.
// `error` is set to PCP_ERROR_WRITE around each write and cleared afterwards,
// so it remains set if writeRaw() leaves via an exception.
void PCPStream::flush(Stream &in)
{
    ChanPacket pending;

    for (;;)
    {
        if (!outData.numPending())
            break;

        outData.readPacket(pending);

        error = PCP_ERROR_WRITE;
        pending.writeRaw(in);
        error = 0;
    }
}
// ------------------------------------------
// Channel-based convenience overload of readPacket().
//   in - stream to service
//   ch - optional channel; when non-NULL its remoteID becomes the source ID
//        of the broadcast state, otherwise the source ID is cleared
// Builds a temporary BroadcastState and forwards to
// readPacket(Stream&, BroadcastState&).
void PCPStream::readPacket(Stream &in,Channel *ch)
{
    BroadcastState state;

    if (!ch)
        state.srcID.clear();
    else
        state.srcID = ch->remoteID;

    readPacket(in, state);
}
// ------------------------------------------
// Performs one send / receive / dispatch step on `in`.
//   in  - stream that queued packets are written to and new atoms read from
//   bcs - broadcast state passed through to procAtom()
// In order:
//   1. writes at most one packet pending in outData to `in`;
//   2. throws StreamException("Send too slow") when outData.willSkip();
//   3. if `in` reports data ready, reads one atom header and copies the atom
//      (via AtomStream::writeAtoms) into a ChanPacket queued on inData;
//   4. if inData has a packet pending, parses it from memory and dispatches
//      it through procAtom().
// `error` is set to a PCP_ERROR_* code before each stream operation and
// cleared after it, so the code stays set if the operation throws.
// Throws StreamException("PCP exception") when procAtom() returns non-zero.
void PCPStream::readPacket(Stream &in,BroadcastState &bcs)
{
    AtomStream wire(in);
    ChanPacket pkt;
    MemoryStream pktMem(pkt.data,sizeof(pkt.data));
    AtomStream pktAtoms(pktMem);

    // --- outward: send one queued packet, if any
    if (outData.numPending())
    {
        outData.readPacket(pkt);

        error = PCP_ERROR_WRITE;
        pkt.writeRaw(in);
        error = 0;
    }

    if (outData.willSkip())
    {
        error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
        throw StreamException("Send too slow");
    }

    // --- downward: poll for a new atom and stage it on the inbound queue
    if (in.readReady())
    {
        int childCount,dataLen;
        ID4 tag;

        error = PCP_ERROR_READ;
        tag = wire.read(childCount,dataLen);

        pktMem.rewind();
        pkt.len = pktAtoms.writeAtoms(tag, in, childCount, dataLen);
        pkt.type = ChanPacket::T_PCP;
        error = 0;

        inData.writePacket(pkt);
    }

    // --- dispatch: process one staged packet, if any
    if (!inData.numPending())
        return;

    inData.readPacket(pkt);
    pktMem.rewind();

    int childCount,dataLen;
    ID4 tag = pktAtoms.read(childCount,dataLen);

    error = PCPStream::procAtom(pktAtoms,tag,childCount,dataLen,bcs);
    if (error)
    {
        LOG_ERROR("PCP exception: %d",error);
        throw StreamException("PCP exception");
    }
}
// ------------------------------------------
// End-of-stream hook. The body is empty and both (unnamed) arguments are
// ignored.
void PCPStream::readEnd(Stream &,Channel *)
{
}
// ------------------------------------------
// Parses the children of a PCP_PUSH atom and, when bcs.forMe is set,
// allocates a Servent and calls initGIV() on it for the requested host.
//   atom - stream positioned at the first child atom
//   numc - number of child atoms to consume
//   bcs  - broadcast state; only bcs.forMe is read here
// Recognised children are PCP_PUSH_IP, PCP_PUSH_PORT and PCP_PUSH_CHANID;
// anything else is logged and skipped.
// If a channel ID was supplied, a Servent is only allocated when that channel
// is found locally and it is either broadcasting, or is not full while
// servMgr is not stream-full and the channel's info.id matches the ID.
// Without a channel ID a Servent is always requested.
void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
{
    Host target;
    GnuID chanID;
    chanID.clear();

    for (int n = 0; n < numc; ++n)
    {
        int childCount,dataLen;
        ID4 id = atom.read(childCount,dataLen);

        if (id == PCP_PUSH_IP)
        {
            target.ip = atom.readInt();
        }
        else if (id == PCP_PUSH_PORT)
        {
            target.port = atom.readShort();
        }
        else if (id == PCP_PUSH_CHANID)
        {
            atom.readBytes(chanID.id,16);
        }
        else
        {
            LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),childCount,dataLen);
            atom.skip(childCount,dataLen);
        }
    }

    // The atoms are always consumed; acting on them is only for the addressee.
    if (!bcs.forMe)
        return;

    char ipstr[64];
    target.toStr(ipstr);

    bool mayServe = true;
    if (chanID.isSet())
    {
        Channel *ch = chanMgr->findChannelByID(chanID);
        mayServe = ch
            && (ch->isBroadcasting()
                || (!ch->isFull() && !servMgr->streamFull() && ch->info.id.isSame(chanID)));
    }

    Servent *s = NULL;
    if (mayServe)
        s = servMgr->allocServent();

    if (s)
    {
        LOG_DEBUG("GIVing to %s",ipstr);
        s->initGIV(target,chanID);
    }
}
// ------------------------------------------
// Copies the NUL-terminated string `src` into the fixed-size array `dst`,
// truncating if it does not fit and always leaving `dst` NUL-terminated.
// The destination is taken by array reference so its capacity is known at
// compile time; passing a bare pointer does not compile.
template <size_t N>
static void pcpCopyBounded(char (&dst)[N], const char *src)
{
    strncpy(dst, src, N-1);
    dst[N-1] = '\0';
}

// Parses the children of a PCP_ROOT atom.
//   atom - stream positioned at the first child atom
//   numc - number of child atoms to consume
//   bcs  - broadcast state; bcs.srcID is passed to broadcastTrackerUpdate()
// Handled children:
//   PCP_ROOT_UPDINT   - new host update interval, forwarded to chanMgr
//   PCP_ROOT_URL      - path appended to "http://www.peercast.org/" and
//                       remembered for a later PCP_ROOT_CHECKVER
//   PCP_ROOT_CHECKVER - if the advertised version is newer than
//                       PCP_CLIENT_VERSION, stores the remembered URL in
//                       servMgr->downloadURL and raises an upgrade notice
//   PCP_ROOT_UPDATE   - skipped, then triggers a tracker update broadcast
//   PCP_MESG_ASCII    - root message; stored and shown only when it differs
//                       from the current servMgr->rootMsg
// Anything else is logged and skipped.
void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
{
    // Only set by PCP_ROOT_URL, so a PCP_ROOT_CHECKVER that arrives before any
    // PCP_ROOT_URL in the same parent sees the default-constructed String.
    String url;

    for(int i=0; i<numc; i++)
    {
        int c,d;
        ID4 id = atom.read(c,d);

        if (id == PCP_ROOT_UPDINT)
        {
            int si = atom.readInt();
            chanMgr->setUpdateInterval(si);
            LOG_DEBUG("PCP got new host update interval: %d",si);
        }else if (id == PCP_ROOT_URL)
        {
            url = "http://www.peercast.org/";
            String loc;
            atom.readString(loc.data,sizeof(loc.data),d);
            url.append(loc);
        }else if (id == PCP_ROOT_CHECKVER)
        {
            unsigned int newVer = atom.readInt();
            if (newVer > PCP_CLIENT_VERSION)
            {
                // The URL is built from peer-supplied data, so it must never
                // be copied past the end of the destination buffer.
                pcpCopyBounded(servMgr->downloadURL,url.cstr());
                peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
            }
            LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
        }else if (id == PCP_ROOT_UPDATE)
        {
            atom.skip(c,d);
            chanMgr->broadcastTrackerUpdate(bcs.srcID,true);
        }else if (id == PCP_MESG_ASCII)
        {
            String newMsg;
            atom.readString(newMsg.data,sizeof(newMsg.data),d);
            // Notify only on change so a repeated message is not shown again.
            if (!newMsg.isSame(servMgr->rootMsg.cstr()))
            {
                servMgr->rootMsg = newMsg;
                LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
                peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
            }
        }else
        {
            LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
            atom.skip(c,d);
        }
    }
}
// ------------------------------------------
// Parses the children of a PCP_CHAN_PKT atom into a ChanPacket and, when a
// channel is supplied, writes the packet into that channel's rawData buffer.
//   ch   - destination channel; when NULL the atoms are still consumed but
//          nothing is stored
//   atom - stream positioned at the first child atom
//   numc - number of child atoms to consume
//   bcs  - bcs.streamPos is updated from the packet position (see the end of
//          the function)
// Handled children: PCP_CHAN_PKT_TYPE, PCP_CHAN_PKT_POS, PCP_CHAN_PKT_KEY and
// PCP_CHAN_PKT_DATA; anything else is logged and skipped.
// Throws StreamException when a PCP_CHAN_PKT_DATA atom declares a length that
// is negative or larger than ChanPacket::data.
void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
{
    ChanPacket pack;
    ID4 type;

    for(int i=0; i<numc; i++)
    {
        int c,d;
        ID4 id = atom.read(c,d);

        if (id == PCP_CHAN_PKT_TYPE)
        {
            type = atom.readInt();

            if (type == PCP_CHAN_PKT_HEAD)
                pack.type = ChanPacket::T_HEAD;
            else if (type == PCP_CHAN_PKT_DATA)
                pack.type = ChanPacket::T_DATA;
            else
                pack.type = ChanPacket::T_UNKNOWN;
        }else if (id == PCP_CHAN_PKT_POS)
        {
            pack.pos = atom.readInt();
        }else if (id == PCP_CHAN_PKT_KEY)
        {
            // not done yet, but can be used to verify packet data
            atom.readBytes(pack.key.id,16);
        }else if (id == PCP_CHAN_PKT_DATA)
        {
            // The length comes from the atom header, i.e. from the remote
            // peer. Reject anything that would not fit the packet buffer
            // before copying into it.
            if ((d < 0) || (d > static_cast<int>(sizeof(pack.data))))
                throw StreamException("Bad PCP packet data length");

            pack.len = d;
            atom.readBytes(pack.data,pack.len);
        }
        else
        {
            LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
            atom.skip(c,d);
        }
    }

    if (ch)
    {
        // A non-zero difference means the packet does not start where the
        // channel's stream position currently is; it is only logged.
        int diff = pack.pos - ch->streamPos;
        if (diff)
            LOG_DEBUG("PCP skipping %s%d (%d -> %d)",(diff>0)?"+":"",diff,ch->streamPos,pack.pos);

        if (pack.type == ChanPacket::T_HEAD)
        {
            LOG_DEBUG("New head packet at %d",pack.pos);
            ch->headPack = pack;

            // A head packet at position 0 bumps the stream index and resets
            // the channel's raw data buffer.
            if (pack.pos == 0)
            {
                ch->streamIndex++;
                ch->rawData.init();
            }

            ch->rawData.writePacket(pack,true);
            ch->streamPos = pack.pos+pack.len;
        }else if (pack.type == ChanPacket::T_DATA)
        {
            ch->rawData.writePacket(pack,true);
            ch->streamPos = pack.pos+pack.len;
        }
    }

    // update this parent packet stream position: keep the lowest non-zero
    // position seen so far
    if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
        bcs.streamPos = pack.pos;
}
// -----------------------------------
// Parses the children of a PCP_HOST atom into a ChanHit and records it.
//   atom - stream positioned at the first child atom
//   numc - number of child atoms to consume
//   bcs  - supplies the default channel ID (bcs.chanID) and the hop count
//          (bcs.numHops) stored in the hit
// Address handling: an IP atom fills the current rhost slot; a PORT atom
// fills the same slot and then moves on to slot 1, where every later
// IP/PORT pair lands.
// After parsing:
//   - channel ID set: the hit is added to (hit.recv) or marked dead in the
//     matching hit list, if one exists;
//   - channel ID not set: a tracker hit is added to / marked dead in
//     chanMgr->trackerHitList.
void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs)
{
    ChanHit hit;
    hit.init();

    GnuID chanID = bcs.chanID;  // default; PCP_HOST_CHANID overrides it
    unsigned int slot = 0;      // index into hit.rhost, 0 or 1

    for (int n = 0; n < numc; ++n)
    {
        int childCount,dataLen;
        ID4 id = atom.read(childCount,dataLen);

        if (id == PCP_HOST_IP)
        {
            unsigned int ip = atom.readInt();
            hit.rhost[slot].ip = ip;
        }
        else if (id == PCP_HOST_PORT)
        {
            int port = atom.readShort();
            hit.rhost[slot].port = port;
            if (slot == 0)
                slot = 1;
        }
        else if (id == PCP_HOST_BUSY)
        {
            hit.busy = atom.readChar()!=0;
        }
        else if (id == PCP_HOST_PUSH)
        {
            hit.firewalled = atom.readChar()!=0;
        }
        else if (id == PCP_HOST_NUML)
        {
            hit.numListeners = atom.readInt();
        }
        else if (id == PCP_HOST_NUMR)
        {
            hit.numRelays = atom.readInt();
        }
        else if (id == PCP_HOST_AGENT)
        {
            atom.readString(hit.agentStr,sizeof(hit.agentStr)-1,dataLen);
        }
        else if (id == PCP_HOST_SKIP)
        {
            hit.numSkips = atom.readInt();
        }
        else if (id == PCP_HOST_UPTIME)
        {
            hit.upTime = atom.readInt();
        }
        else if (id == PCP_HOST_RECV)
        {
            hit.recv = atom.readChar()!=0;
        }
        else if (id == PCP_HOST_ID)
        {
            atom.readBytes(hit.sessionID.id,16);
        }
        else if (id == PCP_HOST_CHANID)
        {
            atom.readBytes(chanID.id,16);
        }
        else if (id == PCP_HOST_TRACKER)
        {
            hit.tracker = atom.readChar()!=0;
        }
        else
        {
            LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),childCount,dataLen);
            atom.skip(childCount,dataLen);
        }
    }

    hit.host = hit.rhost[0];

    // Printable forms of both addresses, used only in the log lines below.
    char ip0str[64],ip1str[64];
    hit.rhost[0].toStr(ip0str);
    hit.rhost[1].toStr(ip1str);

    hit.hops = bcs.numHops;

    // No channel ID at all: only tracker hits are recorded.
    if (!chanID.isSet())
    {
        if (hit.tracker)
        {
            if (hit.recv)
                chanMgr->trackerHitList.addHit(hit);
            else
                chanMgr->trackerHitList.deadHit(hit);
        }
        LOG_DEBUG("Got hit (channel unknown): %s/%s",ip0str,ip1str);
        return;
    }

    ChanHitList *chl = chanMgr->findHitListByID(chanID);
    if (!chl)
    {
        LOG_DEBUG("Got hit (channel not found): %s/%s",ip0str,ip1str);
        return;
    }

    if (hit.recv)
    {
        chl->addHit(hit);
        LOG_DEBUG("Got hit (added): %s/%s",ip0str,ip1str);
    }
    else
    {
        chl->deadHit(hit);
        LOG_DEBUG("Got hit (removed): %s/%s",ip0str,ip1str);
    }
}
// ------------------------------------------
// Parses the children of a PCP_CHAN atom.
//   atom - stream positioned at the first child atom
//   numc - number of child atoms to consume
//   bcs  - bcs.chanID selects the initial channel / hit list; bcs is also
//          passed on to readPktAtoms()
// The target channel and hit list start out as the ones matching bcs.chanID
// and are looked up again when a PCP_CHAN_ID child arrives.
// A PCP_CHAN_PKT child is only parsed while a channel is resolved; otherwise
// it falls through to the generic skip branch.
// Info and track updates are applied to the channel only when it is not
// broadcasting, and to the hit list whenever one is resolved.
// If no hit list is resolved by the end, one is added from the collected info.
void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
    ChanInfo info;
    Channel *ch = chanMgr->findChannelByID(bcs.chanID);
    ChanHitList *chl = chanMgr->findHitListByID(bcs.chanID);

    bool sawInfo = false;
    bool sawTrack = false;

    for (int n = 0; n < numc; ++n)
    {
        int childCount,dataLen;
        ID4 id = atom.read(childCount,dataLen);

        if (ch && (id == PCP_CHAN_PKT))
        {
            readPktAtoms(ch,atom,childCount,bcs);
        }
        else if (id == PCP_CHAN_INFO)
        {
            info.readInfoAtoms(atom,childCount);

            if (ch && !ch->isBroadcasting())
                ch->info.update(info);
            if (chl)
                chl->info.update(info);

            sawInfo = true;
        }
        else if (id == PCP_CHAN_TRACK)
        {
            info.readTrackAtoms(atom,childCount);

            if (ch && !ch->isBroadcasting())
                ch->info.track = info.track;
            if (chl)
                chl->info.track = info.track;

            sawTrack = true;
        }
        else if (id == PCP_CHAN_KEY)
        {
            atom.readBytes(info.bcID.id,16);
        }
        else if (id == PCP_CHAN_ID)
        {
            // Re-target everything that follows at the channel named here.
            atom.readBytes(info.id.id,16);
            ch = chanMgr->findChannelByID(info.id);
            chl = chanMgr->findHitListByID(info.id);
        }
        else
        {
            LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),childCount,dataLen);
            atom.skip(childCount,dataLen);
        }
    }

    if (!chl)
        chl = chanMgr->addHitList(info);

    // Refresh channel metadata once, after all info/track atoms were applied.
    if (ch && (sawInfo || sawTrack))
        ch->updateMeta();
}
// ------------------------------------------
// Handles a PCP_BCST atom: processes its payload locally and re-broadcasts
// the (re-encoded) packet to other connections.
//   atom - stream positioned at the first child of the PCP_BCST atom
//   numc - number of child atoms to consume
//   bcs  - broadcast state; its packet settings are re-initialised, then
//          numHops, group, forMe and chanID are filled from the header atoms
// Returns 0, or PCP_ERROR_BCST+PCP_ERROR_LOOPBACK when the packet's FROM id
// is this node's own session ID.
//
// While parsing, every child is written into a fresh ChanPacket: the TTL is
// decremented, the hop count incremented, the remaining header atoms copied
// as-is, and payload atoms copied and then processed from that copy.
// The packet is forwarded only when the decremented TTL is still positive and
// the broadcast was not addressed to this node. The TTL defaults to 1 when no
// PCP_BCST_TTL atom is present.
int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
    ChanPacket pack;
    int ttl=1;

    GnuID fromID,destID;
    fromID.clear();
    destID.clear();

    bcs.initPacketSettings();

    // Outgoing copy of the broadcast, built up atom by atom.
    MemoryStream pmem(pack.data,sizeof(pack.data));
    AtomStream patom(pmem);
    patom.writeParent(PCP_BCST,numc);

    for(int i=0; i<numc; i++)
    {
        int c,d;
        ID4 id = atom.read(c,d);

        if (id == PCP_BCST_TTL)
        {
            ttl = atom.readChar()-1;
            patom.writeChar(id,ttl);
        }else if (id == PCP_BCST_HOPS)
        {
            bcs.numHops = atom.readChar()+1;
            patom.writeChar(id,bcs.numHops);
        }else if (id == PCP_BCST_FROM)
        {
            atom.readBytes(fromID.id,16);
            patom.writeBytes(id,fromID.id,16);
        }else if (id == PCP_BCST_GROUP)
        {
            bcs.group = atom.readChar();
            patom.writeChar(id,bcs.group);
        }else if (id == PCP_BCST_DEST)
        {
            atom.readBytes(destID.id,16);
            patom.writeBytes(id,destID.id,16);
            bcs.forMe = destID.isSame(servMgr->sessionID);
        }else if (id == PCP_BCST_CHANID)
        {
            atom.readBytes(bcs.chanID.id,16);
            patom.writeBytes(id,bcs.chanID.id,16);
        }else
        {
            // copy and process atoms: the payload atom is appended to the
            // outgoing packet, then the write position is moved back to its
            // start so the same bytes can be parsed from memory.
            // NOTE(review): the result of readAtom() is discarded, so an error
            // code produced by a nested atom never reaches the caller -
            // confirm this is intended.
            int oldPos = pmem.pos;
            patom.writeAtoms(id,atom.io,c,d);
            pmem.pos = oldPos;
            readAtom(patom,bcs);
        }
    }

    if (fromID.isSet())
        if (fromID.isSame(servMgr->sessionID))
        {
            LOG_ERROR("BCST loopback: hops=%d, group=%x",bcs.numHops,bcs.group);
            return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
        }

    // broadcast back out if ttl > 0
    if ((ttl>0) && (!bcs.forMe))
    {
        pack.len = pmem.pos;
        pack.type = ChanPacket::T_PCP;

        if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
        {
            chanMgr->broadcastPacketUp(pack,bcs.chanID,bcs.srcID,destID);
            servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_COUT);
        }

        if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
        {
            servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_CIN);
        }

        if (bcs.group & (PCP_BCST_GROUP_RELAYS))
        {
            servMgr->broadcastPacket(pack,bcs.chanID,bcs.srcID,destID,Servent::T_STREAM);
        }
    }

    return 0;
}
// ------------------------------------------
// Dispatches one already-read atom header to its handler.
//   atom - stream positioned just after the atom header
//   id   - atom tag
//   numc - number of child atoms
//   dlen - data length of the atom
//   bcs  - broadcast state passed to the handlers
// Returns 0 for every atom except:
//   PCP_BCST - whatever readBroadcastAtoms() returns
//   PCP_QUIT - the code read from the atom, or PCP_ERROR_QUIT when that
//              code is 0
// Unknown atoms are logged and skipped.
int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
{
    if (id == PCP_CHAN)
    {
        readChanAtoms(atom,numc,bcs);
        return 0;
    }

    if (id == PCP_ROOT)
    {
        readRootAtoms(atom,numc,bcs);
        return 0;
    }

    if (id == PCP_HOST)
    {
        readHostAtoms(atom,numc,bcs);
        return 0;
    }

    if (id == PCP_MESG_ASCII)
    {
        String text;
        atom.readString(text.data,sizeof(text.data),dlen);
        LOG_DEBUG("PCP got text: %s",text.cstr());
        return 0;
    }

    if (id == PCP_BCST)
        return readBroadcastAtoms(atom,numc,bcs);

    if (id == PCP_HELO)
    {
        // NOTE(review): the OLEH reply is written to the same AtomStream the
        // HELO was read from - confirm that is the intended destination.
        atom.skip(numc,dlen);
        atom.writeParent(PCP_OLEH,1);
        atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
        return 0;
    }

    if (id == PCP_PUSH)
    {
        readPushAtoms(atom,numc,bcs);
        return 0;
    }

    if (id == PCP_OK)
    {
        atom.readInt();     // value is read and dropped
        return 0;
    }

    if (id == PCP_QUIT)
    {
        const int code = atom.readInt();
        return code ? code : PCP_ERROR_QUIT;    // for pre v01202 clients
    }

    LOG_CHANNEL("PCP skip: %s",id.getString().str());
    atom.skip(numc,dlen);
    return 0;
}
// ------------------------------------------
// Reads the next atom header from `atom` and dispatches it.
//   atom - stream positioned at an atom header
//   bcs  - broadcast state passed to procAtom()
// Returns the result of procAtom().
int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
{
    int childCount,dataLen;
    ID4 tag = atom.read(childCount,dataLen);

    return procAtom(atom,tag,childCount,dataLen,bcs);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -