📄 sqlitequeue.c
字号:
/*----------------------------------------------------------------------------Name: SQLiteQueue.cProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: A persistent queue implementation based on the SQLite relational database Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h) and can easily be used outside of xmlBlaster. Further you need sqlite.h and the sqlite library (dll,so,sl)Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>Date: 04/2004Compile: Compiles at least on Windows, Linux, Solaris. Further porting should be simple. Needs pthread.h but not the pthread library (for exact times) export LD_LIBRARY_PATH=/opt/sqlite-bin/lib gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite (use optionally -ansi -pedantic -Wno-long-long (Intel C: icc -wd981 ...) Compile inside xmlBlaster: build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so Compile Test main() Replace /I\c\sqlite for your needs ( says where sqlite.h resides ): and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def) cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.libTable layout XB_ENTRIES: dataId bigint queueName text prio integer flag text durable char(1) byteSize bigint blob bytea PRIMARY KEY (dataId, queueName)Todo: Tuning: - Add prio to PRIMARY KEY - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes@see: http://www.sqlite.org/@see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html@see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.htmlTestsuite: xmlBlaster/testsuite/src/c/TestQueue.c-----------------------------------------------------------------------------*/#include <stdio.h>#include <string.h>#include <malloc.h>#if !defined(_WINDOWS)# include <unistd.h> /* unlink() */# include <errno.h> /* unlink() */#endif#include "util/queue/QueueInterface.h"#include "sqlite.h"static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);static const QueueProperties *getProperties(I_Queue *queueP);static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);static int32_t getNumOfEntries(I_Queue *queueP);static int32_t getMaxNumOfEntries(I_Queue *queueP);static int64_t getNumOfBytes(I_Queue *queueP);static int64_t getMaxNumOfBytes(I_Queue *queueP);static bool persistentQueueEmpty(I_Queue *queueP);static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);static bool createTables(I_Queue *queueP, ExceptionStruct *exception);static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);static void freeQueueEntryData(QueueEntry *queueEntry);/** * Called for each SQL result row and does the specific result parsing depending on the query. * @param userP Pointer on a data struct which contains the parsed data * @return true->to continue, false->to break execution or on error exception->errorCode is not null */typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP, const char **pazValue, const char **pazColName, ExceptionStruct *exception);static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite_vm *pVm, ParseDataFp parseDataFp, void *userP, bool finalize, ExceptionStruct *exception);/* Shortcut for: if (queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created"); is LOG __FILE__, "Persistent queue is created");*/#define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, #define LEN512 512 /* ISO C90 forbids variable-size array: const int LEN512=512; */#define LEN256 256 /* ISO C90 forbids variable-size array: const int LEN256=256; */#define DBNAME_MAX 128#define ID_MAX 256/** * Holds Prepared statements for better performance. * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html */typedef struct DbInfoStruct { QueueProperties prop; /** Meta information */ size_t numOfEntries; /** Cache for current number of entries */ int64_t numOfBytes; /** Cache for current number of bytes */ sqlite *db; /** Database handle for SQLite */ sqlite_vm *pVm_put; /** SQLite virtual machine to hold a prepared query */ sqlite_vm *pVm_peekWithSamePriority; sqlite_vm *pVm_fillCache;} DbInfo;/** * Used temporary for peekWithSamePriority(). */typedef struct { QueueEntryArr **queueEntryArrPP; int32_t currEntries; int64_t currBytes; int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */ int64_t maxNumOfBytes; /** The max wanted bytes during peek() */} TmpHelper;static char int64Str_[INT64_STRLEN_MAX];static char * const int64Str = int64Str_; /* to make the pointer address const *//** Column index into XB_ENTRIES table */enum { XB_ENTRIES_DATA_ID = 0, XB_ENTRIES_QUEUE_NAME, XB_ENTRIES_PRIO, XB_ENTRIES_TYPE_NAME, XB_ENTRIES_PERSISTENT, XB_ENTRIES_SIZE_IN_BYTES, XB_ENTRIES_BLOB};/** * Create a new persistent queue instance. * <br /> * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done * usually by calling shutdown(). * @throws exception */Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception){ bool stateOk = true; I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue)); if (queueP == 0) return queueP; queueP->isInitialized = false; queueP->initialize = persistentQueueInitialize; queueP->getProperties = getProperties; queueP->put = persistentQueuePut; queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority; queueP->randomRemove = persistentQueueRandomRemove; queueP->clear = persistentQueueClear; queueP->getNumOfEntries = getNumOfEntries; queueP->getMaxNumOfEntries = getMaxNumOfEntries; queueP->getNumOfBytes = getNumOfBytes; queueP->getMaxNumOfBytes = getMaxNumOfBytes; queueP->empty = persistentQueueEmpty; queueP->shutdown = persistentQueueShutdown; queueP->destroy = persistentQueueDestroy; queueP->privateObject = calloc(1, sizeof(DbInfo)); { DbInfo *dbInfo = (DbInfo *)queueP->privateObject; dbInfo->numOfEntries = -1; dbInfo->numOfBytes = -1; } stateOk = queueP->initialize(queueP, queueProperties, exception); if (stateOk) { LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created"); } else { ExceptionStruct ex; queueP->shutdown(&queueP, &ex); if (*ex.errorCode != 0) { embedException(exception, ex.errorCode, ex.message, exception); } queueP = 0; } return queueP;}/** Access the DB handle, queueP pointer is not checked */static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) { return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);}/** * Access the queue configuration. * @param queueP The this pointer * @return Read only access, 0 on error */static const QueueProperties *getProperties(I_Queue *queueP){ ExceptionStruct exception; if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0; return &getDbInfo(queueP)->prop;}/** */static void freeQueue(I_Queue **queuePP){ I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; if (queueP == 0) { fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__); return; } LOG __FILE__, "freeQueue() called"); if (queueP->privateObject) { free(queueP->privateObject); queueP->privateObject = 0; } free(queueP); *queuePP = 0;}/** * Called internally by createQueue(). * @param queueP The this pointer * @param queueProperties The configuration * @param exception Can contain error information (out parameter) * @return true on success */static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception){ char *errMsg = 0; bool retOk; const int OPEN_RW = 0; sqlite *db; DbInfo *dbInfo; if (checkArgs(queueP, "initialize", false, exception) == false ) return false; if (queueProperties == 0) { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__); /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */ fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message); return false; } queueP->log = queueProperties->logFp; queueP->logLevel = queueProperties->logLevel; queueP->userObject = queueProperties->userObject; if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 || queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) { char dbName[QUEUE_DBNAME_MAX]; char queueName[QUEUE_ID_MAX]; strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); if (queueProperties->dbName == 0) strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX); else strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX); if (queueProperties->queueName == 0) strncpy0(queueName, "NULL", QUEUE_ID_MAX); else strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s'," " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__, dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes); LOG __FILE__, "%s: %s", exception->errorCode, exception->message); return false; } dbInfo = getDbInfo(queueP); memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties)); /* Never trust a queue property you haven't overflowed yourself :-) */ dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0; dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0; dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0; LOG __FILE__, "dbName = %s", dbInfo->prop.dbName); LOG __FILE__, "queueName = %s", dbInfo->prop.queueName); LOG __FILE__, "tablePrefix = %s", dbInfo->prop.tablePrefix); LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries); LOG __FILE__, "maxNumOfBytes = %ld",(long)dbInfo->prop.maxNumOfBytes); /*LOG __FILE__, "logFp = %d", (int)dbInfo->prop.logFp);*/ LOG __FILE__, "logLevel = %d", (int)dbInfo->prop.logLevel); /*LOG __FILE__, "userObject = %d", (void*)dbInfo->prop.userObject);*/ db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg); dbInfo->db = db; if (db==0) { queueP->isInitialized = false; if(queueP->log) { if (errMsg) { LOG __FILE__, "%s", errMsg); } else { LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName); } } else { if (errMsg) fprintf(stderr,"[%s] %s\n", __FILE__, errMsg); else fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName); } strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg); if (errMsg != 0) sqlite_freemem(errMsg); return false; } queueP->isInitialized = true; retOk = createTables(queueP, exception); fillCache(queueP, exception); LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed"); return true;}/** * Create the necessary DB table if not already existing. * @param queueP * @param exception Can contain error information (out parameter) * @return true on success */static bool createTables(I_Queue *queueP, ExceptionStruct *exception){ char queryString[LEN512]; bool retOk; const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix; SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -