defaultcircularbuffer.cpp
来自「C++封装的视频采集代码」· C++ 代码 · 共 1,498 行 · 第 1/5 页
CPP
1,498 行
#include "DefaultCircularBuffer.h"#include "Misc/SequenceOfPairsCompare.h"#include "SysLib/Lock.h"#include "SysLib/taskLib.h"#include <utility>namespace oxsemi{ namespace circular_buffer { DefaultCircularBuffer::DefaultCircularBuffer( unsigned maxEmptyEventListeners, unsigned maxReadThresholdEventListeners, unsigned maxWriteThresholdEventListeners, unsigned long size, int numberOfReaders) : CircularBuffer( maxEmptyEventListeners, maxReadThresholdEventListeners, maxWriteThresholdEventListeners), mutex_(false), buffer_(new UnCachedBuffer(size, ALIGNMENT_POWER_OF_2)), writePosition_(size-1, 0, 0), writeEmpty_(false), mostRecentAcquireAfterPosition_(size-1, 0, 0), numberOfReaders_(numberOfReaders), readPositions_(new ReaderPositionInfo[numberOfReaders_]), knownObjectTypes_(INITIAL_KNOWN_OBJECT_TYPES_SIZE), nextFreeId_(MOVE_TO_START_ID + MAX_SPECIAL_IDS), readWaitCount_(0), writeWaitCount_(0), doNotBlock_(false), doNotBlockReaders_(false), mpReadThresholds_(new ReadThresholdPolicy[numberOfReaders]), writeThreshold_(0), readLimitInfo_(new std::pair<bool, unsigned long>[numberOfReaders_]), numberOfObjects_(new unsigned long[numberOfReaders_]), limitCount_(new unsigned long[numberOfReaders_]), lastAnyReadBuffersAvailable_(true), outstandingWriteAcquisitions_(0), outstandingReadAcquisitions_(new unsigned[numberOfReaders_])#ifdef INUSE_TRACKING ,writeInUse_(INITIAL_INUSE_CONTAINER_SIZE), readerInUse_(new InUseContainer*[numberOfReaders_])#endif // {{{ {#ifdef INUSE_TRACKING if (readerInUse_) { for (unsigned i=0; i < numberOfReaders_; i++) { readerInUse_[i] = 0; } }#endif if (buffer_.get() && buffer_->GetIsComplete() && !(buffer_->GetRequestedSize() % sizeof(unsigned long)) && // Buffer should be a multiple of quad size readPositions_ && mpReadThresholds_ && readLimitInfo_ && numberOfObjects_ && limitCount_ &&#ifdef INUSE_TRACKING readerInUse_ &&#endif outstandingReadAcquisitions_) { SetIsComplete(true); writeThreshold_.SetBufferDetails( reinterpret_cast<unsigned long>(buffer_->GetBuffer()), buffer_->GetSize() ); #ifdef INUSE_TRACKING bool inUseAllocStatus = true;#endif for (unsigned i=0; i < numberOfReaders_; i++) { readPositions_[i].position_.Zeroise(size-1); readPositions_[i].empty_ = true; mpReadThresholds_[i].SetThreshold(0); mpReadThresholds_[i].SetBufferDetails( reinterpret_cast<unsigned long>(buffer_->GetBuffer()), buffer_->GetSize() ); readLimitInfo_[i].first = false; readLimitInfo_[i].second = 0; numberOfObjects_[i] = 0; limitCount_[i] = 0; outstandingReadAcquisitions_[i] = 0;#ifdef INUSE_TRACKING readerInUse_[i] = new InUseContainer(INITIAL_INUSE_CONTAINER_SIZE); if (!readerInUse_[i]) { inUseAllocStatus = false; }#endif }#ifdef INUSE_TRACKING if (!inUseAllocStatus) { SetIsComplete(false); }#endif } } // }}} DefaultCircularBuffer::~DefaultCircularBuffer() // {{{ { delete[] readPositions_; delete[] readLimitInfo_; delete[] numberOfObjects_; delete[] limitCount_; delete[] mpReadThresholds_; delete[] outstandingReadAcquisitions_;#ifdef INUSE_TRACKING if (readerInUse_) { for (unsigned i=0; i < numberOfReaders_; i++) { delete readerInUse_[i]; } } delete[] readerInUse_;#endif } // }}} std::pair<bool, CircularBufferResidentObject*> DefaultCircularBuffer::AcquireForRead( unsigned readerNumber, CircularBufferResidentObject* previousObject, bool blockIfNoObjectsAvailable, unsigned long sizeHint) // {{{ { using namespace std::rel_ops; syslib::Lock lock(mutex_);//TRACE("$GAR() entered: previousObject = 0x%lx, readerNumber = %u$n\n", reinterpret_cast<unsigned long>(previousObject), readerNumber);#ifdef INUSE_TRACKING // Ensure the in-use container is large enough if (previousObject && readerInUse_[readerNumber]->empty()) { TRACE("$RDefaultCircularBuffer::AcquireForRead() previousObject != 0 when no objects in-use for reader %u\n", readerNumber); previousObject = 0; }#endif // Is there data available for read by the specified reader? while (readPositions_[readerNumber].empty_ || (readLimitInfo_[readerNumber].first && (limitCount_[readerNumber] >= readLimitInfo_[readerNumber].second))) {//TRACE("$GNo objects available for reader %u [%u:(%lu:%lu):%lu]$n\n",// readerNumber,// readPositions_[readerNumber].empty_,// readLimitInfo_[readerNumber].first,// readLimitInfo_[readerNumber].second,// limitCount_[readerNumber]); // No, so is blocking to await an object allowed? if (!blockIfNoObjectsAvailable || doNotBlockReaders_ || doNotBlock_) { // No//TRACE("$GAR() leaving: nil$n\n"); // Will not be returning a buffer, but there is no error, so // try to avoid busy loops of constantly trying to acquire // a buffer when blocking is not allowed, by waiting for upto // 1 tick period taskDelay(1); return std::pair<bool, CircularBufferResidentObject*>(false, 0); } // Should block, so add this thread to the list of waiters for // the buffer to become available for reading//TRACE("$GBlocking...$n\n"); ++readWaitCount_; lock.Give(); readSemaphore_.take(); lock.Take();//TRACE("$GAwoken$n\n"); } // If a previous object is provided, use the position after its end // within the buffer as the position from which to attempt to decode // the object CircularBufferPosition readPosition = previousObject ? GetAfterPosition(*previousObject) : readPositions_[readerNumber].position_;//TRACE("$GreadPosition = [%lu,%lu,%lu]$n\n", readPosition.GetLoopCount(), readPosition.GetOffset(), readPosition.GetMargin()); // Determine the type of object that is the next available for read // for the specified reader CircularBufferResidentObject::Id id; // Determine the position within the circular buffer of the start of // the in-buffer representation of the next object available for // read for the specified reader while (true) {//TRACE("$GObjects available for reader %u$n\n", readerNumber); // Does the object ID at the read position indicate that there // are changes to be made to the read position to reach the next // object//TRACE("$Gbuffer_ = 0x%lx$n\n", buffer_->GetBuffer());//TRACE("$greadPosition = [%lu,%lu,%lu]$n\n", readPosition.GetLoopCount(), readPosition.GetOffset(), readPosition.GetMargin());//TRACE("$GRead id from = 0x%lx$n\n", reinterpret_cast<unsigned long>(buffer_->GetBuffer() + readPosition.GetOffset())); id = *reinterpret_cast<CircularBufferResidentObject::Id*>(buffer_->GetBuffer() + readPosition.GetOffset());//TRACE("$GId = %lu\n", id); // Advance the read position beyond the ID bytes readPosition += sizeof(CircularBufferResidentObject::Id); if (id >= MAX_SPECIAL_IDS) {//TRACE("$GNo special IDs$n\n"); // No, so try to decode the object break; } else {//TRACE("$GFound special ID$n\n"); switch(id) { case MOVE_TO_START_ID://TRACE("$GFound MOVE_TO_START_ID$n\n"); // Move read position to the start of the buffer. // Bear in mind that we may have wrapped if the // MOVE_TO_START_ID was the last thing in the // buffer. if (readPosition.GetOffset() != 0) { readPosition += readPosition.GetMargin(); } break; case JUMP_ZERO_ID: // Already positioned after JUMP_ID symbol, so no- // thing more to do break; case JUMP_ID: {//TRACE("$GFound JUMP_ID$n\n"); // Read the length of the unused section of buffer unsigned long jumpLength = *reinterpret_cast<CircularBufferResidentObject::Id*>( buffer_->GetBuffer() + readPosition.GetOffset()); // Jump over the unused section of the buffer readPosition += sizeof(unsigned long); readPosition += jumpLength; } break; default: TRACE("$RUnknown special ID found\n");//TRACE("$GAR() leaving: nil$n\n"); return std::pair<bool, CircularBufferResidentObject*>(true, 0); } }//TRACE("$GFinished processing special IDs$n\n"); } // Get the object pool associated with the object ID ObjectPool* readPool = FindPoolForType(id); if (!readPool) { TRACE("$RDefaultCircularBuffer::AcquireForRead() Unknown object ID %lu found\n", id);//TRACE("$GAR() leaving: nil$n\n"); return std::pair<bool, CircularBufferResidentObject*>(true, 0); } // Obtain a prototype object from the pool//TRACE("$GAllocating prototype object$n\n"); CircularBufferResidentObject* object = static_cast<CircularBufferResidentObject*>(readPool->Allocate()); if (!object) { TRACE("$RDefaultCircularBuffer::AcquireForRead() Failed to allocate prototype object\n"); return std::pair<bool, CircularBufferResidentObject*>(true, 0); } // Attempt to decode an object of the type associated with the ID // from the buffer//TRACE("$GDecoding object from buffer$n\n"); CircularBufferResidentObject::DecodeResult decodeResult = object->DecodeFromBuffer(GetReadSpaceInfo(readPosition)); // Was decode successful? if (!decodeResult.GetSuccessful()) { // No, so return the prototype object to its pool readPool->Free(object); TRACE("$RDefaultCircularBuffer::AcquireForRead() Failed to read object of ID %lu\n", id); return std::pair<bool, CircularBufferResidentObject*>(true, 0); }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?