📄 channel.cpp.svn-base
字号:
ch->sock->writeLine("GET /%s/%s HTTP/1.0",ch->info.getTypeExt(ch->info.srcType),idStr); ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); // request metadata for mp3 if (ch->info.srcType == ChanInfo::T_MP3) ch->sock->writeLine("icy-metadata:1"); ch->sock->writeLine(""); ch->icyMetaInterval = 0; HTTP http(*ch->sock); http.checkResponse(200); while (http.nextHeader()) { if (http.isHeader("icy-metaint")) ch->icyMetaInterval = http.getArgInt(); LOG_CHANNEL("Ch.%d Raw GET: %s",ch->index,http.cmdLine); } }else{ ch->sock->writeLine("GET /channel/%s HTTP/1.0",idStr); ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); ch->sock->writeLine(""); HTTP http(*ch->sock); http.checkResponse(200); while (http.nextHeader()) LOG_CHANNEL("Ch.%d GET: %s",ch->index,http.cmdLine); } ch->setStatus(S_RECEIVING); LOG_CHANNEL("Ch.%d Ready",ch->index); ch->input = ch->sock; ch->readStream(); ch->setStatus(S_CLOSING); LOG_CHANNEL("Ch.%d Closed",ch->index); }catch(StreamException &e) { ChanHitList *chl = chanMgr->findHitListByID(ch->info.id); if (chl) chl->deadHit(ch->currSource); ch->setStatus(S_ABORT); LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg); } if (ch->sock) { ch->sock->close(); delete ch->sock; ch->sock = NULL; } //peercastApp->updateChannelInfo(NULL); } sys->sleepIdle(); } LOG_CHANNEL("Ch.%d closed",ch->index); chanMgr->lockHitList(ch->info.id,false); ch->endThread(); return 0;}// -----------------------------------void Channel::startICY(ClientSocket *cs, SRC_TYPE st){ srcType = st; type = T_BROADCAST; cs->timeout = 30000; sock = cs; input = cs; thread.data = this; thread.func = streamICY; if (!sys->startThread(&thread)) init();}// -----------------------------------void Channel::startFind(){ type = T_RELAY; thread.data = this; thread.func = findProc; if (!sys->startThread(&thread)) init();}// -----------------------------------int Channel::streamICY(ThreadInfo *thread){ thread->lock(); Channel *ch = (Channel *)thread->data; chanMgr->lockHitList(ch->info.id,true); LOG_CHANNEL("Channel started: %s",ch->getName()); try { while ((thread->active) && (!ch->input->eof())) { ch->setStatus(S_BROADCASTING); ch->readStream(); } LOG_CHANNEL("Ch.%d ended",ch->index); }catch(StreamException &e) { ch->setStatus(S_ABORT); LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg); sys->sleep(1000); ch->input->close(); } ch->setStatus(S_CLOSING); LOG_CHANNEL("Ch.%d stopped",ch->index); if (ch->input) { ch->input->close(); delete ch->input; } chanMgr->lockHitList(ch->info.id,false); ch->endThread(); return 0;}// -----------------------------------static char *nextMetaPart(char *str,char delim){ while (*str) { if (*str == delim) { *str++ = 0; return str; } str++; } return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){ char c; while ((c=*from++) && (--max)) if (c != '\'') *to++ = c; *to = 0;}// -----------------------------------void Channel::processMetadata(char *str){ char *cmd=str; while (cmd) { char *arg = nextMetaPart(cmd,'='); if (!arg) break; char *next = nextMetaPart(arg,';'); if (strcmp(cmd,"StreamTitle")==0) info.track.title.setUnquote(arg,String::T_ASCII); else if (strcmp(cmd,"StreamUrl")==0) info.track.contact.setUnquote(arg,String::T_ASCII); cmd = next; } updateMeta();}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){ return new XML::Node("hits listeners=\"%d\" hosts=\"%d\" busy=\"%d\" stable=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"", numListeners(), numHits(), numBusy(), numStable(), numFirewalled(), closestHit(), furthestHit(), sys->getTime()-newestHit() ); }// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){ const char *ststr; ststr = getStatusStr(); if (!showStat) if ((status == S_RECEIVING) || (status == S_BROADCASTING)) ststr = "OK"; ChanHitList *chl = chanMgr->findHitListByID(info.id); return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"", numListeners, numRelays, (chl!=NULL)?chl->numHits():0, ststr ); }// -----------------------------------void ChanMeta::fromXML(XML &xml){ MemoryStream tout(data,MAX_DATALEN); xml.write(tout); len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){ len = l; memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){ if ((len+l) <= MAX_DATALEN) { memcpy(data+len,p,l); len += l; }}// -----------------------------------void Channel::updateMeta(){ XML xml; XML::Node *n = info.createChannelXML(); n->add(info.createTrackXML());// n->add(info.createServentXML()); xml.setRoot(n); insertMeta.fromXML(xml); ChanPacket pack; pack.init('META',insertMeta.data,insertMeta.len); chanData.writePacket(pack);}// -----------------------------------void Channel::readStream(){ chanMgr->broadcastRelays(chanMgr->broadcastTTL,NULL); switch(info.srcType) { case ChanInfo::T_MP3: LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval); readMP3(); break; case ChanInfo::T_NSV: LOG_CHANNEL("Ch.%d is NSV",index); readRaw(); break; case ChanInfo::T_OGG: LOG_CHANNEL("Ch.%d is OGG",index); readOGG(); break; case ChanInfo::T_MOV: LOG_CHANNEL("Ch.%d is MOV",index); readMOV(); break; case ChanInfo::T_MPG: LOG_CHANNEL("Ch.%d is MPG",index); readMPG(); break; case ChanInfo::T_PEERCAST: LOG_CHANNEL("Ch.%d is Peercast",index); readPeercast(); break; default: LOG_CHANNEL("Ch.%d is Raw",index); readRaw(); }}// -----------------------------------void Channel::readPeercast(){ ChanPacket pack; if (input->readTag() != 'PCST') throw StreamException("Not PeerCast stream"); syncPos = 0; info.numSkips = 0; peercastApp->channelStart(&info); while ((!input->eof() && (thread.active))) { pack.read(*input); chanData.writePacket(pack); MemoryStream mem(pack.data,pack.len); switch(pack.type) { case 'HEAD': LOG_CHANNEL("Ch.%d HEAD: %d",index,pack.len); if (pack.len > ChanMeta::MAX_DATALEN) throw StreamException("Bad HEAD"); headMeta.fromMem(pack.data,pack.len); break; case 'META': LOG_CHANNEL("Ch.%d META: %d",index,pack.len); insertMeta.fromMem(pack.data,pack.len); { if (pack.len) { XML xml; xml.read(mem); XML::Node *n = xml.findNode("channel"); if (n) { info.updateFromXML(n); LOG_CHANNEL("Ch.%d track update: %s - %s",index,info.track.artist.cstr(),info.track.title.cstr()); peercastApp->channelUpdate(&info); ChanHitList *chl = chanMgr->findHitListByID(info.id); if (chl) chl->info.updateFromXML(n);; } } } break; case 'DATA': //LOG_CHANNEL("DATA: %d",pack.len); if (info.numSkips) info.numSkips--; break; case 'SYNC': { unsigned int s = mem.readLong(); if ((s-syncPos) != 1) { LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",index,syncPos,s,info.numSkips); if (syncPos) { info.numSkips++; if (info.numSkips>50) throw StreamException("Bumped - Too many skips"); } } syncPos = s; } break; default: LOG_CHANNEL("Bad channel packet: %x",pack.type); } if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; if (readDelay) sys->sleep(readDelay); } peercastApp->channelStop(&info);}// -----------------------------------void Channel::readRaw(){ ChanPacket pack; while ((!input->eof() && (thread.active))) { syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); pack.len = sizeof(pack.data); input->read(pack.data,pack.len); pack.type = 'DATA'; chanData.writePacket(pack); if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; if (readDelay) sys->sleep(readDelay); }}// -----------------------------------void Channel::readMPG(){ ChanPacket pack;// for(int i=0; i<10; i++)// {// unsigned int v = in.readLong();// LOG_CHANNEL("raw %d: %08x",i,SWAP4(v));// } while ((!numListeners) && (thread.active)) sys->sleep(1000); sys->sleep(2000);// in.read(headMeta.data,1024);// headMeta.len = 1024; while ((!input->eof() && (thread.active))) { syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); int rlen = 4000; rlen = input->read(pack.data,rlen); //LOG_CHANNEL("raw read %d - %d",syncPos,rlen); pack.len = rlen; pack.type = 'DATA'; chanData.writePacket(pack); if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; if (readDelay) sys->sleep(readDelay); }}// -----------------------------------void Channel::readMP3(){ ChanPacket pack; while ((!input->eof() && (thread.active))) { if (icyMetaInterval) { int rlen = icyMetaInterval; while (rlen) { int rl = rlen; if (rl > sizeof(pack.data)) rl = sizeof(pack.data); syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); pack.len = rl; input->read(pack.data,pack.len); pack.type = 'DATA'; chanData.writePacket(pack); rlen-=rl; } unsigned char len; input->read(&len,1); if (len) { if (len*16 > 1024) len = 1024/16; char buf[1024]; input->read(buf,len*16); processMetadata(buf); } }else{ syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); pack.len = sizeof(pack.data); input->read(pack.data,pack.len); pack.type = 'DATA'; chanData.writePacket(pack); } if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break; if (readDelay) sys->sleep(readDelay); }}// -----------------------------------void Channel::readMOV(){ ChanPacket pack; bool done=false; headMeta.len = 0; while ((!input->eof() && (thread.active)) && (!done)) { unsigned int olen = input->readLong(); unsigned int otag = input->readLong(); // don't ask. unsigned int len = SWAP4(olen); unsigned int tag = SWAP4(otag); len -= 8; if ((tag == 'mdat') && (len)) { LOG_CHANNEL("Ch.%d mdat: %d",index,len); { LOG_CHANNEL("mov wait: %d",headMeta.len); headMeta.addMem(&olen,4); headMeta.addMem(&otag,4); while ((!numListeners) && (thread.active)) sys->sleep(1000); sys->sleep(1000); LOG_CHANNEL("mov go"); while ((len) && (thread.active)) { syncPos++; pack.init('SYNC',&syncPos,sizeof(syncPos)); chanData.writePacket(pack); int rlen = 4000; rlen = input->read(pack.data,rlen); pack.type = 'DATA'; pack.len = rlen; chanData.writePacket(pack); len -= rlen; if (checkBump()) done=true; if (checkIdle()) done=true; if (readDelay) sys->sleep(readDelay); } LOG_CHANNEL("Mov data end"); } }else{ LOG_CHANNEL("Ch.%d %c%c%c%c: %d",index,tag>>24&0xff,tag>>16&0xff,tag>>8&0xff,tag&0xff,len); if (headMeta.len+len > ChanMeta::MAX_DATALEN) throw StreamException("MOV section too big"); headMeta.addMem(&olen,4); headMeta.addMem(&otag,4); if (len) { input->read(headMeta.data+headMeta.len,len); headMeta.len += len; } } }}// -----------------------------------bool OggPacket::isBOS(){ return (data[5] & 0x02) != 0;}// -----------------------------------bool OggPacket::isVorbisPacket(){ char *p = (char *)getContent(); return memcmp(p+1,"vorbis",6)==0;}// -----------------------------------int OggPacket::getVorbisType(){ char *p = (char *)getContent(); return p[0];}// -----------------------------------void OggPacket::read(Stream &in){ // skip until we get OggS identifier bool gotOgg=false; while (!gotOgg) if (in.readChar() == 'O') if (in.readChar() == 'g') if (in.readChar() == 'g') if (in.readChar() == 'S') gotOgg = true; len = 0; data[0] = 'O'; data[1] = 'g'; data[2] = 'g'; data[3] = 'S'; len += 4; in.read(&data[len],23); len += 23; //LOG("OGG: head = %02x",data[5]); //LOG4("OGG: page num = %d",*(unsigned int *)&data[18]); //LOG("OGG: serial = %x",*(unsigned int *)&data[14]); numSegs = data[26]; segLen=0; in.read(&data[len],numSegs); for(int i=0; i<numSegs; i++) segLen += data[len++]; //LOG4("OGG: segs = %d/%d bytes",numSegs,segLen); if (segLen > MAX_DATALEN-128) throw StreamException("OGG packet too big"); in.read(&data[len],segLen); len += segLen; //LOG4("OGG: packet len=%d",len);}// -----------------------------------void ChanPacket::init(unsigned int t, const void *p, unsigned int l){ type = t; if (l > MAX_DATALEN) l = MAX_DATALEN; len = l; memcpy(data,p,len);}// -----------------------------------void ChanPacket::write(Stream &out){ out.writeTag(type); out.writeShort(len); out.writeShort(0); out.write(data,len);}// -----------------------------------void ChanPacket::read(Stream &in){ type = in.readTag(); len = in.readShort(); in.readShort(); if (len > MAX_DATALEN) throw StreamException("Bad ChanPacket"); in.read(data,len);}#if 0// -----------------------------------unsigned int ChanBuffer::read(unsigned int gpos,void *p,unsigned int len){ if (gpos == 0) gpos = pos;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -