defaultcircularbuffer.cpp

来自「C++封装的视频采集代码」· C++ 代码 · 共 1,498 行 · 第 1/5 页

CPP
1,498
字号
#include "DefaultCircularBuffer.h"
#include "Misc/SequenceOfPairsCompare.h"
#include "SysLib/Lock.h"
#include "SysLib/taskLib.h"
#include <utility>

namespace oxsemi
{
    namespace circular_buffer
    {
        // Builds the circular buffer together with its per-reader bookkeeping
        // arrays.
        //
        // The three listener limits are handed straight to the CircularBuffer
        // base class. The instance is flagged complete (SetIsComplete(true))
        // only when the backing UnCachedBuffer exists, reports itself complete,
        // has a requested size that is a multiple of sizeof(unsigned long), and
        // every per-reader array pointer is non-null. When any of those checks
        // fails the constructor body returns without calling SetIsComplete().
        //
        // maxEmptyEventListeners          - forwarded to CircularBuffer
        // maxReadThresholdEventListeners  - forwarded to CircularBuffer
        // maxWriteThresholdEventListeners - forwarded to CircularBuffer
        // size            - size given to the UnCachedBuffer; size-1 seeds the
        //                   write position, the most recent acquire-after
        //                   position and every reader position
        // numberOfReaders - number of readers; sizes every per-reader array
        //
        // NOTE(review): most array initialisers read the member
        // numberOfReaders_ while mpReadThresholds_ reads the constructor
        // argument. The member-based ones rely on numberOfReaders_ being
        // declared before them in the class - confirm against the header.
        DefaultCircularBuffer::DefaultCircularBuffer(
            unsigned maxEmptyEventListeners,
            unsigned maxReadThresholdEventListeners,
            unsigned maxWriteThresholdEventListeners,
            unsigned long size,
            int numberOfReaders) :
            CircularBuffer(
                maxEmptyEventListeners,
                maxReadThresholdEventListeners,
                maxWriteThresholdEventListeners),
            mutex_(false),
            buffer_(new UnCachedBuffer(size, ALIGNMENT_POWER_OF_2)),
            writePosition_(size-1, 0, 0),
            writeEmpty_(false),
            mostRecentAcquireAfterPosition_(size-1, 0, 0),
            numberOfReaders_(numberOfReaders),
            readPositions_(new ReaderPositionInfo[numberOfReaders_]),
            knownObjectTypes_(INITIAL_KNOWN_OBJECT_TYPES_SIZE),
            nextFreeId_(MOVE_TO_START_ID + MAX_SPECIAL_IDS),
            readWaitCount_(0),
            writeWaitCount_(0),
            doNotBlock_(false),
            doNotBlockReaders_(false),
            mpReadThresholds_(new ReadThresholdPolicy[numberOfReaders]),
            writeThreshold_(0),
            readLimitInfo_(new std::pair<bool, unsigned long>[numberOfReaders_]),
            numberOfObjects_(new unsigned long[numberOfReaders_]),
            limitCount_(new unsigned long[numberOfReaders_]),
            lastAnyReadBuffersAvailable_(true),
            outstandingWriteAcquisitions_(0),
            outstandingReadAcquisitions_(new unsigned[numberOfReaders_])
#ifdef INUSE_TRACKING
            ,writeInUse_(INITIAL_INUSE_CONTAINER_SIZE),
            readerInUse_(new InUseContainer*[numberOfReaders_])
#endif
        // {{{
        {
#ifdef INUSE_TRACKING
            // Clear every tracking slot before anything else so that the
            // destructor can delete the slots even when the set-up below is
            // skipped
            if (readerInUse_)
            {
                for (unsigned reader = 0; reader < numberOfReaders_; ++reader)
                {
                    readerInUse_[reader] = 0;
                }
            }
#endif

            // Buffer should be a multiple of quad size
            const bool bufferUsable =
                buffer_.get() &&
                buffer_->GetIsComplete() &&
                !(buffer_->GetRequestedSize() % sizeof(unsigned long));

            // Every per-reader array must have been allocated
            bool perReaderStorageReady =
                readPositions_ &&
                mpReadThresholds_ &&
                readLimitInfo_ &&
                numberOfObjects_ &&
                limitCount_ &&
                outstandingReadAcquisitions_;
#ifdef INUSE_TRACKING
            perReaderStorageReady = perReaderStorageReady && readerInUse_;
#endif

            if (!bufferUsable || !perReaderStorageReady)
            {
                // Leave the completion flag untouched
                return;
            }

            SetIsComplete(true);
            writeThreshold_.SetBufferDetails(
                reinterpret_cast<unsigned long>(buffer_->GetBuffer()),
                buffer_->GetSize());

#ifdef INUSE_TRACKING
            bool everyTrackerAllocated = true;
#endif
            for (unsigned reader = 0; reader < numberOfReaders_; ++reader)
            {
                // Reader starts at the same initial position as the writer,
                // with nothing to read
                ReaderPositionInfo& positionInfo = readPositions_[reader];
                positionInfo.position_.Zeroise(size-1);
                positionInfo.empty_ = true;

                ReadThresholdPolicy& threshold = mpReadThresholds_[reader];
                threshold.SetThreshold(0);
                threshold.SetBufferDetails(
                    reinterpret_cast<unsigned long>(buffer_->GetBuffer()),
                    buffer_->GetSize());

                // No read limit in force, and all counters start from zero
                std::pair<bool, unsigned long>& limit = readLimitInfo_[reader];
                limit.first = false;
                limit.second = 0;

                numberOfObjects_[reader] = 0;
                limitCount_[reader] = 0;
                outstandingReadAcquisitions_[reader] = 0;

#ifdef INUSE_TRACKING
                InUseContainer* const tracker =
                    new InUseContainer(INITIAL_INUSE_CONTAINER_SIZE);
                readerInUse_[reader] = tracker;
                if (!tracker)
                {
                    everyTrackerAllocated = false;
                }
#endif
            }

#ifdef INUSE_TRACKING
            // A single failed tracker allocation revokes the completion flag
            if (!everyTrackerAllocated)
            {
                SetIsComplete(false);
            }
#endif
        }
        // }}}

        // Releases the per-reader arrays allocated by the constructor and,
        // when INUSE_TRACKING is defined, each per-reader in-use container
        // followed by the array holding them. buffer_ is not deleted here;
        // presumably its own type releases it (it is accessed through get())
        // - confirm against the header.
        DefaultCircularBuffer::~DefaultCircularBuffer()
        // {{{
        {
            delete[] readPositions_;
            delete[] readLimitInfo_;
            delete[] numberOfObjects_;
            delete[] limitCount_;
            delete[] mpReadThresholds_;
            delete[] outstandingReadAcquisitions_;

#ifdef INUSE_TRACKING
            // The slot array itself may be null, in which case there are no
            // slots to walk
            for (unsigned reader = 0; readerInUse_ && (reader < numberOfReaders_); ++reader)
            {
                delete readerInUse_[reader];
            }
            delete[] readerInUse_;
#endif
        }
        // }}}

        std::pair<bool, CircularBufferResidentObject*> DefaultCircularBuffer::AcquireForRead(
            unsigned readerNumber,
            CircularBufferResidentObject* previousObject,
            bool blockIfNoObjectsAvailable,
            unsigned long sizeHint)
        // {{{
        {
            using namespace std::rel_ops;
            syslib::Lock lock(mutex_);
//TRACE("$GAR() entered: previousObject = 0x%lx, readerNumber = %u$n\n", reinterpret_cast<unsigned long>(previousObject), readerNumber);
#ifdef INUSE_TRACKING
            // Ensure the in-use container is large enough
            if (previousObject && readerInUse_[readerNumber]->empty())
            {
                TRACE("$RDefaultCircularBuffer::AcquireForRead() previousObject != 0 when no objects in-use for reader %u\n", readerNumber);
                previousObject = 0;
            }
#endif
            // Is there data available for read by the specified reader?
            while (readPositions_[readerNumber].empty_ ||
                   (readLimitInfo_[readerNumber].first && (limitCount_[readerNumber] >= readLimitInfo_[readerNumber].second)))
            {
//TRACE("$GNo objects available for reader %u [%u:(%lu:%lu):%lu]$n\n",
//    readerNumber,
//    readPositions_[readerNumber].empty_,
//    readLimitInfo_[readerNumber].first,
//    readLimitInfo_[readerNumber].second,
//    limitCount_[readerNumber]);
                // No, so is blocking to await an object allowed?
if (!blockIfNoObjectsAvailable || doNotBlockReaders_ || doNotBlock_)                {                    // No//TRACE("$GAR() leaving: nil$n\n");                    // Will not be returning a buffer, but there is no error, so                    // try to avoid busy loops of constantly trying to acquire                    // a buffer when blocking is not allowed, by waiting for upto                    // 1 tick period                    taskDelay(1);                    return std::pair<bool, CircularBufferResidentObject*>(false, 0);                }                // Should block, so add this thread to the list of waiters for                // the buffer to become available for reading//TRACE("$GBlocking...$n\n");                ++readWaitCount_;                lock.Give();                readSemaphore_.take();                lock.Take();//TRACE("$GAwoken$n\n");            }            // If a previous object is provided, use the position after its end            // within the buffer as the position from which to attempt to decode            // the object            CircularBufferPosition readPosition =                previousObject ? 
GetAfterPosition(*previousObject) : readPositions_[readerNumber].position_;//TRACE("$GreadPosition = [%lu,%lu,%lu]$n\n", readPosition.GetLoopCount(), readPosition.GetOffset(), readPosition.GetMargin());            // Determine the type of object that is the next available for read            // for the specified reader            CircularBufferResidentObject::Id id;            // Determine the position within the circular buffer of the start of            // the in-buffer representation of the next object available for            // read for the specified reader            while (true)            {//TRACE("$GObjects available for reader %u$n\n", readerNumber);                // Does the object ID at the read position indicate that there                // are changes to be made to the read position to reach the next                // object//TRACE("$Gbuffer_ = 0x%lx$n\n", buffer_->GetBuffer());//TRACE("$greadPosition = [%lu,%lu,%lu]$n\n", readPosition.GetLoopCount(), readPosition.GetOffset(), readPosition.GetMargin());//TRACE("$GRead id from = 0x%lx$n\n", reinterpret_cast<unsigned long>(buffer_->GetBuffer() + readPosition.GetOffset()));                id = *reinterpret_cast<CircularBufferResidentObject::Id*>(buffer_->GetBuffer() + readPosition.GetOffset());//TRACE("$GId = %lu\n", id);                // Advance the read position beyond the ID bytes                readPosition += sizeof(CircularBufferResidentObject::Id);                if (id >= MAX_SPECIAL_IDS)                {//TRACE("$GNo special IDs$n\n");                    // No, so try to decode the object                    break;                }                else                {//TRACE("$GFound special ID$n\n");                    switch(id)                    {                        case MOVE_TO_START_ID://TRACE("$GFound MOVE_TO_START_ID$n\n");                            // Move read position to the start of the buffer.                            
// Bear in mind that we may have wrapped if the                            // MOVE_TO_START_ID was the last thing in the                            // buffer.                            if (readPosition.GetOffset() != 0)                            {                                readPosition += readPosition.GetMargin();                            }                            break;                        case JUMP_ZERO_ID:                            // Already positioned after JUMP_ID symbol, so no-                            // thing more to do                            break;                        case JUMP_ID:                            {//TRACE("$GFound JUMP_ID$n\n");                                // Read the length of the unused section of buffer                                unsigned long jumpLength =                                    *reinterpret_cast<CircularBufferResidentObject::Id*>(                                        buffer_->GetBuffer() +                                        readPosition.GetOffset());                                // Jump over the unused section of the buffer                                readPosition += sizeof(unsigned long);                                readPosition += jumpLength;                            }                            break;                        default:                            TRACE("$RUnknown special ID found\n");//TRACE("$GAR() leaving: nil$n\n");                            return std::pair<bool, CircularBufferResidentObject*>(true, 0);                    }                }//TRACE("$GFinished processing special IDs$n\n");            }            // Get the object pool associated with the object ID            ObjectPool* readPool = FindPoolForType(id);            if (!readPool)            {                TRACE("$RDefaultCircularBuffer::AcquireForRead() Unknown object ID %lu found\n", id);//TRACE("$GAR() leaving: nil$n\n");                return std::pair<bool, 
CircularBufferResidentObject*>(true, 0);            }            // Obtain a prototype object from the pool//TRACE("$GAllocating prototype object$n\n");            CircularBufferResidentObject* object = static_cast<CircularBufferResidentObject*>(readPool->Allocate());            if (!object)            {                TRACE("$RDefaultCircularBuffer::AcquireForRead() Failed to allocate prototype object\n");                return std::pair<bool, CircularBufferResidentObject*>(true, 0);            }            // Attempt to decode an object of the type associated with the ID            // from the buffer//TRACE("$GDecoding object from buffer$n\n");            CircularBufferResidentObject::DecodeResult decodeResult =                object->DecodeFromBuffer(GetReadSpaceInfo(readPosition));            // Was decode successful?            if (!decodeResult.GetSuccessful())            {                // No, so return the prototype object to its pool                readPool->Free(object);                TRACE("$RDefaultCircularBuffer::AcquireForRead() Failed to read object of ID %lu\n", id);                return std::pair<bool, CircularBufferResidentObject*>(true, 0);            }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?