⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeue.c

📁 java开源的企业总线.xmlBlaster
💻 C
📖 第 1 页 / 共 4 页
字号:
/*----------------------------------------------------------------------------
Name:      SQLiteQueue.c
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment:   A persistent queue implementation based on the SQLite relational database
           Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
           and can easily be used outside of xmlBlaster.
           Further you need sqlite.h and the sqlite library (dll,so,sl)
Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
Date:      04/2004
Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
           Needs pthread.h but not the pthread library (for exact times)
            export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
            gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite
            (use optionally  -ansi -pedantic -Wno-long-long)
            (Intel C: icc -wd981 ...)
           Compile inside xmlBlaster:
            build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
           expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
           Compile Test main()
            Replace /I\c\sqlite for your needs ( says where sqlite.h resides ):
                                and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def)
           cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib
Table layout XB_ENTRIES:
           dataId bigint
           queueName text
           prio integer
           flag text
           durable char(1)
           byteSize bigint
           blob bytea
           PRIMARY KEY (dataId, queueName)
Todo:      Tuning:
            - Add prio to PRIMARY KEY
            - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
@see:      http://www.sqlite.org/
@see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
@see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
-----------------------------------------------------------------------------*/
#include <stdio.h>
#include <string.h>
/* NOTE(review): <malloc.h> is not a standard C header; calloc()/free() used below are
   declared by <stdlib.h>. Consider including <stdlib.h> for portability. */
#include <malloc.h>
#if !defined(_WINDOWS)
# include <unistd.h>   /* unlink() */
# include <errno.h>    /* unlink() */
#endif
#include "util/queue/QueueInterface.h"
#include "sqlite.h"

/* Implementations of the I_Queue function pointers, wired up in createQueue() */
static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
static const QueueProperties *getProperties(I_Queue *queueP);
static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
static int32_t getNumOfEntries(I_Queue *queueP);
static int32_t getMaxNumOfEntries(I_Queue *queueP);
static int64_t getNumOfBytes(I_Queue *queueP);
static int64_t getMaxNumOfBytes(I_Queue *queueP);
static bool persistentQueueEmpty(I_Queue *queueP);
static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);

/* File-private helpers */
static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);
static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
static void freeQueueEntryData(QueueEntry *queueEntry);

/**
 * Called for each SQL result row and does the specific result parsing depending on the query.
 * @param queueP The this pointer
 * @param currIndex Index of the current row (NOTE(review): presumably zero based, confirm in getResultRows())
 * @param userP Pointer on a data struct which contains the parsed data
 * @param pazValue The column values of the current row
 * @param pazColName The column names of the current row
 * @param exception Can contain error information (out parameter)
 * @return true->to continue, false->to break execution or on error exception->errorCode is not null
 */
typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
                               const char **pazValue, const char **pazColName, ExceptionStruct *exception);

/* Steps through a compiled query and hands each row to parseDataFp (implementation is further down the file) */
static int32_t getResultRows(I_Queue *queueP, const char *methodName,
                             sqlite_vm *pVm,
                             ParseDataFp parseDataFp, void *userP, bool finalize,
                             ExceptionStruct *exception);

/*
 * Logging shortcut.
 *    LOG __FILE__, "Persistent queue is created");
 * expands to
 *    if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
 *
 * Usage rules that follow from the expansion:
 *  - a variable named queueP must be in scope,
 *  - the caller supplies the remaining arguments and the closing ");",
 *  - the macro is a bare "if" statement, so put it inside braces when it is
 *    the body of an if that has an else branch.
 */
#define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, 

#define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
#define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */

/* NOTE(review): the code visible in this part of the file uses QUEUE_DBNAME_MAX / QUEUE_ID_MAX
   instead of the two macros below; check whether they are still referenced anywhere. */
#define DBNAME_MAX 128
#define ID_MAX 256

/**
 * Holds Prepared statements for better performance.
* @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 * <br />
 * One instance is allocated per queue by createQueue() and stored in
 * I_Queue::privateObject; freeQueue() releases it.
 */
typedef struct DbInfoStruct {
   QueueProperties prop;         /**< Meta information, copied from the caller's QueueProperties in initialize() */
   size_t numOfEntries;          /**< Cache for current number of entries; createQueue() presets it to -1 (wraps for size_t) */
   int64_t numOfBytes;           /**< Cache for current number of bytes; createQueue() presets it to -1 */
   sqlite *db;                   /**< Database handle for SQLite, result of sqlite_open() */
   sqlite_vm *pVm_put;           /**< SQLite virtual machine to hold a prepared query */
   sqlite_vm *pVm_peekWithSamePriority; /**< NOTE(review): presumably the prepared query of persistentQueuePeekWithSamePriority() - confirm */
   sqlite_vm *pVm_fillCache;     /**< NOTE(review): presumably the prepared query of fillCache() - confirm */
} DbInfo;

/**
 * Used temporary for peekWithSamePriority().
 * NOTE(review): the code using this struct is outside the visible part of the file;
 * currEntries/currBytes look like running totals compared against the two limits - confirm.
 */
typedef struct {
   QueueEntryArr **queueEntryArrPP;
   int32_t currEntries;
   int64_t currBytes;
   int32_t maxNumOfEntries; /**< The max wanted number of entries for this peek() */
   int64_t maxNumOfBytes;   /**< The max wanted bytes during peek() */
} TmpHelper;

/*
 * File-scope character buffer of INT64_STRLEN_MAX bytes, shared by everything in this
 * translation unit.
 * NOTE(review): its users are not visible here; presumably it receives int64 values
 * formatted as text - confirm, and check whether concurrent queue instances may use it.
 */
static char int64Str_[INT64_STRLEN_MAX];
static char * const int64Str = int64Str_;   /* to make the pointer address const */

/**
 * Column index into XB_ENTRIES table.
 * The order matches the column order of the CREATE TABLE statement in createTables():
 * dataId, queueName, prio, flag, durable, byteSize, blob.
 */
enum {
   XB_ENTRIES_DATA_ID = 0,      /* column dataId */
   XB_ENTRIES_QUEUE_NAME,       /* column queueName */
   XB_ENTRIES_PRIO,             /* column prio */
   XB_ENTRIES_TYPE_NAME,        /* column flag */
   XB_ENTRIES_PERSISTENT,       /* column durable */
   XB_ENTRIES_SIZE_IN_BYTES,    /* column byteSize */
   XB_ENTRIES_BLOB              /* column blob */
};

/**
 * Create a new persistent queue instance.
 * <br />
 * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
 *         usually by calling shutdown().
* @throws exception */Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception){   bool stateOk = true;   I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));   if (queueP == 0) return queueP;   queueP->isInitialized = false;   queueP->initialize = persistentQueueInitialize;   queueP->getProperties = getProperties;   queueP->put = persistentQueuePut;   queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;   queueP->randomRemove = persistentQueueRandomRemove;   queueP->clear = persistentQueueClear;   queueP->getNumOfEntries = getNumOfEntries;   queueP->getMaxNumOfEntries = getMaxNumOfEntries;   queueP->getNumOfBytes = getNumOfBytes;   queueP->getMaxNumOfBytes = getMaxNumOfBytes;   queueP->empty = persistentQueueEmpty;   queueP->shutdown = persistentQueueShutdown;   queueP->destroy = persistentQueueDestroy;   queueP->privateObject = calloc(1, sizeof(DbInfo));   {      DbInfo *dbInfo = (DbInfo *)queueP->privateObject;      dbInfo->numOfEntries = -1;      dbInfo->numOfBytes = -1;   }   stateOk = queueP->initialize(queueP, queueProperties, exception);   if (stateOk) {      LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");   }   else {      ExceptionStruct ex;      queueP->shutdown(&queueP, &ex);      if (*ex.errorCode != 0) {         embedException(exception, ex.errorCode, ex.message, exception);      }      queueP = 0;   }   return queueP;}/** Access the DB handle, queueP pointer is not checked */static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {   return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);}/** * Access the queue configuration.  
* @param queueP The this pointer * @return Read only access, 0 on error */static const QueueProperties *getProperties(I_Queue *queueP){   ExceptionStruct exception;   if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;   return &getDbInfo(queueP)->prop;}/** */static void freeQueue(I_Queue **queuePP){   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;   if (queueP == 0) {      fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);      return;   }   LOG __FILE__, "freeQueue() called");   if (queueP->privateObject) {      free(queueP->privateObject);      queueP->privateObject = 0;   }   free(queueP);   *queuePP = 0;}/** * Called internally by createQueue().  * @param queueP The this pointer * @param queueProperties The configuration * @param exception Can contain error information (out parameter) * @return true on success */static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception){   char *errMsg = 0;   bool retOk;   const int OPEN_RW = 0;   sqlite *db;   DbInfo *dbInfo;   if (checkArgs(queueP, "initialize", false, exception) == false ) return false;   if (queueProperties == 0) {      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);      /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */      fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);      return false;   }   queueP->log = queueProperties->logFp;   queueP->logLevel = queueProperties->logLevel;   queueP->userObject = queueProperties->userObject;   if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||       queueProperties->maxNumOfEntries == 0 || 
queueProperties->maxNumOfBytes == 0) {      char dbName[QUEUE_DBNAME_MAX];      char queueName[QUEUE_ID_MAX];      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      if (queueProperties->dbName == 0)         strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);      else         strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);      if (queueProperties->queueName == 0)         strncpy0(queueName, "NULL", QUEUE_ID_MAX);      else         strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"               " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,               dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);      LOG __FILE__, "%s: %s", exception->errorCode, exception->message);      return false;   }   dbInfo = getDbInfo(queueP);   memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));   /* Never trust a queue property you haven't overflowed yourself :-) */   dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;   dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;   dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;   LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);   LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);   LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);   LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);   LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);   /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/   LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);   /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/   db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg);   dbInfo->db = db;   if (db==0) {      
queueP->isInitialized = false;      if(queueP->log) {         if (errMsg) {            LOG __FILE__, "%s", errMsg);         }         else {            LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);         }      }      else {        if (errMsg)           fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);        else           fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);      }      strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);      if (errMsg != 0) sqlite_freemem(errMsg);      return false;   }   queueP->isInitialized = true;   retOk = createTables(queueP, exception);   fillCache(queueP, exception);   LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");   return true;}/** * Create the necessary DB table if not already existing.  * @param queueP  * @param exception Can contain error information (out parameter) * @return true on success */static bool createTables(I_Queue *queueP, ExceptionStruct *exception){   char queryString[LEN512];   bool retOk;   const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;   SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -