⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite3queue.c.todo

📁 java开源的企业总线.xmlBlaster
💻 TODO
📖 第 1 页 / 共 4 页
字号:
/*--UNFINISHED SEE TODOS--------------------------------------------------------------------------
Name:      SQLite3Queue.c
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment:   A persistent queue implementation based on the SQLite relational database
           Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
           and can easily be used outside of xmlBlaster.
           Further you need sqlite.h and the sqlite library (dll,so,sl)
Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info> 's brother
Date:      04/2004
Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
           Needs pthread.h but not the pthread library (for exact times)

            export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
            gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLite3Queue SQLite3Queue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite3
            (use optionally  -ansi -pedantic -Wno-long-long)
            (Intel C: icc -wd981 ...)

           Compile inside xmlBlaster:
            build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
           expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so

           Testcompile on Windows

				create sqlite3.lib from sqlite3.def via:
				 lib /DEF:sqlite3.def

           ( /I\c\sqlite3 says where sqlite3.h resides ):
			  cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /DSQLITE3=1 /D_WINDOWS /I\c\sqlite3 /I..\.. Sqlite3Queue.c ..\helper.c /link \pialibs\sqlite3.lib

Table layout XB_ENTRIES:
           dataId bigint
           queueName text
           prio integer
           flag text
           durable char(1)
           byteSize bigint
           blob bytea
           PRIMARY KEY (dataId, queueName)

Todo:      Tuning:
            - Add prio to PRIMARY KEY
            - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes

@see:      http://www.sqlite.org/
@see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
@see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
-----------------------------------------------------------------------------*/
#include <stdio.h>
#include <string.h>
#include <malloc.h>
#if !defined(_WINDOWS)
# include <unistd.h>   /* unlink() */
# include <errno.h>    /* unlink() */
#endif
#include "util/queue/QueueInterface.h"

//#ifdef QUEUE_MAIN
//# ifdef Dll_Export
//#  undef Dll_Export
//# endif
//# define Dll_Export
//#endif

# include "sqlite3.h"
/* Local aliases for the SQLite3 handle types used throughout this file */
typedef sqlite3      xb_sqlite;
typedef sqlite3_stmt xb_sqlite_stmt;

/**
 * Release a buffer by handing it to sqlite3_free().
 * @param pdata The buffer to release
 */
static void xb_sqlite_free(char * pdata)
{
   sqlite3_free(pdata);
}

/* Forward declarations of the file local functions. */

/* Implementations which createQueue() installs as function pointers of I_Queue */
static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
static const QueueProperties *getProperties(I_Queue *queueP);
static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
static int32_t getNumOfEntries(I_Queue *queueP);
static int32_t getMaxNumOfEntries(I_Queue *queueP);
static int64_t getNumOfBytes(I_Queue *queueP);
static int64_t getMaxNumOfBytes(I_Queue *queueP);
static bool persistentQueueEmpty(I_Queue *queueP);
static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);

/* Internal helpers, not installed into I_Queue by createQueue() */
static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, xb_sqlite_stmt **ppVm, const char *queryString, ExceptionStruct *exception);
static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
static void freeQueueEntryData(QueueEntry *queueEntry);

/**
 * Called for each SQL result row and does the specific result parsing depending on the query. 
 * @param queueP The this pointer
 * @param currIndex Presumably the index of the current result row - TODO confirm in getResultRows()
 * @param userP Pointer on a data struct which contains the parsed data
 * @param pazValue Presumably the column values of the current row - TODO confirm
 * @param pazColName Presumably the column names of the current row - TODO confirm
 * @param exception Can contain error information (out parameter)
 * @return true->to continue, false->to break execution or on error exception->errorCode is not null
 */
typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
                               const char **pazValue, const char **pazColName, ExceptionStruct *exception);
/* Takes a prepared statement and a ParseDataFp callback together with its userP;
   the implementation is not in this part of the file */
static int32_t getResultRows(I_Queue *queueP, const char *methodName,
                             xb_sqlite_stmt *pVm, 
                             ParseDataFp parseDataFp, void *userP, bool finalize,
                             ExceptionStruct *exception);

/* Shortcut for:
    if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
   is
    LOG __FILE__, "Persistent queue is created");

   The macro only opens the log() call: the caller supplies the remaining
   arguments and the closing ");". It needs a variable named 'queueP' in scope.
   As it expands to an 'if' without braces, put it inside braces when it is
   used as the body of an if/else, otherwise a following 'else' would bind
   to the 'if' of the macro.
*/
#define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, 

/* Fixed buffer lengths, defined as macros so that they can be used as array sizes in C90 */
#define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
#define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */

/* NOTE(review): the code visible in this part of the file uses QUEUE_DBNAME_MAX and
   QUEUE_ID_MAX (defined elsewhere) instead of these two - confirm that they are still needed */
#define DBNAME_MAX 128
#define ID_MAX 256


/**
 * The private state of one queue instance, createQueue() stores it in I_Queue.privateObject.
 * Holds Prepared statements for better performance. 
 * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 */
typedef struct DbInfoStruct {
   QueueProperties prop;         /** Meta information, a copy of the properties passed to initialize() */
   size_t numOfEntries;          /** Cache for current number of entries, createQueue() presets it with -1 (which wraps as size_t is unsigned) */
   int64_t numOfBytes;           /** Cache for current number of bytes, createQueue() presets it with -1 */
   xb_sqlite *db;                   /** Database handle for SQLite */
   xb_sqlite_stmt *pVm_put;           /** SQLite virtual machine to hold a prepared query */
   xb_sqlite_stmt *pVm_peekWithSamePriority; /** Prepared query, presumably used by persistentQueuePeekWithSamePriority() - TODO confirm */
   xb_sqlite_stmt *pVm_fillCache;            /** Prepared query, presumably used by fillCache() - TODO confirm */
} DbInfo;

/**
 * Used temporary for peekWithSamePriority(). 
 * NOTE(review): the code using it is not in this part of the file, the member
 * descriptions marked 'presumably' are derived from the names only.
 */
typedef struct {
   QueueEntryArr **queueEntryArrPP; /** Presumably the result array which is filled during the peek - TODO confirm */
   int32_t currEntries;     /** Presumably the number of entries collected so far - TODO confirm */
   int64_t currBytes;       /** Presumably the number of bytes collected so far - TODO confirm */
   int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */
   int64_t maxNumOfBytes;   /** The max wanted bytes during peek() */
} TmpHelper;

/* File scope scratch buffer of INT64_STRLEN_MAX chars, presumably used to format
   int64 values as strings (its users are not in this part of the file).
   NOTE(review): it is one buffer shared by everything in this file - confirm
   that it is never used by two threads at the same time. */
static char int64Str_[INT64_STRLEN_MAX];
static char * const int64Str = int64Str_;   /* to make the pointer address const */

/**
 * Column index into XB_ENTRIES table.
 * The order follows the table layout given in the file header
 * (dataId, queueName, prio, flag, durable, byteSize, blob).
 */
enum {
   XB_ENTRIES_DATA_ID = 0,     /* dataId */
   XB_ENTRIES_QUEUE_NAME,      /* queueName */
   XB_ENTRIES_PRIO,            /* prio */
   XB_ENTRIES_TYPE_NAME,       /* by position the 'flag' column - TODO confirm */
   XB_ENTRIES_PERSISTENT,      /* by position the 'durable' column - TODO confirm */
   XB_ENTRIES_SIZE_IN_BYTES,   /* byteSize */
   XB_ENTRIES_BLOB             /* blob */
};


/**
 * Create a new persistent queue instance. 
 * <br />
 * Allocates the I_Queue and its private DbInfo, installs the function pointers
 * and calls initialize(). If initialize() fails the queue is handed to shutdown()
 * and a shutdown error is embedded into the passed exception.
 * @param queueProperties The configuration, it is passed on to initialize()
 * @param exception Can contain error information (out parameter),
 *        it is not touched if one of the two memory allocations fails
 * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
 *         usually by calling shutdown().
 * @throws exception
 */
Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
{
   bool stateOk = true;
   DbInfo *dbInfo = 0;
   I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
   if (queueP == 0) return queueP;

   /* The private state is dereferenced below and by initialize(), so a failed allocation must stop here */
   dbInfo = (DbInfo *)calloc(1, sizeof(DbInfo));
   if (dbInfo == 0) {
      free(queueP);
      return 0;
   }
   dbInfo->numOfEntries = (size_t)-1; /* same value the implicit conversion of -1 gave before */
   dbInfo->numOfBytes = -1;

   queueP->isInitialized = false;
   queueP->initialize = persistentQueueInitialize;
   queueP->getProperties = getProperties;
   queueP->put = persistentQueuePut;
   queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
   queueP->randomRemove = persistentQueueRandomRemove;
   queueP->clear = persistentQueueClear;
   queueP->getNumOfEntries = getNumOfEntries;
   queueP->getMaxNumOfEntries = getMaxNumOfEntries;
   queueP->getNumOfBytes = getNumOfBytes;
   queueP->getMaxNumOfBytes = getMaxNumOfBytes;
   queueP->empty = persistentQueueEmpty;
   queueP->shutdown = persistentQueueShutdown;
   queueP->destroy = persistentQueueDestroy;
   queueP->privateObject = dbInfo;

   stateOk = queueP->initialize(queueP, queueProperties, exception);
   if (stateOk) {
      LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
   }
   else {
      ExceptionStruct ex;
      /* Start with an empty errorCode: if shutdown() reports nothing,
         the check below would otherwise read an uninitialized struct */
      memset(&ex, 0, sizeof(ex));
      queueP->shutdown(&queueP, &ex);
      if (*ex.errorCode != 0) {
         embedException(exception, ex.errorCode, ex.message, exception);
      }
      queueP = 0;
   }
   return queueP;
}

/**
 * Access the private DbInfo of a queue.
 * @param queueP The this pointer, may be 0
 * @return The privateObject of the queue cast to DbInfo, 0 if queueP is 0
 */
static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
   if (queueP != 0) {
      return (DbInfo *)(queueP->privateObject);
   }
   return 0;
}

/**
 * Hand out the configuration which is stored in the DbInfo of the queue.
 * @param queueP The this pointer
 * @return Read only access, 0 if checkArgs() rejects the queue
 */
static const QueueProperties *getProperties(I_Queue *queueP)
{
   ExceptionStruct ignored; /* only needed to call checkArgs(), its content is not evaluated */
   const bool argsOk = checkArgs(queueP, "getProperties", false, &ignored);
   if (!argsOk) {
      return 0;
   }
   return &(getDbInfo(queueP)->prop);
}

/**
 * Release the memory of the queue struct and of its privateObject.
 * @param queuePP Address of the queue pointer, *queuePP is set to 0 afterwards.
 *        If queuePP or *queuePP is 0 a message is written to stderr and nothing is freed.
 */
static void freeQueue(I_Queue **queuePP)
{
   I_Queue *queueP = 0;  /* must be named queueP, the LOG macro refers to it */

   if (queuePP != 0) {
      queueP = *queuePP;
   }
   if (queueP == 0) {
      fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
      return;
   }

   LOG __FILE__, "freeQueue() called");

   /* free() accepts a null pointer, so no separate check is needed */
   free(queueP->privateObject);
   queueP->privateObject = 0;

   free(queueP);
   *queuePP = 0;
}

/**
 * Called internally by createQueue(). 
 * <br />
 * Checks the arguments, copies the properties into the DbInfo of the queue,
 * opens the database and then calls createTables() and fillCache().
 * @param queueP The this pointer
 * @param queueProperties The configuration
 * @param exception Can contain error information (out parameter)
 * @return false if the arguments are not accepted or the database can't be opened,
 *         else true.
 *         NOTE(review): true is returned as well if createTables() or fillCache()
 *         report a problem, check exception->errorCode in this case.
 */
static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
{
   bool retOk;
   int rc;
   xb_sqlite *db = 0;
   DbInfo *dbInfo;

   if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
   if (queueProperties == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
      /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
      fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
      return false;
   }

   queueP->log = queueProperties->logFp;
   queueP->logLevel = queueProperties->logLevel;
   queueP->userObject = queueProperties->userObject;

   if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
       queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
      char dbName[QUEUE_DBNAME_MAX];
      char queueName[QUEUE_ID_MAX];
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      if (queueProperties->dbName == 0)
         strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
      else
         strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
      if (queueProperties->queueName == 0)
         strncpy0(queueName, "NULL", QUEUE_ID_MAX);
      else
         strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
               " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
               dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
      LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
      return false;
   }

   dbInfo = getDbInfo(queueP);
   memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));

   /* Never trust a queue property you haven't overflowed yourself :-) */
   dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
   dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
   dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;

   LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);
   LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);
   LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);
   /* Cast as for maxNumOfBytes so that the argument matches the %ld conversion */
   LOG __FILE__, "maxNumOfEntries = %ld",(long)dbInfo->prop.maxNumOfEntries);
   LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);
   /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/
   LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);
   /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/

   /* sqlite3_open() reports a failure with its return code. The handle is
      usually not NULL even on failure (only if SQLite can't allocate it),
      so checking the handle alone does not detect a failed open. */
   rc = sqlite3_open(dbInfo->prop.dbName, &db);
   if (rc != SQLITE_OK || db == 0) {
      /* The text is owned by the connection, use it before sqlite3_close() and don't free it */
      const char *errMsg = (db == 0) ? "out of memory" : sqlite3_errmsg(db);
      queueP->isInitialized = false;
      if (queueP->log) {
         LOG __FILE__, "Unable to open database '%s': %s", dbInfo->prop.dbName, errMsg);
      }
      else {
         fprintf(stderr,"[%s] Unable to open database %s: %s\n", __FILE__, dbInfo->prop.dbName, errMsg);
      }
      strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, errMsg);
      /* A handle returned by a failed open still holds resources and needs to be closed */
      if (db != 0) sqlite3_close(db);
      dbInfo->db = 0;
      return false;
   }
   dbInfo->db = db;

   queueP->isInitialized = true;

   retOk = createTables(queueP, exception);

   fillCache(queueP, exception);

   LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
   /* NOTE(review): retOk and the result of fillCache() are only logged/ignored.
      Kept as it was because the failure semantics of createTables() (for example
      for an already existing table) are not visible here - confirm before returning retOk. */
   return true;
}

/**
 * Create the necessary DB table if not already existing. 
 * @param queueP 
 * @param exception Can contain error information (out parameter)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -