📄 servent.cpp.svn-base
字号:
chanMgr->trackerHitList.deadHit(best);
try
{
if (sv->sock)
{
sv->sock->close();
delete sv->sock;
sv->sock = NULL;
}
}catch(StreamException &) {}
sys->sleepIdle();
} sv->kill(); return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data;
char ipStr[64];
sv->sock->host.toStr(ipStr);
try { sv->handshakeIncoming(); }catch(HTTPException &e) { try { sv->sock->writeLine(e.msg); if (e.code == 401) sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\""); sv->sock->writeLine(""); }catch(StreamException &){} LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }catch(StreamException &e) { LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }
sv->kill(); return 0;}// -----------------------------------void Servent::processServent(){ setStatus(S_HANDSHAKE); handshakeIn(); if (!sock) throw StreamException("Servent has no socket");
processGnutella();}// -----------------------------------void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo){
if (!doneHandshake)
{
setStatus(S_HANDSHAKE);
if (!handshakeStream(chanInfo))
return;
}
if (chanInfo.id.isSet())
{
chanID = chanInfo.id;
LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
if (!waitForChannelHeader(chanInfo))
throw StreamException("Channel not ready");
servMgr->totalStreams++; Host host = sock->host; host.port = 0; // force to 0 so we ignore the incoming port
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
if (outputProtocol == ChanInfo::SP_HTTP) { // we have agent string now, so check again. if (!canPreview()) throw StreamException("Preview disallowed"); if ((addMetadata) && (chanMgr->icyMetaInterval)) sendRawMetaChannel(chanMgr->icyMetaInterval); else
sendRawChannel(true,true);
}else if (outputProtocol == ChanInfo::SP_MMS)
{
if (nsSwitchNum)
{
sendRawChannel(true,true);
}else
{
sendRawChannel(true,false);
}
}else if (outputProtocol == ChanInfo::SP_PCP) { sendPCPChannel();
} else if (outputProtocol == ChanInfo::SP_PEERCAST)
{
sendChannel(ch);
}
}
setStatus(S_CLOSING);}// -----------------------------------------#if 0// debug FileStream file; file.openReadOnly("c://test.mp3"); LOG_DEBUG("raw file read"); char buf[4000]; int cnt=0; while (!file.eof()) { LOG_DEBUG("send %d",cnt++); file.read(buf,sizeof(buf)); sock->write(buf,sizeof(buf)); } file.close(); LOG_DEBUG("raw file sent"); return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(ChanInfo &info){ for(int i=0; i<30*10; i++) {
Channel *ch = chanMgr->findChannelByID(info.id);
if (!ch)
return false;
if (ch->isPlaying() && (ch->rawData.writePos>0)) return true; if (!thread.active || !sock->active()) break; sys->sleep(100); } return false;}// -----------------------------------bool Servent::checkPreview(unsigned int connectTime){ Host h = sock->host; h.port = 0; // ignore incoming port number if (!servMgr->isFiltered(ServFilter::F_DIRECT,h)) return false; if (isPrivate()) // always allow private clients return true; return false;}// -----------------------------------bool Servent::canPreview(){ // probably not needed, but just in case we bind to the actual IP address and not localhost if (sock->host.ip == servMgr->serverHost.ip) return true; Host h = getHost(); h.port = 0; if (!servMgr->isFiltered(ServFilter::F_DIRECT,h)) return false; if (isPrivate()) // always allow private clients return true; if (agent.contains("PeerCast")) // allow connections from peercast clients (direct relays etc..) return true; return true;}// -----------------------------------void Servent::sendRawChannel(bool sendHead, bool sendData){
try {
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr());
if (sendHead)
{ ch->headPack.writeRaw(*sock);
streamPos = ch->headPack.pos + ch->headPack.len;
LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
}
if (sendData)
{
unsigned int connectTime=sys->getTime();
unsigned int streamIndex = ch->streamIndex;
while ((thread.active) && sock->active()) { if (!checkPreview(connectTime)) throw StreamException("Preview time limit reached");
ch = chanMgr->findChannelByID(chanID);
if (ch)
{
if (streamIndex != ch->streamIndex)
{
streamIndex = ch->streamIndex;
streamPos = ch->headPack.pos;
LOG_DEBUG("sendRaw got new stream index");
}
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD)) rawPack.writeRaw(*sock);
if (rawPack.pos < streamPos)
LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
streamPos = rawPack.pos+rawPack.len;
//LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
}
}
sys->sleepIdle(); }
} }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); }}// -----------------------------------void Servent::sendRawMetaChannel(int interval){
try {
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw Meta stream: %s (metaint: %d)",ch->info.name.cstr(),interval); String lastTitle,lastURL; int lastMsgTime=sys->getTime(); bool showMsg=true; char buf[16384]; int bufPos=0; if ((interval > sizeof(buf)) || (interval < 1)) throw StreamException("Bad ICY Meta Interval value");
unsigned int connectTime = sys->getTime();
streamPos = 0; // raw meta channel has no header (its MP3)
while ((thread.active) && sock->active()) { if (!checkPreview(connectTime)) throw StreamException("Preview time limit reached");
ch = chanMgr->findChannelByID(chanID);
if (ch)
{
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
MemoryStream mem(rawPack.data,rawPack.len);
if (rawPack.type == ChanPacket::T_DATA) {
int len = rawPack.len; char *p = rawPack.data; while (len) { int rl = len; if ((bufPos+rl) > interval) rl = interval-bufPos; memcpy(&buf[bufPos],p,rl); bufPos+=rl; p+=rl; len-=rl; if (bufPos >= interval) { bufPos = 0; sock->write(buf,interval); if (chanMgr->broadcastMsgInterval) if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval) { showMsg ^= true; lastMsgTime = sys->getTime(); } String *metaTitle = &ch->info.track.title; if (!ch->info.comment.isEmpty() && (showMsg)) metaTitle = &ch->info.comment; if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL)) { char tmp[1024]; String title,url; title = *metaTitle; url = ch->info.url; title.convertTo(String::T_META); url.convertTo(String::T_META); sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr()); int len = ((strlen(tmp) + 15+1) / 16); sock->writeChar(len); sock->write(tmp,len*16); lastTitle = *metaTitle; lastURL = ch->info.url; LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr()); }else { sock->writeChar(0); } }
} }
streamPos = rawPack.pos + rawPack.len; }
}
sys->sleepIdle();
} }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); }}// -----------------------------------void Servent::sendChannel(Channel *ch){
#if 0
downData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA|ChanPacket::T_META;
ch->numRelays++;
try { sock->timeout = 10000; setStatus(S_CONNECTED); LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr()); sock->writeTag("PCST"); ChanPacket pack;
ch->headPack.write(*sock);
pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->syncPos,ch->streamPos); pack.write(*sock); while ((thread.active) && sock->active()) { if (!ch->isActive()) throw StreamException("Channel closed"); unsigned int np = downData.readPacket(syncPos,pack); if ((np-syncPos) > 1) LOG_DEBUG("sendChannel skip: %d",np-syncPos); syncPos = np; pack.write(*sock); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } if (ch->numRelays) ch->numRelays--;#endif
}
// -----------------------------------
void Servent::sendPCPChannel()
{
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
throw StreamException("Channel not found");
AtomStream atom(*sock);
try
{
LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
setStatus(S_CONNECTED);
atom.writeParent(PCP_CHAN,3 + ((streamPos==0)?1:0));
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
ch->info.writeInfoAtoms(atom);
ch->info.writeTrackAtoms(atom);
if (streamPos == 0)
{
atom.writeParent(PCP_CHAN_PKT,3);
atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
streamPos = ch->headPack.pos+ch->headPack.len;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
unsigned int streamIndex = ch->streamIndex;
while (thread.active)
{
Channel *ch = chanMgr->findChannelByID(chanID);
if (ch)
{
if (streamIndex != ch->streamIndex)
{
streamIndex = ch->streamIndex;
streamPos = ch->headPack.pos;
LOG_DEBUG("sendPCPStream got new stream index");
}
ChanPacket rawPack;
if (ch->rawData.findPacket(streamPos,rawPack))
{
bool hasKey=rawPack.key.isSet();
if (rawPack.type == ChanPacket::T_HEAD)
{
atom.writeParent(PCP_CHAN,2);
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
atom.writeParent(PCP_CHAN_PKT,3 + (hasKey?1:0));
atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
if (hasKey)
atom.writeBytes(PCP_CHAN_PKT_KEY,rawPack.key.id,16);
}else if (rawPack.type == ChanPacket::T_DATA)
{
atom.writeParent(PCP_CHAN,2);
atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
atom.writeParent(PCP_CHAN_PKT,3 + (hasKey?1:0));
atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
if (hasKey)
atom.writeBytes(PCP_CHAN_PKT_KEY,rawPack.key.id,16);
}
if (rawPack.pos < streamPos)
LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
streamPos = rawPack.pos+rawPack.len;
}
}
BroadcastState bcs;
bcs.srcID = remoteID;
pcpStream.readPacket(*sock,bcs);
sys->sleepIdle();
}
LOG_DEBUG("PCP stream of channel closed.");
}catch(StreamException &e)
{
LOG_ERROR("Stream channel: %s",e.msg);
}
try
{
atom.writeInt(PCP_QUIT,pcpStream.error);
}catch(StreamException &) {}
}
// -----------------------------------int Servent::serverProc(ThreadInfo *thread){ thread->lock(); Servent *sv = (Servent*)thread->data; try { if (!sv->sock) throw StreamException("Server has no socket"); sv->setStatus(S_LISTENING); //LOG4("Listening on port %d",sv->sock->host.port); char servIP[64]; sv->sock->host.toStr(servIP); if (servMgr->isRoot) LOG_DEBUG("Root Server started: %s",servIP); else LOG_DEBUG("Server started: %s",servIP); while ((thread->active) && (sv->sock->active())) { ClientSocket *cs = sv->sock->accept(); if (cs) { Servent *ns = servMgr->allocServent(); if (ns) { ns->networkID = servMgr->networkID; ns->initIncoming(cs,sv->allow); }else LOG_ERROR("Out of servents"); } } }catch(StreamException &e) { LOG_ERROR("Server Error: %s:%d",e.msg,e.err); } LOG_DEBUG("Server stopped"); sv->kill(); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -