📄 rvqueue.c
字号:
}
RvLockRelease(&queue->lock);
return RV_OK;
}
RVCOREAPI RvStatus RVCALLCONV RvQueueClear(RvQueue *queue)
{
#if defined(RV_NULLCHECK)
if(queue == NULL)
return RvQueueErrorCode(RV_ERROR_NULLPTR);
#endif
RvLockGet(&queue->lock);
queue->curitems = 0;
/* Notify sending tasks about the space now available. */
while((queue->notifyfull < queue->waitfull) && (queue->notifyfull < queue->size)) {
queue->notifyfull += 1;
RvSemaphorePost(&queue->fullsem);
}
RvLockRelease(&queue->lock);
return RV_OK;
}
RvBool RvQueueIsStopped(RvQueue *queue)
{
RvBool result;
#if defined(RV_NULLCHECK)
if(queue == NULL)
return RvQueueErrorCode(RV_TRUE);
#endif
RvLockGet(&queue->lock);
result = queue->stopped;
RvLockRelease(&queue->lock);
return result;
}
RvSize_t RvQueueSize(RvQueue *queue)
{
#if defined(RV_NULLCHECK)
if(queue == NULL)
return 0;
#endif
return queue->size; /* no locks since this never changes */
}
/* Current number of items in queue */
RvSize_t RvQueueItems(RvQueue *queue)
{
RvSize_t result;
#if defined(RV_NULLCHECK)
if(queue == NULL)
return RvQueueErrorCode(RV_TRUE);
#endif
RvLockGet(&queue->lock);
result = queue->curitems;
RvLockRelease(&queue->lock);
return result;
}
#if defined(RV_TEST_CODE)
#include "rvthread.h"
#include "rvstdio.h"
#define RvQueuePrintError(_r) RvPrintf("Error %d/%d/%d\n", RvErrorGetLib((_r)), RvErrorGetModule((_r)), RvErrorGetCode((_r)))
#define RV_QUEUE_TEST_QSIZE 20
void RvQueueTestRecvThread(RvThread *th, void *data)
{
RvQueue *queue;
RvStatus result;
RvSize_t dataitem;
queue = (RvQueue *)data;
RvPrintf("Thread %p started.\n", th);
for(;;) {
result = RvQueueReceive(queue, &dataitem, sizeof(dataitem), RV_QUEUE_WAIT);
if(result != RV_OK) break;
RvPrintf("Thread: %p, data = %u\n", th, dataitem);
}
RvPrintf("Thread %p exiting due to queue ", th);
RvQueuePrintError(result);
}
void RvQueueTest(void)
{
RvStatus result;
RvQueue queue;
RvThread th1, th2;
RvSize_t count;
RvCCoreInit();
RvPrintf("RvQueueInit: ");
result = RvQueueInit();
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("RvQueueConstruct(%d): ", RV_QUEUE_TEST_QSIZE);
result = RvQueueConstruct(&queue, RV_QUEUE_TEST_QSIZE, sizeof(count), NULL);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("RvQueueReceive (on empty): ");
result = RvQueueReceive(&queue, &count, sizeof(count), RV_QUEUE_NOWAIT);
if(RvErrorGetCode(result) == RV_QUEUE_ERROR_EMPTY) {
RvPrintf("EMTPY -- OK\n");
} else RvPrintf("ERROR %d\n", result);
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Putting %d items on queue...\n", RV_QUEUE_TEST_QSIZE + 5);
for(count = 1; count <= (RV_QUEUE_TEST_QSIZE + 5); count++) {
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_NORMAL, RV_QUEUE_NOWAIT);
if(result != RV_OK) {
RvPrintf("RvQueueSend error on item %d: ", count);
RvQueuePrintError(result);
}
}
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Getting items from queue: ");
for(;;) {
result = RvQueueReceive(&queue, &count, sizeof(count), RV_QUEUE_NOWAIT);
if(result != RV_OK)
break;
RvPrintf("%d ", count);
}
RvPrintf("\n");
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Putting %d items on queue (URGENT)...\n", RV_QUEUE_TEST_QSIZE + 5);
for(count = 1; count <= (RV_QUEUE_TEST_QSIZE + 5); count++) {
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_URGENT, RV_QUEUE_NOWAIT);
if(result != RV_OK) {
RvPrintf("RvQueueSend error on item %d: ", count);
RvQueuePrintError(result);
}
}
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Getting items from queue: ");
for(;;) {
result = RvQueueReceive(&queue, &count, sizeof(count), RV_QUEUE_NOWAIT);
if(result != RV_OK)
break;
RvPrintf("%d ", count);
}
RvPrintf("\n");
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Putting %d items on queue...\n", RV_QUEUE_TEST_QSIZE / 2);
for(count = 1; count <= (RV_QUEUE_TEST_QSIZE / 2); count++) {
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_NORMAL, RV_QUEUE_NOWAIT);
if(result != RV_OK) {
RvPrintf("RvQueueSend error on item %d: ", count);
RvQueuePrintError(result);
}
}
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("RvQueueClear: ");
result = RvQueueClear(&queue);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("Putting %d items on queue...\n", RV_QUEUE_TEST_QSIZE / 2);
for(count = 1; count <= (RV_QUEUE_TEST_QSIZE / 2); count++) {
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_NORMAL, RV_QUEUE_NOWAIT);
if(result != RV_OK) {
RvPrintf("RvQueueSend error on item %d: ", count);
RvQueuePrintError(result);
}
}
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("RvQueueIsStopped: %d\n", RvQueueIsStopped(&queue));
RvPrintf("RvQueueStop: ");
result = RvQueueStop(&queue);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("RvQueueIsStopped: %d\n", RvQueueIsStopped(&queue));
RvPrintf("RvQueueSend: ");
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_NORMAL, RV_QUEUE_NOWAIT);
if(RvErrorGetCode(result) == RV_QUEUE_ERROR_STOPPED) {
RvPrintf("STOPPED -- OK\n");
} else RvPrintf("ERROR %d\n", result);
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("RvQueueIsStopped: %d\n", RvQueueIsStopped(&queue));
RvPrintf("Getting items from queue: ");
for(;;) {
result = RvQueueReceive(&queue, &count, sizeof(count), RV_QUEUE_NOWAIT);
if(result != RV_OK)
break;
RvPrintf("%d ", count);
}
RvPrintf("\n");
if(RvErrorGetCode(result) != RV_QUEUE_ERROR_STOPPED)
RvPrintf("Error: No queue stopped indication.\n");
RvPrintf("Queue size: %d, Current Items: %d\n", RvQueueSize(&queue), RvQueueItems(&queue));
RvPrintf("RvQueueIsStopped: %d\n", RvQueueIsStopped(&queue));
RvPrintf("RvQueueDestruct: ");
result = RvQueueDestruct(&queue);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("RvQueueConstruct(%d): ", RV_QUEUE_TEST_QSIZE);
result = RvQueueConstruct(&queue, RV_QUEUE_TEST_QSIZE, sizeof(count), NULL);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
/* Now test multi-threaded */
RvThreadConstruct(&th1, RvQueueTestRecvThread, &queue);
RvThreadConstruct(&th2, RvQueueTestRecvThread, &queue);
RvThreadCreate(&th1);
RvThreadCreate(&th2);
RvThreadStart(&th1);
RvThreadStart(&th2);
RvThreadNanosleep(RvInt64Const(5000000000));
RvPrintf("Putting %d items on queue...\n", RV_QUEUE_TEST_QSIZE * 2);
for(count = 1; count <= (RV_QUEUE_TEST_QSIZE * 2); count++) {
result = RvQueueSend(&queue, &count, sizeof(count), RV_QUEUE_SEND_NORMAL, RV_QUEUE_WAIT);
if(result != RV_OK) {
RvPrintf("RvQueueSend error on item %d: ", count);
RvQueuePrintError(result);
} else RvPrintf("Sent %d\n", count);
}
RvPrintf("RvQueueStop: ");
result = RvQueueStop(&queue);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvThreadDestruct(&th1);
RvThreadDestruct(&th2);
RvPrintf("RvQueueDestruct: ");
result = RvQueueDestruct(&queue);
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvPrintf("RvQueueEnd: ");
result = RvQueueEnd();
if(result != RV_OK) {
RvQueuePrintError(result);
} else RvPrintf("OK\n");
RvCCoreEnd();
}
#endif /* RV_TEST_CODE */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -