📄 sync.cc
字号:
/* File: sync.cc By: Alex Theo de Jong Created: February 1996 Description: Synchronization object for multiple streams*/#ifdef __GNUG__#pragma implementation#endif#include "athread.hh"#include <stdio.h>#include <iostream.h>#include <sys/time.h>#ifdef LINUX#include <sys/sem.h>#endif#include "error.hh"#include "debug.hh"#include "util.hh"#include "sync.hh"/* * * StampQueue * */StampQueue::StampQueue(int max) : in(max), out(0), index_in(0), index_out(0), resync(0), newtime(0.0),maximum(max), total(0){ if (!(stamps=new double[maximum])) error("could not allocate memory"); if (!(bytes=new int[maximum])) error("could not allocate memory"); }StampQueue::~StampQueue(){ delete[] stamps; delete[] bytes;}int StampQueue::get(double& stamp){#ifdef TRACE if (total==0) warning("stamp queue underflow (stamp)");#endif wait_out(); lock(); stamp=stamps[index_out]; TRACER("StampQueue::get " << dtoa(stamp)); if (++index_out>=maximum) index_out=0; total--; unlock(); post_in(); return 1;}int StampQueue::get(double& stamp, int& b){#ifdef TRACE if (total==0) warning("stamp queue underflow (stamp and bytes)");#endif wait_out(); lock(); stamp=stamps[index_out]; b+=bytes[index_out]; if (++index_out>=maximum) index_out=0; TRACER("StampQueue::get" << dtoa(stamp) << " " << itoa(bytes[index_out])); total--; unlock(); post_in(); return 1;}int StampQueue::put(const double stamp, const int b){#ifdef TRACE TRACER("StampQueue::put " << dtoa(stamp) << " " << itoa(b)); if (total==maximum){ warning("stamp queue overflow"); }#endif wait_in(); lock(); stamps[index_in]=stamp; bytes[index_in]=b; if (++index_in>=maximum) index_in=0; total++; unlock(); post_out(); return 1;}/* * * SyncTimer * */SyncTimer::SyncTimer(int max) : StampQueue(max), time(-1.0) {}int SyncTimer::update(){ get(newtime); lock(); // == continuous re-sync if time-stamps are not used // < only re-sync if next time stamp drops (ie. jumping back into stream) resync=(newtime<=time) ? 1 : 0; #ifdef TRACE if (resync==1) TRACER("re_sync time");#endif time=newtime;TRACER("Timeupdate " << dtoa(time)); time_cond.broadcast(); unlock(); return resync;}int SyncTimer::done(){ TRACER("int SyncTimer::done()"); lock(); time=0x0fffffff; // maximum time time_cond.broadcast(); unlock(); return 1;}/* * * SyncData * */SyncData::SyncData(int max, SyncTimer* t) : StampQueue(max), terminated(0), time(0.0), bytes(0){ timer=t;}int SyncData::wait(){#ifdef UPTIGHT bytelock(); // bytes#endif DEBUGGER("W" << itoa(bytes)); if (bytes<=0){ get(newtime, bytes);//message("wait for time " << dtoa(newtime) << " " << dtoa(time)); if (newtime<=time){ resync=0; // found new sync time; stop re-sync TRACER("reset data resync"); }//msg("9"); time_lock.lock();//msg("8"); time=newtime; time_cond.signal();//msg("7"); time_lock.unlock();//msg("6"); timer->lock();//message("wait for time " << dtoa(timer->time) << " it is " << dtoa(time) << " resync " << itoa(resync)); while (resync==0 && (time>timer->time) && (terminated==0)){//message("inwhile"); timer->time_cond.wait(&timer->time_lock);//message("wait for time " << dtoa(timer->time) << "it is " << dtoa(time)); } TRACER("Twait " << dtoa(time) << " =< " << dtoa(timer->time)); timer->unlock();#ifdef UPTIGHT byteunlock();#endif return 1; }#ifdef UPTIGHT byteunlock();#endif return 0;}int SyncData::skip(){ TRACER("int SyncData::skip()");#ifdef UPTIGHT bytelock();#endif if (bytes<=0){ get(newtime, bytes); if (newtime<time) resync=0; // found new sync time; stop re-sync time_lock.lock(); time=newtime; time_cond.signal(); time_lock.unlock();#ifdef UPTIGHT byteunlock();#endif return 1; }#ifdef UPTIGHT byteunlock();#endif return 0;}int SyncData::update(){ time_lock.lock(); if (timer->resync==1){ resync=1; // re_sync data TRACER("resync data"); } else { while ((time<=timer->time) && (terminated==0)){ time_cond.wait(&time_lock); } } DEBUGGER("T" << dtoa(time) << "<" << dtoa(timer->time)); time_lock.unlock(); return resync;}int SyncData::done(int term=0){ return (term) ? terminated=term : terminated; }/* * * Synchronization * *//* type sync id notes 0 1, 2 video + audio 1 1 video 2 2 audio n 1,2,... n video + audio + others*/Synchronization::Synchronization(int type, int t_qsize, int f_qsize) : timer(t_qsize), terminate(0), terminated(0) { int i; int error=0; sched_param param; int policy; for (i=0; i<Max_Sync_Process; i++) syncs[i]=0; // set all to 0 if (type>2){ for (i=0; i<Max_Sync_Process; i++) syncs[i]=new SyncData(f_qsize, &timer); } else if (type==0){ for (i=0; i<2 && i<Max_Sync_Process; i++) syncs[i]=new SyncData(f_qsize, &timer); // id 1 & 2 } else if (type==1){ syncs[0]=new SyncData(f_qsize, &timer); // only id=1 } else if (type==2){ syncs[0]=0; syncs[1]=new SyncData(f_qsize, &timer); // only id=2 } if ((error=athr_create((void*(*)(void*)) Synchronization::init, this, &id))<0){ error("failed to create thread"); athr_exit(0); } if ((error=athr_getschedparam(id, &policy, ¶m))<0){ warning("could not get thread prio - ignored"); } else {#ifdef LINUX param.sched_priority+=1;// policy = SCHED_RR; TRACER("TIMERPRIORITY=" << param.sched_priority << "(" << param.sched_priority-1 << ")");#else param.prio+=1; TRACER("TIMERPRIORITY=" << param.prio << "(" << param.prio-1 << ")");#endif if ((error=athr_setschedparam(id, policy, ¶m))<0){ warning("could not set thread prio - ignored"); } }}Synchronization::~Synchronization(){ if (!terminated){ TRACER("waiting for synchronization thread to terminate ..."); athr_join(id); } TRACER("synchronization thread terminate!"); }void* Synchronization::init(Synchronization* s){ TRACER("void* Synchronization::init(Synchronization* s)"); int i; int resync=0; while (!s->terminate){ resync=s->timer.update();#ifdef TRACE msg("!"); if (s->syncs[0] && s->syncs[1]){ if (s->syncs[0]->total==0 && s->syncs[1]->total==s->syncs[1]->maximum){ TRACER("video stamps underflow, audio stamps overflow"); } if (s->syncs[0]->total==s->syncs[0]->maximum && s->syncs[1]->total==0){ TRACER("audio stamps underflow, video stamps overflow"); } }#endif for (i=0; ((s->terminate==0) && (i<Max_Sync_Process)); i++){ if ((s->syncs[i]!=0) && ((s->syncs[i]->total!=0) || resync)){#ifdef TRACE msg(itoa(i));#endif s->syncs[i]->update(); } } } // finish up while (s->timer.total) s->timer.update(); s->timer.done(); s->terminated=1; athr_exit(0); return 0;}int Synchronization::stop(){ TRACER("int Synchronization::stop()"); terminate=1; double finaltime=0.0; for (int i=0; i<Max_Sync_Process; i++){ if (syncs[i]){ syncs[i]->terminated=1; syncs[i]->put(finaltime); // put last dummy time stamp syncs[i]->time_cond.signal(); } } finaltime=0x0fffffff; timer.put(finaltime); return terminated;}int Synchronization::usedbytes(int ID, int b){ return (ID!=0) ? syncs[ID-1]->usedbytes(b) : -1; }int Synchronization::wait(int ID){ return (ID!=0) ? syncs[ID-1]->wait() : -1; }int Synchronization::skip(int ID){ return (ID!=0) ? syncs[ID-1]->skip() : -1; }int Synchronization::put(const double stamp){ return timer.put(stamp,0); }int Synchronization::put(int ID, const double s, const int b){ return (ID!=0) ? syncs[ID-1]->put(s, b) : -1; }int Synchronization::pause(){ return athr_suspend(id); } // stop timerint Synchronization::resume(){ return athr_continue(id); } // continu timerint Synchronization::done(int ID){ return (ID!=0) ? syncs[ID-1]->done() : -1; }#ifdef MAINmain(int argc, char** argv){ // Just a test to compile and link Synchronization sync(); while (1){ }} // set higher priority for reading process int prio=0; thr_getprio(thr_self(), &prio); prio++; thr_setprio(thr_self(), prio);#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -