📄 sqlitequeue.c
字号:
LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes)); return stateOk;}static bool persistentQueueEmpty(I_Queue *queueP){ return getNumOfEntries(queueP) <= 0;}static int32_t getNumOfEntries(I_Queue *queueP){ DbInfo *dbInfo; bool stateOk = true; ExceptionStruct exception; if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1; dbInfo = getDbInfo(queueP); if (dbInfo->numOfEntries == -1) { stateOk = fillCache(queueP, &exception); } return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;}static int32_t getMaxNumOfEntries(I_Queue *queueP){ DbInfo *dbInfo; ExceptionStruct exception; if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1; dbInfo = getDbInfo(queueP); return dbInfo->prop.maxNumOfEntries;}static int64_t getNumOfBytes(I_Queue *queueP){ DbInfo *dbInfo; ExceptionStruct exception; bool stateOk = true; if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1; dbInfo = getDbInfo(queueP); if (dbInfo->numOfBytes == -1) { stateOk = fillCache(queueP, &exception); } return (stateOk) ? dbInfo->numOfBytes : -1;}static int64_t getMaxNumOfBytes(I_Queue *queueP){ DbInfo *dbInfo; ExceptionStruct exception; if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1; dbInfo = getDbInfo(queueP); return dbInfo->prop.maxNumOfBytes;}/** * Shutdown without destroying any entry. * Clears all open DB resources. */static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception){ I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; if (checkArgs(queueP, "shutdown", false, exception) == false ) return; shutdownInternal(queuePP, exception); freeQueue(queuePP);}/** * Shutdown used internally without calling freeQueue(). */static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception){ I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; if (checkArgs(queueP, "shutdown", false, exception) == false ) return; { DbInfo *dbInfo = getDbInfo(queueP); queueP->isInitialized = false; if(dbInfo) { if (dbInfo->pVm_put) { char *errMsg = 0; /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg); if (errMsg != 0) sqlite_freemem(errMsg); dbInfo->pVm_put = 0; } if (dbInfo->pVm_peekWithSamePriority) { char *errMsg = 0; sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg); if (errMsg != 0) sqlite_freemem(errMsg); dbInfo->pVm_peekWithSamePriority = 0; } if (dbInfo->pVm_fillCache) { char *errMsg = 0; sqlite_finalize(dbInfo->pVm_fillCache, &errMsg); if (errMsg != 0) sqlite_freemem(errMsg); dbInfo->pVm_fillCache = 0; } if (dbInfo->db) { sqlite_close(dbInfo->db); dbInfo->db = 0; } LOG __FILE__, "shutdown() done"); } }}/** * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself * @param queueEntryArr The struct to free, passing NULL is OK */Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr){ if (queueEntryArr == (QueueEntryArr *)0) return; freeQueueEntryArrInternal(queueEntryArr); free(queueEntryArr);}/** * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself * @param queueEntryArr The struct internals to free, passing NULL is OK */Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr){ size_t i; if (queueEntryArr == (QueueEntryArr *)0) return; for (i=0; i<queueEntryArr->len; i++) { freeQueueEntryData(&queueEntryArr->queueEntryArr[i]); } free(queueEntryArr->queueEntryArr); queueEntryArr->len = 0;}/** * Does not free the queueEntry itself */static void freeQueueEntryData(QueueEntry *queueEntry){ if (queueEntry == (QueueEntry *)0) return; if (queueEntry->embeddedBlob.data != 0) { free((char *)queueEntry->embeddedBlob.data); queueEntry->embeddedBlob.data = 0; } queueEntry->embeddedBlob.dataLen = 0;}/** * Frees the internal blob and the queueEntry itself. * @param queueEntry Its memory is freed, it is not usable anymore after this call */Dll_Export void freeQueueEntry(QueueEntry *queueEntry){ if (queueEntry == (QueueEntry *)0) return; freeQueueEntryData(queueEntry); free(queueEntry);}/** * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())! * * @param queueEntry The data to put to the queue * @param maxContentDumpLen for -1 get the complete content, else limit the * content to the given number of bytes * @return A ASCII XML formatted entry or NULL if out of memory */Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen){ if (queueEntry == (QueueEntry *)0) return 0; { char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen); const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen; const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen; char *xml = (char *)calloc(len, sizeof(char)); if (xml == 0) { free(contentStr); return 0; } if (maxContentDumpLen == 0) *contentStr = 0; else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 && (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5)) strcpy(contentStr+maxContentDumpLen, " ..."); SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>" "\n <content size='%lu'><![CDATA[%s]]></content>" "\n <QueueEntry>", int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority, queueEntry->isPersistent?"true":"false", queueEntry->embeddedType, (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr); free(contentStr); return xml; }}Dll_Export void freeEntryDump(char *entryDump){ if (entryDump) free(entryDump);}/** * Checks the given arguments to be valid. * @param queueP The queue instance * @param methodName For logging * @param checkIsConnected If true does check the connection state as well * @param exception Transporting errors * @return false if the parameters are not usable, * in this case 'exception' is filled with detail informations */static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception){ if (queueP == 0) { if (exception == 0) { printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n", __FILE__, __LINE__, methodName); } else { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()", __FILE__, __LINE__, methodName); LOG __FILE__, "%s: %s", exception->errorCode, exception->message); } return false; } if (exception == 0) { LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName); return false; } if (checkIsConnected) { if (queueP->privateObject==0 || ((DbInfo *)(queueP->privateObject))->db==0 || !queueP->isInitialized) { strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Not connected to database, %s() failed", __FILE__, __LINE__, methodName); LOG __FILE__, "%s: %s", exception->errorCode, exception->message); return false; } } initializeExceptionStruct(exception); LOG __FILE__, "%s() entering ...", methodName); return true;}/*=================== TESTCODE =======================*/# ifdef QUEUE_MAIN/* NOTE: This code may be totally outdated, for current examples please use: xmlBlaster/testsuite/src/c/TestQueue.c*/#include <stdio.h>static void testRun(int argc, char **argv) { ExceptionStruct exception; QueueEntryArr *entries = 0; QueueProperties queueProperties; I_Queue *queueP = 0; memset(&queueProperties, 0, sizeof(QueueProperties)); strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX); strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX); strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX); queueProperties.maxNumOfEntries = 10000000L; queueProperties.maxNumOfBytes = 1000000000LL; queueProperties.logFp = xmlBlasterDefaultLogging; queueProperties.logLevel = XMLBLASTER_LOG_TRACE; queueProperties.userObject = 0; queueP = createQueue(&queueProperties, &exception); /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */ if (argc || argv) {} /* to avoid compiler warning */ printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); { int64_t idArr[] = { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll }; int16_t prioArr[] = { 5 , 1 , 5 }; char *data[] = { "Hello" , " World" , "!!!" }; size_t i; for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) { QueueEntry queueEntry; memset(&queueEntry, 0, sizeof(QueueEntry)); queueEntry.priority = prioArr[i]; queueEntry.isPersistent = true; queueEntry.uniqueId = idArr[i]; strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN); queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0; queueEntry.embeddedBlob.data = data[i]; queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data); queueP->put(queueP, &queueEntry, &exception); if (*exception.errorCode != 0) { LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); } } } entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception); if (*exception.errorCode != 0) { LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); } if (entries != 0) { size_t i; printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len); for (i=0; i<entries->len; i++) { QueueEntry *queueEntry = &entries->queueEntryArr[i]; char *dump = queueEntryToXml(queueEntry, 200); printf("%s\n", dump); free(dump); } } printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); queueP->randomRemove(queueP, entries, &exception); if (*exception.errorCode != 0) { LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message); } freeQueueEntryArr(entries); printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); queueP->clear(queueP, &exception); printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false"); queueP->shutdown(&queueP, &exception);}int main(int argc, char **argv) { int i; for (i=0; i<1; i++) { testRun(argc, argv); } return 0;}#endif /*QUEUE_MAIN*//*=================== TESTCODE =======================*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -