📄 sqlite3queue.c.todo
字号:
/*--UNFINISHED SEE TODOS--------------------------------------------------------------------------
Name: SQLite3Queue.c
Project: xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment: A persistent queue implementation based on the SQLite relational database
Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
and can easily be used outside of xmlBlaster.
Further you need sqlite3.h and the sqlite3 library (dll,so,sl)
Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 's brother
Date: 04/2004
Compile: Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
Needs pthread.h but not the pthread library (for exact times)
export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLite3Queue SQLite3Queue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite3
(optionally use -ansi -pedantic -Wno-long-long)
(Intel C: icc -wd981 ...)
Compile inside xmlBlaster:
build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
Testcompile on Windows
create sqlite3.lib from sqlite3.def via:
lib /DEF:sqlite3.def
( /I\c\sqlite3 says where sqlite3.h resides ):
cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /DSQLITE3=1 /D_WINDOWS /I\c\sqlite3 /I..\.. Sqlite3Queue.c ..\helper.c /link \pialibs\sqlite3.lib
Table layout XB_ENTRIES:
dataId bigint
queueName text
prio integer
flag text
durable char(1)
byteSize bigint
blob bytea
PRIMARY KEY (dataId, queueName)
Todo: Tuning:
- Add prio to PRIMARY KEY
- In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
@see: http://www.sqlite.org/
@see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
@see: http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
-----------------------------------------------------------------------------*/
#include <stdio.h>
#include <string.h>
#include <malloc.h>
#if !defined(_WINDOWS)
# include <unistd.h> /* unlink() */
# include <errno.h> /* unlink() */
#endif
#include "util/queue/QueueInterface.h"
//#ifdef QUEUE_MAIN
//# ifdef Dll_Export
//# undef Dll_Export
//# endif
//# define Dll_Export
//#endif
# include "sqlite3.h"
/* File local aliases for the SQLite3 types declared in sqlite3.h */
typedef sqlite3 xb_sqlite; /* the database handle type */
typedef sqlite3_stmt xb_sqlite_stmt; /* the statement type, used here for prepared queries */
/**
 * Hand a buffer back to the SQLite library.
 * Thin wrapper which forwards the pointer unchanged to sqlite3_free().
 * @param pdata The buffer to release
 */
static void xb_sqlite_free(char * pdata)
{
   sqlite3_free(pdata);
}
/*
 * Forward declarations of the file local functions.
 * The first group (persistentQueueInitialize() ... persistentQueueDestroy()) is
 * installed into the function pointers of the I_Queue struct by createQueue().
 */
static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
static const QueueProperties *getProperties(I_Queue *queueP);
static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
static int32_t getNumOfEntries(I_Queue *queueP);
static int32_t getMaxNumOfEntries(I_Queue *queueP);
static int64_t getNumOfBytes(I_Queue *queueP);
static int64_t getMaxNumOfBytes(I_Queue *queueP);
static bool persistentQueueEmpty(I_Queue *queueP);
static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
/* Internal helpers, not installed into the I_Queue struct by createQueue() */
static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, xb_sqlite_stmt **ppVm, const char *queryString, ExceptionStruct *exception);
static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
static void freeQueueEntryData(QueueEntry *queueEntry);
/**
 * Callback type: called for each SQL result row and does the specific result
 * parsing depending on the query.
 * @param queueP The this pointer
 * @param currIndex Index of the current row (NOTE(review): presumably counted from 0, confirm in getResultRows())
 * @param userP Pointer on a data struct which contains the parsed data
 * @param pazValue The column values of the current row
 * @param pazColName The column names of the current row
 * @param exception Can contain error information (out parameter)
 * @return true->to continue, false->to break execution or on error exception->errorCode is not null
 */
typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
const char **pazValue, const char **pazColName, ExceptionStruct *exception);
/*
 * Forward declaration of the result row helper.
 * It takes the prepared statement pVm, a per row ParseDataFp callback together
 * with its userP data and a finalize flag.
 * NOTE(review): presumably 'finalize' controls whether pVm is finalized after
 * the rows are processed -- confirm at the definition.
 */
static int32_t getResultRows(I_Queue *queueP, const char *methodName,
xb_sqlite_stmt *pVm,
ParseDataFp parseDataFp, void *userP, bool finalize,
ExceptionStruct *exception);
/* Logging shortcut.
The macro expands to an unterminated call, the caller supplies the remaining
arguments and the closing parenthesis:
LOG __FILE__, "Persistent queue is created");
is
if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
A variable named queueP must be in scope where LOG is used.
The expansion is a bare 'if': when LOG is the body of an if/else always put it
into braces, otherwise a following 'else' binds to the hidden 'if'.
*/
#define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE,
/* Buffer sizes, as macros so they can be used as array dimensions */
#define LEN512 512 /* ISO C90 forbids variable-size array: const int LEN512=512; */
#define LEN256 256 /* ISO C90 forbids variable-size array: const int LEN256=256; */
/* NOTE(review): initialize() uses QUEUE_DBNAME_MAX and QUEUE_ID_MAX, check if the two macros below are still needed */
#define DBNAME_MAX 128
#define ID_MAX 256
/**
 * The private data of a queue instance, stored in I_Queue->privateObject.
 * Holds Prepared statements for better performance.
 * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 */
typedef struct DbInfoStruct {
QueueProperties prop; /**< Meta information, a copy of the properties passed to initialize() */
size_t numOfEntries; /**< Cache for current number of entries; createQueue() presets it with -1 which wraps to the largest size_t value */
int64_t numOfBytes; /**< Cache for current number of bytes; createQueue() presets it with -1 */
xb_sqlite *db; /**< Database handle for SQLite */
xb_sqlite_stmt *pVm_put; /**< SQLite virtual machine to hold a prepared query */
xb_sqlite_stmt *pVm_peekWithSamePriority; /**< Prepared query, NOTE(review): presumably for peekWithSamePriority() */
xb_sqlite_stmt *pVm_fillCache; /**< Prepared query, NOTE(review): presumably for fillCache() */
} DbInfo;
/**
 * Used temporary for peekWithSamePriority().
 * Bundles the result array with the current counters and the limits of one peek call.
 */
typedef struct {
QueueEntryArr **queueEntryArrPP; /**< Pointer to the result array pointer */
int32_t currEntries; /**< NOTE(review): presumably the number of entries collected so far */
int64_t currBytes; /**< NOTE(review): presumably the number of bytes collected so far */
int32_t maxNumOfEntries; /**< The max wanted number of entries for this peek() */
int64_t maxNumOfBytes; /**< The max wanted bytes during peek() */
} TmpHelper;
/*
 * One file scope scratch buffer of INT64_STRLEN_MAX chars, shared by all code in this file.
 * NOTE(review): presumably used to format int64_t values as text; being a single
 * static buffer its content is overwritten by each use -- confirm at the call sites.
 */
static char int64Str_[INT64_STRLEN_MAX];
static char * const int64Str = int64Str_; /* to make the pointer address const */
/**
 * Column index into XB_ENTRIES table, counted from 0.
 * The column names given below are taken by position from the table layout
 * in the file header -- NOTE(review): confirm against the CREATE TABLE statement.
 */
enum {
XB_ENTRIES_DATA_ID = 0, /* dataId */
XB_ENTRIES_QUEUE_NAME, /* queueName */
XB_ENTRIES_PRIO, /* prio */
XB_ENTRIES_TYPE_NAME, /* flag */
XB_ENTRIES_PERSISTENT, /* durable */
XB_ENTRIES_SIZE_IN_BYTES, /* byteSize */
XB_ENTRIES_BLOB /* blob */
};
/**
 * Create a new persistent queue instance.
 * <br />
 * Allocates the I_Queue struct, installs the function pointers of this
 * implementation, allocates the private DbInfo and finally calls initialize().
 * If initialize() fails shutdown() is called on the half created queue.
 * @param queueProperties The configuration, it is passed on to initialize()
 * @param exception Can contain error information (out parameter)
 * @return NULL if bootstrapping failed (out of memory or initialize() failed).
 *         If not NULL you need to free() it when you are done
 *         usually by calling shutdown().
 * @throws exception
 */
Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
{
   bool stateOk = true;
   DbInfo *dbInfo;
   I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
   if (queueP == 0) return queueP;

   queueP->isInitialized = false;
   queueP->initialize = persistentQueueInitialize;
   queueP->getProperties = getProperties;
   queueP->put = persistentQueuePut;
   queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
   queueP->randomRemove = persistentQueueRandomRemove;
   queueP->clear = persistentQueueClear;
   queueP->getNumOfEntries = getNumOfEntries;
   queueP->getMaxNumOfEntries = getMaxNumOfEntries;
   queueP->getNumOfBytes = getNumOfBytes;
   queueP->getMaxNumOfBytes = getMaxNumOfBytes;
   queueP->empty = persistentQueueEmpty;
   queueP->shutdown = persistentQueueShutdown;
   queueP->destroy = persistentQueueDestroy;

   dbInfo = (DbInfo *)calloc(1, sizeof(DbInfo));
   if (dbInfo == 0) {
      /* Out of memory: only the I_Queue struct exists so far, release it
         and report the failure the same way as for the first allocation */
      free(queueP);
      return 0;
   }
   /* -1 marks the caches as "not yet known", for the unsigned
      numOfEntries this is the largest size_t value */
   dbInfo->numOfEntries = (size_t)-1;
   dbInfo->numOfBytes = -1;
   queueP->privateObject = dbInfo;

   stateOk = queueP->initialize(queueP, queueProperties, exception);

   if (stateOk) {
      LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
   }
   else {
      ExceptionStruct ex;
      /* Start with an empty exception: errorCode is inspected below and
         must not contain stack garbage if shutdown() leaves it untouched */
      memset(&ex, 0, sizeof(ex));
      queueP->shutdown(&queueP, &ex);
      if (*ex.errorCode != 0) {
         embedException(exception, ex.errorCode, ex.message, exception);
      }
      queueP = 0;
   }
   return queueP;
}
/**
 * Access the private DB data of a queue.
 * @param queueP The this pointer, may be 0
 * @return The content of queueP->privateObject casted to DbInfo, 0 if queueP is 0
 */
static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
   if (queueP == 0) {
      return 0;
   }
   return (DbInfo *)(queueP->privateObject);
}
/**
 * Access the queue configuration.
 * The arguments are validated with checkArgs(), the exception details
 * of a failed check are discarded.
 * @param queueP The this pointer
 * @return Read only access, 0 on error
 */
static const QueueProperties *getProperties(I_Queue *queueP)
{
   ExceptionStruct ignoredEx;
   const bool argsOk = checkArgs(queueP, "getProperties", false, &ignoredEx);
   if (!argsOk) {
      return 0;
   }
   return &(getDbInfo(queueP)->prop);
}
/**
 * Release the memory of the queue struct and of its private object.
 * On return *queuePP is set to 0.
 * A missing queue pointer is reported on stderr and nothing is freed.
 * @param queuePP The address of the this pointer
 */
static void freeQueue(I_Queue **queuePP)
{
   I_Queue *queueP = 0;
   if (queuePP != 0) {
      queueP = *queuePP;
   }
   if (queueP == 0) {
      fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
      return;
   }

   LOG __FILE__, "freeQueue() called");

   /* free() accepts a null pointer, no guard needed */
   free(queueP->privateObject);
   queueP->privateObject = 0;

   free(queueP);
   *queuePP = 0;
}
/**
 * Called internally by createQueue().
 * Validates the configuration, copies it into the private DbInfo, opens the
 * SQLite database and then calls createTables() and fillCache().
 * @param queueP The this pointer
 * @param queueProperties The configuration
 * @param exception Can contain error information (out parameter)
 * @return false on illegal arguments or if the database could not be opened,
 *         else true. The outcome of createTables() and fillCache() does not
 *         influence the return value, check exception for those.
 */
static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
{
   bool retOk;
   int rc;
   xb_sqlite *db = 0;
   DbInfo *dbInfo;

   if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
   if (queueProperties == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
      /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
      fprintf(stderr, "[%s:%d] %s: %s\n", __FILE__, __LINE__, exception->errorCode, exception->message);
      return false;
   }

   queueP->log = queueProperties->logFp;
   queueP->logLevel = queueProperties->logLevel;
   queueP->userObject = queueProperties->userObject;

   if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
       queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
      char dbName[QUEUE_DBNAME_MAX];
      char queueName[QUEUE_ID_MAX];
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      if (queueProperties->dbName == 0)
         strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
      else
         strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
      if (queueProperties->queueName == 0)
         strncpy0(queueName, "NULL", QUEUE_ID_MAX);
      else
         strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
               " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
               dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
      LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
      return false;
   }

   dbInfo = getDbInfo(queueP);
   memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));

   /* Never trust a queue property you haven't overflowed yourself :-) */
   dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
   dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
   dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;

   LOG __FILE__, "dbName = %s", dbInfo->prop.dbName);
   LOG __FILE__, "queueName = %s", dbInfo->prop.queueName);
   LOG __FILE__, "tablePrefix = %s", dbInfo->prop.tablePrefix);
   /* The casts make the arguments match the %ld conversion on every platform */
   LOG __FILE__, "maxNumOfEntries = %ld",(long)dbInfo->prop.maxNumOfEntries);
   LOG __FILE__, "maxNumOfBytes = %ld",(long)dbInfo->prop.maxNumOfBytes);
   /*LOG __FILE__, "logFp = %d", (int)dbInfo->prop.logFp);*/
   LOG __FILE__, "logLevel = %d", (int)dbInfo->prop.logLevel);
   /*LOG __FILE__, "userObject = %d", (void*)dbInfo->prop.userObject);*/

   rc = sqlite3_open(dbInfo->prop.dbName, &db);
   if (rc != SQLITE_OK || db == 0) {
      /* sqlite3_open() normally returns a handle even when it fails so that
         the error text can be queried, db is only 0 if SQLite could not
         allocate the handle. Testing db alone would miss most failures. */
      char errMsg[LEN256];
      strncpy0(errMsg, (db != 0) ? sqlite3_errmsg(db) : "out of memory", LEN256);
      /* The text was copied above as it belongs to the handle,
         which must be closed even after a failed open */
      if (db != 0) sqlite3_close(db);
      dbInfo->db = 0;
      queueP->isInitialized = false;
      if (queueP->log) {
         LOG __FILE__, "Unable to open database '%s': %s", dbInfo->prop.dbName, errMsg);
      }
      else {
         fprintf(stderr,"[%s] Unable to open database %s: %s\n", __FILE__, dbInfo->prop.dbName, errMsg);
      }
      strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, errMsg);
      return false;
   }
   dbInfo->db = db;

   queueP->isInitialized = true;

   retOk = createTables(queueP, exception);

   fillCache(queueP, exception);

   LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
   /* NOTE(review): true is returned even if createTables() or fillCache()
      failed, this is the behavior callers have seen so far. Consider to
      return the combined result once the semantics of both are confirmed. */
   return true;
}
/**
* Create the necessary DB table if not already existing.
* @param queueP
* @param exception Can contain error information (out parameter)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -