📄 rbufflib.c
字号:
return(NULL); } /* * Set source partition for object class memory allocation * - note this only works overall if all rBuffs share a source * partition. */ classMemPartIdSet(rBuffClassId, rBuffCreateParams->sourcePartition); /* allocate control structure */ rBuff = (RBUFF_ID) objAlloc (rBuffClassId); if(rBuff == NULL) {#ifdef RBUFF_DEBUG logMsg ("rBuff: objAlloc failed\n",0,0,0,0,0,0);#endif return(NULL); } objCoreInit (&rBuff->buffDesc.objCore, rBuffClassId); /* record parameters */ rBuff->info.srcPart = rBuffCreateParams->sourcePartition; rBuff->info.options = rBuffCreateParams->options; rBuff->info.buffSize = rBuffCreateParams->buffSize; rBuff->info.threshold = rBuffCreateParams->threshold; rBuff->info.minBuffs = rBuffCreateParams->minimum; rBuff->info.maxBuffs = (unsigned int) rBuffCreateParams->maximum; rBuff->errorHandler = rBuffCreateParams->errorHandler; if (semBInit (&rBuff->buffDesc.threshXSem, SEM_Q_PRIORITY, SEM_EMPTY) == ERROR) { objFree (rBuffClassId, (UINT8 *) rBuff); return (NULL); } if (semBInit (&rBuff->readBlk, SEM_Q_PRIORITY, SEM_FULL) == ERROR) { semTerminate (&rBuff->buffDesc.threshXSem); objFree (rBuffClassId, (UINT8 *) rBuff); return (NULL); } if (semBInit (&rBuff->bufferFull, SEM_Q_PRIORITY, SEM_EMPTY) == ERROR) { semTerminate (&rBuff->buffDesc.threshXSem); semTerminate (&rBuff->readBlk); objFree (rBuffClassId, (UINT8 *)rBuff); return (NULL); } if (semCInit (&rBuff->msgSem, SEM_Q_PRIORITY, 0) == ERROR) { semTerminate (&rBuff->bufferFull); semTerminate (&rBuff->buffDesc.threshXSem); semTerminate (&rBuff->readBlk); objFree (rBuffClassId, (UINT8 *)rBuff); return (NULL); } rBuff->rBuffMgrId = 0; /* ...so we can use rBuffDestroy safely */ /* * If things go wrong from here, use rBuffDestroy to throw back what * we've caught. */ /* allocate 'rBuffCreateParams->minimum' buffers */ rBuff->info.currBuffs = rBuff->info.emptyBuffs = 0; rBuff->buffWrite = NULL; for (count=0;count < rBuffCreateParams->minimum;count++) { /* First we need a buffer */ newBuff = (RBUFF_PTR) memPartAlloc (rBuffCreateParams->sourcePartition, sizeof(RBUFF_BUFF_TYPE) + rBuffCreateParams->buffSize);#ifdef RBUFF_DEBUG logMsg ("rBuff: adding buffer %p to ring\n",newBuff,0,0,0,0,0);#endif /* newBuff will be returned as NULL if source partition is exhausted */ /* Don't need to lock ints around rBuffAdd as no-one knows about * this rBuff. */ if (newBuff == NULL || rBuffAdd (rBuff, newBuff) == ERROR) {#ifdef RBUFF_DEBUG logMsg ("rBuff: abandoned creation\n",0,0,0,0,0,0);#endif rBuffDestroy ((BUFFER_ID) rBuff); return (NULL); } } rBuff->rBuffMgrId = taskSpawn ("tRBuffMgr", rBuffMgrPriorityDefault, RBUFF_MGR_OPTIONS, 2048, rBuffMgr, rBuff, 0,0,0,0,0,0,0,0,0); if(rBuff->rBuffMgrId == NULL) {#ifdef RBUFF_DEBUG logMsg ("rBuff: error creating rBuffMgr\n",0,0,0,0,0,0);#endif rBuffDestroy ((BUFFER_ID) rBuff); return (NULL); } /* set control pointers */ rBuff->nestLevel = 0; /* rBuff->buffWrite initialised by rBuffAdd */ rBuff->buffRead = rBuff->buffWrite; rBuff->dataWrite = rBuff->dataRead = rBuff->buffWrite->dataStart; /* reset info */ rBuff->info.maxBuffsActual = rBuffCreateParams->minimum; rBuff->info.dataContent = rBuff->info.writesSinceReset = rBuff->info.readsSinceReset = rBuff->info.timesExtended = rBuff->info.timesXThreshold = rBuff->info.bytesWritten = rBuff->info.bytesRead = rBuff->info.bytesPeak = 0; /* Reset msg passing mechanism */ rBuff->msgOutstanding = FALSE; rBuff->msgWriteIndex = rBuff->msgReadIndex = 0; /* now set up func ptrs allowing it to be called */ rBuff->buffDesc.readReserveRtn = (FUNCPTR) rBuffReadReserve; rBuff->buffDesc.readCommitRtn = (FUNCPTR) rBuffReadCommit; rBuff->buffDesc.writeRtn = rBuffWrite; rBuff->buffDesc.flushRtn = rBuffFlush; rBuff->buffDesc.threshold = rBuffCreateParams->threshold; rBuff->buffDesc.nBytesRtn = rBuffNBytes; /* made it! */#ifdef RBUFF_DEBUG logMsg("Created rBuff with ID %p\n",rBuff,0,0,0,0,0);#endif return ((BUFFER_ID) rBuff); }int rBuffVerify (BUFFER_ID rBuff){ if (OBJ_VERIFY (rBuff, rBuffClassId) == OK) /* validate rBuff ID */ { return (OK); } else { return (ERROR); }}/********************************************************************************* rBuffWrite - Write data to a ring of buffers** This routine writes data to an extendable ring of buffers. If the existing* structure is full, and the existing number of buffers is less than the* specified minimum, then this function will attempt to add another buffer to* the ring.** This function may be called from interrupt level.** Mutually exclusive access must be guaranteed by the caller.** NOMANUAL*/UINT8 *rBuffWrite ( BUFFER_ID buffId, UINT8 *dataSrc, UINT32 numOfBytes ){ BOOL startUploading; RBUFF_ID rBuff; /* access this particular rBuff */ UINT8 *returnPtr = (UINT8 *) ~NULL; /* Return a non-zero value if OK */ /* Get access to the private members of this buffer's descriptor. */ rBuff = (RBUFF_ID) buffId; /* Get out early if this request makes no sense. */ if (numOfBytes > rBuff->info.buffSize) { return (NULL); } /* Critical Region Start */ if (rBuff->info.dataContent >= rBuff->info.threshold) { startUploading = TRUE; } else { startUploading = FALSE; } if (rBuff->buffWrite->spaceAvail >= numOfBytes) { if (dataSrc) { /* We are writing the data, not just reserving the space */ memcpy (rBuff->dataWrite, dataSrc, numOfBytes); } else { /* Record the start of the reserved area */ returnPtr = rBuff->dataWrite; } /* * If we are consuming one of our precious empty buffs, * check we have enough in reserve. */ if (rBuff->buffWrite->dataLen == 0) { --rBuff->info.emptyBuffs; } if (rBuff->info.emptyBuffs < RBUFF_EMPTY_KEEP && rBuff->info.currBuffs != rBuff->info.maxBuffs) { /* Schedule the rBuff Mgr to extend the buffer */ RBUFF_SEND_MSG (RBUFF_MSG_ADD, MSG_PRI_URGENT, rBuff, 0); } rBuff->buffWrite->dataLen += numOfBytes; rBuff->dataWrite += numOfBytes; rBuff->buffWrite->spaceAvail -= numOfBytes; /* * It may be that the current buffer is full at this point, * but don't do anything about it yet as the situation will * be handled next time through. Also, it's possible that * the current buffer may even have been emptied by then. */ } else { UINT32 dataWritten1stBuff; UINT32 dataWritten2ndBuff; /* * Data won't fit in this buffer (at least not entirely), * is the next buffer available? */ if (rBuff->buffWrite->next == rBuff->buffRead) { /* We've caught the reader. This will only happen when * we have actually filled the maximum allocation of * buffers *or* the reserved buffer has been filled * before the buffer has had the opportunity to * be extended. We cannot extend the buffer at this point * as we may be in a critical region, the design is such * that if the buffer could be extended it should have * been done by now (for that matter, the wrap-around * should have also occurred). * * If we have filled the reserved buffer without having * the opportunity to extend the buffer then the buffer * is configured badly and must be retuned. * * Meanwhile, at this moment, options are: * * 1) If we have filled the entire buffer and cannot wrap- * around then return ERROR. * 2) If we have filled the entire buffer but can wrap- * around then do that. */ if (rBuff->info.options & RBUFF_WRAPAROUND) { /* * OK, perform an inline ring extension supplying NULL * as the new buffer forcing the ring to wrap-around. * This is a deterministic action. */ /* Interrupts already locked out */ rBuffAdd(rBuff, NULL); } else { /* Oh dear, can't wrap-round */ return (NULL); } } /* * OK. * In the case that this function is writing the data, it * can be shared between this and next buffer. * In the case that we are only reserving the space, we must * return a pointer to contiguous space and therefore skip the * remainder of the previous buffer. */ if (dataSrc == NULL) { /* skip the remainder of the current buffer */ rBuff->buffWrite->spaceAvail = 0; /* Move on to the next buffer */ rBuff->buffWrite = rBuff->buffWrite->next; /* Record the start of the reserved area */ returnPtr = rBuff->buffWrite->dataStart; /* Point dataWrite past the reserved area */ rBuff->dataWrite = rBuff->buffWrite->dataStart; dataWritten1stBuff = 0; dataWritten2ndBuff = numOfBytes; } else { /* first make use of the space in this buffer */ memcpy (rBuff->dataWrite, dataSrc, (dataWritten1stBuff = rBuff->buffWrite->spaceAvail)); rBuff->buffWrite->dataLen += dataWritten1stBuff; rBuff->buffWrite->spaceAvail = 0; /* Move on to the next buffer */ dataWritten2ndBuff = numOfBytes - dataWritten1stBuff; rBuff->buffWrite = rBuff->buffWrite->next; rBuff->dataWrite = rBuff->buffWrite->dataStart; memcpy (rBuff->dataWrite, dataSrc + dataWritten1stBuff, dataWritten2ndBuff); } rBuff->buffWrite->dataLen = dataWritten2ndBuff; rBuff->dataWrite += dataWritten2ndBuff; rBuff->buffWrite->spaceAvail = rBuff->info.buffSize - dataWritten2ndBuff; if (--rBuff->info.emptyBuffs < RBUFF_EMPTY_KEEP && rBuff->info.currBuffs != rBuff->info.maxBuffs) { /* Schedule the rBuff Mgr to extend the buffer */ RBUFF_SEND_MSG (RBUFF_MSG_ADD, MSG_PRI_URGENT, rBuff, 0); } } rBuff->info.dataContent += numOfBytes; /* update info */ if(rBuff->info.dataContent > rBuff->info.bytesPeak) { rBuff->info.bytesPeak = rBuff->info.dataContent; } rBuff->info.bytesWritten += numOfBytes; rBuff->info.writesSinceReset++; if (!startUploading && rBuff->info.dataContent >= rBuff->info.threshold) { rBuff->info.timesXThreshold++; if (!(rBuff->info.options & RBUFF_UP_DEFERRED)) { /* * Signal for uploading to begin. Note that if we have just * reserved space it is imperative that uploading does not * actually begin until the data is in the buffer. */#if (CPU_FAMILY == I80X86) portWorkQAdd1 ((FUNCPTR)semBGiveDefer, (int) &rBuff->buffDesc.threshXSem);#else workQAdd1 ((FUNCPTR)semBGiveDefer, (int) &rBuff->buffDesc.threshXSem);#endif } } /* Critical Region End */ return(returnPtr);}/********************************************************************************* rBuffRead - Read data from a ring of buffers** This routine reads data from an extendable ring of buffers.** This function assumes mutually exclusive access is guaranteed by the caller.** NOMANUAL*/INT32 rBuffRead ( BUFFER_ID buffId, /* generic buffer descriptor */ UINT8 *dataDest, UINT32 numOfBytes ) { UINT32 bytesToCopy, remaining = numOfBytes; RBUFF_ID rBuff; /* access private members of this rBuff desc */ /* Get access to the private members of this particular rBuff. */ rBuff = (RBUFF_ID) buffId; /* Critical Region Start */ while(remaining) { bytesToCopy = (rBuff->buffRead->dataLen < remaining ? rBuff->buff
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -