// servent.cpp
// (code-viewer UI label: "Font size:")
return 0;}// -----------------------------------void Servent::processServent(){ setStatus(S_HANDSHAKE); handshakeIn(); if (!sock) throw StreamException("Servent has no socket");
processGnutella();}// -----------------------------------void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo){
if (!doneHandshake)
{
setStatus(S_HANDSHAKE);
if (!handshakeStream(chanInfo))
return;
}
if (chanInfo.id.isSet())
{
chanID = chanInfo.id;
LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
if (!waitForChannelHeader(chanInfo))
throw StreamException("Channel not ready");
servMgr->totalStreams++; Host host = sock->host; host.port = 0; // force to 0 so we ignore the incoming port
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
if (outputProtocol == ChanInfo::SP_HTTP) { if ((addMetadata) && (chanMgr->icyMetaInterval)) sendRawMetaChannel(chanMgr->icyMetaInterval); else
sendRawChannel(true,true);
}else if (outputProtocol == ChanInfo::SP_MMS)
{
if (nsSwitchNum)
{
sendRawChannel(true,true);
}else
{
sendRawChannel(true,false);
}
}else if (outputProtocol == ChanInfo::SP_PCP) { sendPCPChannel();
} else if (outputProtocol == ChanInfo::SP_PEERCAST)
{
sendPeercastChannel();
}
}
setStatus(S_CLOSING);}// -----------------------------------------#if 0// debug FileStream file; file.openReadOnly("c://test.mp3"); LOG_DEBUG("raw file read"); char buf[4000]; int cnt=0; while (!file.eof()) { LOG_DEBUG("send %d",cnt++); file.read(buf,sizeof(buf)); sock->write(buf,sizeof(buf)); } file.close(); LOG_DEBUG("raw file sent"); return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(ChanInfo &info){ for(int i=0; i<30*10; i++) {
Channel *ch = chanMgr->findChannelByID(info.id);
if (!ch)
return false;
if (ch->isPlaying() && (ch->rawData.writePos>0)) return true; if (!thread.active || !sock->active()) break; sys->sleep(100); } return false;}// -----------------------------------void Servent::sendRawChannel(bool sendHead, bool sendData){
try {
sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
if (sendHead)
{ ch->headPack.writeRaw(*sock);
streamPos = ch->headPack.pos + ch->headPack.len;
LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
}
if (sendData)
{
unsigned int streamIndex = ch->streamIndex; unsigned int connectTime = sys->getTime();
unsigned int lastWriteTime = connectTime;
while ((thread.active) && sock->active()) {
ch = chanMgr->findChannelByID(chanID);
if (ch)
{
if (streamIndex != ch->streamIndex)
{
streamIndex = ch->streamIndex;
streamPos = ch->headPack.pos;
LOG_DEBUG("sendRaw got new stream index");
}
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
if (syncPos != rawPack.sync)
LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
syncPos = rawPack.sync+1;
if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
{
rawPack.writeRaw(*sock);
lastWriteTime = sys->getTime();
}
if (rawPack.pos < streamPos)
LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
streamPos = rawPack.pos+rawPack.len;
}
}
if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
throw TimeoutException();
sys->sleepIdle(); }
} }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); }}
#if 0
// -----------------------------------
// DISABLED (compiled out by the surrounding #if 0).
//
// Multi-channel variant of the raw sender: collects the IDs of every channel
// in chanMgr->channels that is currently playing and writes their packets to
// this servent's single socket.
//
//   sendHead - write each collected channel's headPack and record its
//              post-header stream position and stream index.
//   sendData - loop forwarding T_DATA / T_HEAD packets while the thread and
//              the socket stay active.
//
// StreamExceptions are caught and logged.
//
// NOTE(review): this code looks unfinished - see the notes at the data loop
// before re-enabling it.
void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
{
try
{
// Per-channel state, indexed in step with chanIDs[].
unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
GnuID chanIDs[ChanMgr::MAX_CHANNELS];
int numChanIDs=0;
// Snapshot the IDs of all channels that are playing right now.
for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
{
Channel *ch = &chanMgr->channels[i];
if (ch->isPlaying())
chanIDs[numChanIDs++]=ch->info.id;
}
setStatus(S_CONNECTED);
if (sendHead)
{
// NOTE(review): chanStreamPos[] / chanStreamIndex[] are only assigned in
// this branch (and only for channels still found by ID). With
// sendHead == false the data loop below reads them uninitialised.
for(int i=0; i<numChanIDs; i++)
{
Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
if (ch)
{
LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
ch->headPack.writeRaw(*sock);
chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
chanStreamIndex[i] = ch->streamIndex;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
}
}
if (sendData)
{
// NOTE(review): connectTime is not read anywhere in this function.
unsigned int connectTime=sys->getTime();
while ((thread.active) && sock->active())
{
// NOTE(review): the loop starts at index 1 (channel 0 is never sent) and
// the body ends with an unconditional break, so at most chanIDs[1] is
// serviced per pass - confirm the intent before enabling.
for(int i=1; i<numChanIDs; i++)
{
Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
if (ch)
{
// Stream index changed: rewind this channel to its header position.
if (chanStreamIndex[i] != ch->streamIndex)
{
chanStreamIndex[i] = ch->streamIndex;
chanStreamPos[i] = ch->headPack.pos;
LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
}
ChanPacket rawPack;
if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
{
if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
rawPack.writeRaw(*sock);
if (rawPack.pos < chanStreamPos[i])
LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
// Advance past the packet just handled.
chanStreamPos[i] = rawPack.pos+rawPack.len;
//LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
}
}
break;
}
sys->sleepIdle();
}
}
}catch(StreamException &e)
{
LOG_ERROR("Stream channel: %s",e.msg);
}
}
#endif
// -----------------------------------void Servent::sendRawMetaChannel(int interval){
try {
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos); String lastTitle,lastURL; int lastMsgTime=sys->getTime(); bool showMsg=true; char buf[16384]; int bufPos=0; if ((interval > sizeof(buf)) || (interval < 1)) throw StreamException("Bad ICY Meta Interval value");
unsigned int connectTime = sys->getTime(); unsigned int lastWriteTime = connectTime;
streamPos = 0; // raw meta channel has no header (its MP3)
while ((thread.active) && sock->active()) {
ch = chanMgr->findChannelByID(chanID);
if (ch)
{
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
if (syncPos != rawPack.sync)
LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
syncPos = rawPack.sync+1;
MemoryStream mem(rawPack.data,rawPack.len);
if (rawPack.type == ChanPacket::T_DATA) {
int len = rawPack.len; char *p = rawPack.data; while (len) { int rl = len; if ((bufPos+rl) > interval) rl = interval-bufPos; memcpy(&buf[bufPos],p,rl); bufPos+=rl; p+=rl; len-=rl; if (bufPos >= interval) { bufPos = 0; sock->write(buf,interval); lastWriteTime = sys->getTime();
if (chanMgr->broadcastMsgInterval) if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval) { showMsg ^= true; lastMsgTime = sys->getTime(); } String *metaTitle = &ch->info.track.title; if (!ch->info.comment.isEmpty() && (showMsg)) metaTitle = &ch->info.comment; if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL)) { char tmp[1024]; String title,url; title = *metaTitle; url = ch->info.url; title.convertTo(String::T_META); url.convertTo(String::T_META); sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr()); int len = ((strlen(tmp) + 15+1) / 16); sock->writeChar(len); sock->write(tmp,len*16); lastTitle = *metaTitle; lastURL = ch->info.url; LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr()); }else { sock->writeChar(0); } }
} }
streamPos = rawPack.pos + rawPack.len; }
}
if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
throw TimeoutException();
sys->sleepIdle();
} }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); }}// -----------------------------------void Servent::sendPeercastChannel(){
try { setStatus(S_CONNECTED);
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr()); sock->writeTag("PCST"); ChanPacket pack;
ch->headPack.writePeercast(*sock);
pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos); pack.writePeercast(*sock); streamPos = 0;
unsigned int syncPos=0; while ((thread.active) && sock->active()) { ch = chanMgr->findChannelByID(chanID);
if (ch)
{
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
{
sock->writeTag("SYNC");
sock->writeShort(4);
sock->writeShort(0);
sock->write(&syncPos,4);
syncPos++;
rawPack.writePeercast(*sock);
}
streamPos = rawPack.pos + rawPack.len;
}
}
sys->sleepIdle(); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); }}
// -----------------------------------
void Servent::sendPCPChannel()
{
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
AtomStream atom(*sock);
pcpStream = new PCPStream(remoteID);
int error=0;
try
{
LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
setStatus(S_CONNECTED);
atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
ch->info.writeInfoAtoms(atom);
ch->info.writeTrackAtoms(atom);
if (sendHeader)
{
atom.writeParent(PCP_CHAN_PKT,3);
atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
streamPos = ch->headPack.pos+ch->headPack.len;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
unsigned int streamIndex = ch->streamIndex;
while (thread.active)
{
Channel *ch = chanMgr->findChannelByID(chanID);
if (ch)
{
if (streamIndex != ch->streamIndex)
{
streamIndex = ch->streamIndex;
streamPos = ch->headPack.pos;
LOG_DEBUG("sendPCPStream got new stream index");
}
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
if (rawPack.type == ChanPacket::T_HEAD)
{
atom.writeParent(PCP_CHAN,2);
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
atom.writeParent(PCP_CHAN_PKT,3);
atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
}else if (rawPack.type == ChanPacket::T_DATA)
{
atom.writeParent(PCP_CHAN,2);
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
atom.writeParent(PCP_CHAN_PKT,3);
atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
}
if (rawPack.pos < streamPos)
LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
//LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
streamPos = rawPack.pos+rawPack.len;
}
}
BroadcastState bcs;
error = pcpStream->readPacket(*sock,bcs);
if (error)
throw StreamException("PCP exception");
sys->sleepIdle();
}
LOG_DEBUG("PCP channel stream closed normally.");
}catch(StreamException &e)
{
LOG_ERROR("Stream channel: %s",e.msg);
}
try
{
atom.writeInt(PCP_QUIT,error);
}catch(StreamException &) {}
}
// -----------------------------------int Servent::serverProc(ThreadInfo *thread){// thread->lock();
Servent *sv = (Servent*)thread->data; try { if (!sv->sock) throw StreamException("Server has no socket"); sv->setStatus(S_LISTENING); char servIP[64]; sv->sock->host.toStr(servIP); if (servMgr->isRoot) LOG_DEBUG("Root Server started: %s",servIP); else LOG_DEBUG("Server started: %s",servIP); while ((thread->active) && (sv->sock->active())) {
if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
{ ClientSocket *cs = sv->sock->accept(); if (cs) {
LOG_DEBUG("accepted incoming"); Servent *ns = servMgr->allocServent(); if (ns) {
servMgr->lastIncoming = sys->getTime();
ns->servPort = sv->sock->host.port; ns->networkID = servMgr->networkID; ns->initIncoming(cs,sv->allow); }else LOG_ERROR("Out of servents"); }
}
sys->sleep(100);
} }catch(StreamException &e) { LOG_ERROR("Server Error: %s:%d",e.msg,e.err); } LOG_DEBUG("Server stopped");
sv->kill();
sys->endThread(thread); return 0;} // -----------------------------------
// Writes the textual value of the servent variable named `var` to `s`.
//
//   s   - stream that receives the value via writeString().
//   var - variable name: "type", "status", "address", "agent", "bitrate",
//         "uptime" or one of the "gnet.*" statistics handled below.
//
// Returns true when the name was recognised and a value was written; returns
// false (writing nothing) for any other name.
bool Servent::writeVariable(Stream &s, const String &var)
{
	char out[1024];

	if (var == "type")
	{
		strcpy(out,getTypeStr());
	}
	else if (var == "status")
	{
		strcpy(out,getStatusStr());
	}
	else if (var == "address")
	{
		getHost().toStr(out);
	}
	else if (var == "agent")
	{
		strcpy(out,agent.cstr());
	}
	else if (var == "bitrate")
	{
		// Combined in+out rate; "0" when there is no socket.
		if (!sock)
		{
			strcpy(out,"0");
		}else
		{
			unsigned int total = sock->bytesInPerSec+sock->bytesOutPerSec;
			sprintf(out,"%.1f",BYTES_TO_KBPS(total));
		}
	}
	else if (var == "uptime")
	{
		// "-" until lastConnect has been set.
		String elapsed;
		if (!lastConnect)
			elapsed.set("-");
		else
			elapsed.setFromStopwatch(sys->getTime()-lastConnect);
		strcpy(out,elapsed.cstr());
	}
	else if (var.startsWith("gnet."))
	{
		// Time since lastConnect, used as the divisor for per-second rates.
		float connected = (float)(sys->getTime()-lastConnect);

		if (var == "gnet.packetsIn")
		{
			sprintf(out,"%d",gnuStream.packetsIn);
		}
		else if (var == "gnet.packetsInPerSec")
		{
			float inRate = 0;
			if (connected > 0)
				inRate = ((float)gnuStream.packetsIn)/connected;
			sprintf(out,"%.1f",inRate);
		}
		else if (var == "gnet.packetsOut")
		{
			sprintf(out,"%d",gnuStream.packetsOut);
		}
		else if (var == "gnet.packetsOutPerSec")
		{
			float outRate = 0;
			if (connected > 0)
				outRate = ((float)gnuStream.packetsOut)/connected;
			sprintf(out,"%.1f",outRate);
		}
		else if (var == "gnet.normQueue")
		{
			sprintf(out,"%d",outPacketsNorm.numPending());
		}
		else if (var == "gnet.priQueue")
		{
			sprintf(out,"%d",outPacketsPri.numPending());
		}
		else if (var == "gnet.flowControl")
		{
			int flag = 0;
			if (flowControl)
				flag = 1;
			sprintf(out,"%d",flag);
		}
		else if (var == "gnet.routeTime")
		{
			// Age of the oldest seen ID, or "-" when none are in use.
			int used = seenIDs.numUsed();
			unsigned int age = sys->getTime()-seenIDs.getOldest();

			String ageStr;
			ageStr.setFromStopwatch(age);

			if (!used)
				strcpy(out,"-");
			else
				strcpy(out,ageStr.cstr());
		}
		else
		{
			return false;
		}
	}
	else
	{
		return false;
	}

	s.writeString(out);
	return true;
}
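// Code-viewer keyboard shortcuts (page UI text, not part of the source):
//   Copy code            Ctrl + C
//   Search code          Ctrl + F
//   Full-screen mode     F11
//   Toggle theme         Ctrl + Shift + D
//   Show shortcuts       ?
//   Increase font size   Ctrl + =
//   Decrease font size   Ctrl + -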