📄 pcp.cpp
字号:
hit.init();
GnuID chanID = bcs.chanID; //use default
bool busy=false;
unsigned int ipNum=0;
for(int i=0; i<numc; i++)
{
int c,d;
ID4 id = atom.read(c,d);
if (id == PCP_HOST_IP)
{
unsigned int ip = atom.readInt();
hit.rhost[ipNum].ip = ip;
}else if (id == PCP_HOST_PORT)
{
int port = atom.readShort();
hit.rhost[ipNum++].port = port;
if (ipNum > 1)
ipNum = 1;
}
else if (id == PCP_HOST_NUML)
hit.numListeners = atom.readInt();
else if (id == PCP_HOST_NUMR)
hit.numRelays = atom.readInt();
else if (id == PCP_HOST_UPTIME)
hit.upTime = atom.readInt();
else if (id == PCP_HOST_OLDPOS)
hit.oldestPos = atom.readInt();
else if (id == PCP_HOST_NEWPOS)
hit.newestPos = atom.readInt();
else if (id == PCP_HOST_VERSION)
hit.version = atom.readInt();
else if (id == PCP_HOST_FLAGS1)
{
int fl1 = atom.readChar();
hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
}else if (id == PCP_HOST_ID)
atom.readBytes(hit.sessionID.id,16);
else if (id == PCP_HOST_CHANID)
atom.readBytes(chanID.id,16);
else
{
LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
atom.skip(c,d);
}
}
hit.host = hit.rhost[0];
hit.chanID = chanID;
hit.numHops = bcs.numHops;
if (hit.recv)
chanMgr->addHit(hit);
else
chanMgr->delHit(hit);
}
// ------------------------------------------
void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
Channel *ch=NULL;
ChanHitList *chl=NULL;
ChanInfo newInfo;
ch = chanMgr->findChannelByID(bcs.chanID);
chl = chanMgr->findHitListByID(bcs.chanID);
if (ch)
newInfo = ch->info;
else if (chl)
newInfo = chl->info;
for(int i=0; i<numc; i++)
{
int c,d;
ID4 id = atom.read(c,d);
if ((id == PCP_CHAN_PKT) && (ch))
{
readPktAtoms(ch,atom,c,bcs);
}else if (id == PCP_CHAN_INFO)
{
newInfo.readInfoAtoms(atom,c);
}else if (id == PCP_CHAN_TRACK)
{
newInfo.readTrackAtoms(atom,c);
}else if (id == PCP_CHAN_BCID)
{
atom.readBytes(newInfo.bcID.id,16);
}else if (id == PCP_CHAN_KEY) // depreciated
{
atom.readBytes(newInfo.bcID.id,16);
newInfo.bcID.id[0] = 0; // clear flags
}else if (id == PCP_CHAN_ID)
{
atom.readBytes(newInfo.id.id,16);
ch = chanMgr->findChannelByID(newInfo.id);
chl = chanMgr->findHitListByID(newInfo.id);
}else
{
LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
atom.skip(c,d);
}
}
if (!chl)
chl = chanMgr->addHitList(newInfo);
if (chl)
{
chl->info.update(newInfo);
if (!servMgr->chanLog.isEmpty())
{
//if (chl->numListeners())
{
try
{
FileStream file;
file.openWriteAppend(servMgr->chanLog.cstr());
XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
XML::Node *n = chl->info.createChannelXML();
n->add(chl->createXML(false));
n->add(chl->info.createTrackXML());
rn->add(n);
rn->write(file,0);
delete rn;
file.close();
}catch(StreamException &e)
{
LOG_ERROR("Unable to update channel log: %s",e.msg);
}
}
}
}
if (ch && !ch->isBroadcasting())
ch->updateInfo(newInfo);
}
// ------------------------------------------
int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
{
ChanPacket pack;
int ttl=1;
int ver=0;
GnuID fromID,destID;
fromID.clear();
destID.clear();
bcs.initPacketSettings();
MemoryStream pmem(pack.data,sizeof(pack.data));
AtomStream patom(pmem);
patom.writeParent(PCP_BCST,numc);
for(int i=0; i<numc; i++)
{
int c,d;
ID4 id = atom.read(c,d);
if (id == PCP_BCST_TTL)
{
ttl = atom.readChar()-1;
patom.writeChar(id,ttl);
}else if (id == PCP_BCST_HOPS)
{
bcs.numHops = atom.readChar()+1;
patom.writeChar(id,bcs.numHops);
}else if (id == PCP_BCST_FROM)
{
atom.readBytes(fromID.id,16);
patom.writeBytes(id,fromID.id,16);
routeList.add(fromID);
}else if (id == PCP_BCST_GROUP)
{
bcs.group = atom.readChar();
patom.writeChar(id,bcs.group);
}else if (id == PCP_BCST_DEST)
{
atom.readBytes(destID.id,16);
patom.writeBytes(id,destID.id,16);
bcs.forMe = destID.isSame(servMgr->sessionID);
char idstr1[64];
char idstr2[64];
destID.toStr(idstr1);
servMgr->sessionID.toStr(idstr2);
}else if (id == PCP_BCST_CHANID)
{
atom.readBytes(bcs.chanID.id,16);
patom.writeBytes(id,bcs.chanID.id,16);
}else if (id == PCP_BCST_VERSION)
{
ver = atom.readInt();
patom.writeInt(id,ver);
}else
{
// copy and process atoms
int oldPos = pmem.pos;
patom.writeAtoms(id,atom.io,c,d);
pmem.pos = oldPos;
readAtom(patom,bcs);
}
}
char fromStr[64];
fromStr[0] = 0;
if (fromID.isSet())
fromID.toStr(fromStr);
char destStr[64];
destStr[0] = 0;
if (destID.isSet())
destID.toStr(destStr);
LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s",bcs.group,bcs.numHops,ver,fromStr,destStr);
if (fromID.isSet())
if (fromID.isSame(servMgr->sessionID))
{
LOG_ERROR("BCST loopback");
return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
}
// broadcast back out if ttl > 0
if ((ttl>0) && (!bcs.forMe))
{
pack.len = pmem.pos;
pack.type = ChanPacket::T_PCP;
if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
{
chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
}
if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
{
servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
}
if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
{
servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
}
if (bcs.group & (PCP_BCST_GROUP_RELAYS))
{
servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
}
}
return 0;
}
// ------------------------------------------
int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
{
int r=0;
if (id == PCP_CHAN)
{
readChanAtoms(atom,numc,bcs);
}else if (id == PCP_ROOT)
{
if (servMgr->isRoot)
throw StreamException("Unauthorized root message");
else
readRootAtoms(atom,numc,bcs);
}else if (id == PCP_HOST)
{
readHostAtoms(atom,numc,bcs);
}else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be depreciated
{
String msg;
atom.readString(msg.data,sizeof(msg.data),dlen);
LOG_DEBUG("PCP got text: %s",msg.cstr());
}else if (id == PCP_BCST)
{
r = readBroadcastAtoms(atom,numc,bcs);
}else if (id == PCP_HELO)
{
atom.skip(numc,dlen);
atom.writeParent(PCP_OLEH,1);
atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
}else if (id == PCP_PUSH)
{
readPushAtoms(atom,numc,bcs);
}else if (id == PCP_OK)
{
atom.readInt();
}else if (id == PCP_QUIT)
{
r = atom.readInt();
if (!r)
r = PCP_ERROR_QUIT;
}else if (id == PCP_ATOM)
{
for(int i=0; i<numc; i++)
{
int nc,nd;
ID4 aid = atom.read(nc,nd);
int ar = procAtom(atom,aid,nc,nd,bcs);
if (ar)
r = ar;
}
}else
{
LOG_CHANNEL("PCP skip: %s",id.getString().str());
atom.skip(numc,dlen);
}
return r;
}
// ------------------------------------------
int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
{
int numc,dlen;
ID4 id = atom.read(numc,dlen);
return procAtom(atom,id,numc,dlen,bcs);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -