📄 fifo.c
字号:
//fifo.c#include "fifo.h"//**************************************************************//datadispatch module// 1. dispatch one data source to multiple outputs// 2. input packet size and output packet size can be variable// 3. if [data cache] is full, input function don't put data into [data cache] and log the error.//// |------>[data cache]======>// [data source]----->|// |------>[data cache]======>// |// |------>[data cache]======>// |//// --------> input ======>get from cache//////Usage:// 1.call dispInit().// 2.Call regOutputModule(): once to register one output. This function is not thread safe.// 3.The input thread call datadispInput() to input data; and each output thread call getdatadisp() to get data from cache.// 4.After input and output threads stopped, then call disregAllOutputModules() to unregister all output modules.DISP_HANDLE dispInit(){ DISP_OUTPUT* handle = (DISP_OUTPUT*)malloc(sizeof(DISP_OUTPUT)); handle->dispindex = 0; return handle;}//register an output module//return output module ID, error: <0int regOutputModule(DISP_HANDLE h,DWORD fifoSize){ DISP_OUTPUT* handle = (DISP_OUTPUT*)h; DISP_OUPUT_CORE * pCore = &(handle->dispcore[handle->dispindex]); assert(handle->dispindex<MAX_OUTPUT_MODULE); if (handle->dispindex>=MAX_OUTPUT_MODULE) return -1;//full pCore->buf = (char *)malloc(fifoSize); assert(pCore->buf); if (!(pCore->buf)) return -2; pCore->bufsize = fifoSize; pCore->pin = pCore->buf; pCore->pout = pCore->buf; pCore->discardBytes = 0; pCore->inputBytes= 0; pCore->outputBytes= 0; if(pthread_mutex_init(&(pCore->mut),NULL)!=0) { mylog("init mutex in regoutModule error! \n"); return -1; }//add by Eric handle->dispindex++; return (handle->dispindex-1);}//free all modules that registered by regOutputModule()int disregAllOutputModules(DISP_HANDLE h){ DISP_OUTPUT* handle = (DISP_OUTPUT*)h; DISP_OUPUT_CORE * pCore; int i; int size; for (i=0; i<handle->dispindex; i++) { pCore = &(handle->dispcore[i]); //get valide data size size=pCore->pin - pCore->pout; if (size<0) size+=pCore->bufsize; mylog("output module #%d, input bytes[%u],output bytes[%u], discard bytes[%u], left bytes[%i]\n",i,pCore->inputBytes,pCore->outputBytes,pCore->discardBytes,size); assert(pCore->buf); free(pCore->buf); pCore->buf = 0; pthread_mutex_destroy(&pCore->mut);//add by Eric } handle->dispindex = 0; free(handle); return 0;}//dispatch data to each output module//if return -3 ,indicating full not input anydataint datadispInput(DISP_HANDLE h, const void *data, int datalen){ DISP_OUTPUT* handle = (DISP_OUTPUT*)h; DISP_OUPUT_CORE * pCore; int size; int i; int rightsize; int bFullNotInput=false;; for (i=0; i< handle->dispindex; i++) { //mylog("dispindex size=%d \n",handle->dispindex); pCore = &(handle->dispcore[i]); //get valide data size if(pthread_mutex_lock(&pCore->mut) !=0) { mylog(" cj input data error !lock mutex error!\n"); return -1; }//add by Eric size=pCore->pin - pCore->pout; if (size<0) size+=pCore->bufsize; //printf("cur size 0x%x\n",size); if (datalen+size > pCore->bufsize-1) {//1: to differ empty and full pCore->discardBytes+=datalen;//#ifdef FULL_LOG bFullNotInput=true;// mylog("output module #%d is full, data in buffer [%d], can't push in [%d],pCore->pin=%d,pCore->pout=%d,pCore->bufsize=%d\n",i,size,datalen,pCore->pin,pCore->pout,pCore->bufsize);//#endif pthread_mutex_unlock(&pCore->mut); continue; } bFullNotInput=false; //copy data rightsize=pCore->buf + pCore->bufsize - pCore->pin;// mylog("Eric:: before data input \n buf start: %d, pCore->pin: %d, pout: %d\nrightsize is %d, datalen is %d\n",// pCore->buf,pCore->pin,pCore->pout,rightsize,datalen); if (rightsize>=datalen) { memcpy(pCore->pin,data,datalen); } else { memcpy(pCore->pin,data,rightsize); memcpy(pCore->buf,(char *)data+rightsize,datalen-rightsize); } //set inputBytes pCore->inputBytes+= datalen; //move pIn; if (pCore->pin+datalen > pCore->buf + pCore->bufsize-1) pCore->pin = pCore->pin+datalen - pCore->bufsize; else pCore->pin+=datalen; pthread_mutex_unlock(&pCore->mut); //add by Eric// mylog("Eric:: after datainput\n buf start: %d, pCore->pin: %d, pCore->pout : %d\n\n",// pCore->pin,pCore->pout); } if(bFullNotInput) return -3; //indicating full not input; return 0; }int getdatadispsize(DISP_HANDLE h,int id){ DISP_OUTPUT* handle = (DISP_OUTPUT*)h; DISP_OUPUT_CORE * pCore = &(handle->dispcore[id]); int size=0;//valide data size if(id>=handle->dispindex&&id<0) mylog("Eric:: invalde index=%d",id); //assert(id>=0 && id < handle->dispindex); if (id<0 || id>=handle->dispindex) return -1; if(pthread_mutex_lock(&pCore->mut)!=0) { mylog("cj get datadisp size error lock mutex error!\n"); return -1; } else { size=pCore->pin - pCore->pout; if (size<0) size+=pCore->bufsize; pthread_mutex_unlock(&pCore->mut); //add by Eric } return size;}//get data from fifo of a registered module, if size in the fifo is not enough, return -1/*modified by caojie //usage: return -1 means not enough data ,if bForce=true ,read all the left,else ,not read //if read all the left,*pOutputlen indicates how many byte returned*/// end of data dispatch module ===============================================//int getdatadisp(DISP_HANDLE h, int id,void *buf, int* pOutputlen,bool bForce){ DISP_OUTPUT* handle = (DISP_OUTPUT*)h; DISP_OUPUT_CORE * pCore = &(handle->dispcore[id]); int size;//valide data size int rightsize; int outputlen=*pOutputlen; int bNotEnough=false; if(id>=handle->dispindex&&id<0) mylog("Eric:: invalde index=%d",id); assert((id>=0) && (id < handle->dispindex)); if (id<0 || id>=handle->dispindex){ mylog("getdata disp error!!index d%\n",id); return -2; } if(pthread_mutex_lock(&pCore->mut) !=0) { mylog("cj lock mutex error! getdatadisp \n"); return -1;//modified by caojie } size=pCore->pin - pCore->pout; if (size<0) size+=pCore->bufsize; if (size<outputlen) { //printf("no enough data in fifo[id:%d], read [len:%d], left [len:%d]\n",id,outputlen,size); if(!bForce) { *pOutputlen=0; pthread_mutex_unlock(&pCore->mut); //add by caojie // mylog("not force read reading data from fifo not enough size=%d,poutputlen=%d,pCore->pin=%d,pCore->pout=%d \n",size,outputlen,pCore->pin,pCore->pout); return -1; } else { *pOutputlen=size; //read all the left outputlen=size; bNotEnough=true; } } //copy data rightsize=pCore->buf + pCore->bufsize - pCore->pout; if (rightsize>=outputlen) { memcpy(buf,pCore->pout,outputlen); } else { memcpy(buf,pCore->pout,rightsize); memcpy((char *)buf+rightsize,pCore->buf,outputlen-rightsize); } //move pOut; if (pCore->pout+outputlen > pCore->buf + pCore->bufsize-1) pCore->pout = pCore->pout + outputlen - pCore->bufsize; else pCore->pout+=outputlen; //printf("Output udp data length:%d\n",outputlen); pCore->outputBytes+=outputlen; pthread_mutex_unlock(&pCore->mut); //add by Eric if(bNotEnough){ // mylog("cj::data is not Enough in fifo!!\n"); return -1;} return 0;}//**************************************************************//fifo module//The module is used for many inputers and one outputer//The output packet size is the same as the input packet size////// end of command fifo module=====================================//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -