📄 sqlite3queue.c.todo
字号:
* @return true on success
*/
static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
{
char queryString[LEN512];
bool retOk;
const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
tablePrefix);
retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
tablePrefix, tablePrefix);
retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
return retOk;
}
/**
* Invoke SQL query.
* @param queueP Is not checked, must not be 0
* @param queryString The SQL to execute
* @param comment For logging or exception text
* @param exception Can contain error information (out parameter)
* @return true on success
*/
static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
{
int rc = 0;
char *errMsg = 0;
bool retOk;
DbInfo *dbInfo = getDbInfo(queueP);
rc = sqlite3_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
switch (rc) {
case SQLITE_OK:
LOG __FILE__, "SQL '%s' success", comment);
retOk = true;
break;
default:
if (errMsg && strstr(errMsg, "already exists")) {
LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), (errMsg==0)?"":errMsg);
retOk = true;
}
else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), (errMsg==0)?"":errMsg);
retOk = true;
}
else {
LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), (errMsg==0)?"":errMsg);
strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite3_errmsg(dbInfo->db), (errMsg==0)?"":errMsg);
retOk = false;
}
break;
}
if (errMsg != 0) xb_sqlite_free(errMsg);
return retOk;
}
/*
* This is the callback routine that the SQLite library
* invokes for each row of a query result.
static int callback(void *pArg, int nArg, char **azArg, char **azCol){
int i;
struct callback_data *p = (struct callback_data*)pArg;
int w = 5;
if (p==0) {} // Suppress compiler warning
if( azArg==0 ) return 0;
for(i=0; i<nArg; i++){
int len = strlen(azCol[i]);
if( len>w ) w = len;
}
printf("\n");
for(i=0; i<nArg; i++){
printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
}
return 0;
}
*/
/**
* @param queueP The queue instance
* @param queueEntry The entry
* @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
*/
static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
{
int rc = 0;
bool stateOk = true;
DbInfo *dbInfo;
char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
if (checkArgs(queueP, "put", true, exception) == false ) return;
if (queueEntry == 0) {
strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
return;
}
if (queueEntry->uniqueId == 0) {
strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
return;
}
if (*queueEntry->embeddedType == 0) {
strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
return;
}
strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
return;
}
dbInfo = getDbInfo(queueP);
if (dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
return;
}
if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
return;
}
if (dbInfo->pVm_put == 0) { /* Compile prepared query only once */
char queryString[LEN256]; /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
}
if (stateOk) { /* set prepared statement tokens */
//TODO !!!!!!!!!!!!!!!!!!!
# ifdef SQLITE2
char intStr[INT64_STRLEN_MAX];
int index = 0;
const int len = -1; /* Calculated by sqlite_bind */
rc = SQLITE_OK;
int64ToStr(intStr, queueEntry->uniqueId);
/*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
if (rc == SQLITE_OK) {
/* As SQLite does only store strings we encode our blob to a string */
size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
(int)queueEntry->embeddedBlob.dataLen, out);
rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
free(out);
}
# endif // SQLITE2 code
if (rc != SQLITE_OK) {
LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite3_errmsg(dbInfo->db));
strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite3_errmsg(dbInfo->db));
stateOk = false;
}
}
if (stateOk) { /* start the query, process results */
int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
stateOk = countRows >= 0;
}
if (stateOk) {
dbInfo->numOfEntries += 1;
dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
}
LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
}
/**
* Compile a prepared query.
* No parameters are checked, they must be valid
* @param queueP The queue instance to use
* @param methodName A nice string for logging
* @param ppVm The virtual machine will be initialized if still 0
* @param queryString
* @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
* @return false on error and exception->errorCode is not null
*/
static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
xb_sqlite_stmt **ppVm, const char *queryString, ExceptionStruct *exception)
{
int iRetry, numRetry=100;
int rc = 0;
const char *pzTail = 0; /* OUT: uncompiled tail of zSql */
bool stateOk = true;
DbInfo *dbInfo = getDbInfo(queueP);
if (*ppVm == 0) { /* Compile prepared query */
for (iRetry = 0; iRetry < numRetry; iRetry++) {
rc = sqlite3_prepare(dbInfo->db, queryString, strlen(queryString), ppVm, &pzTail);
switch (rc) {
case SQLITE_BUSY:
if (iRetry == (numRetry-1)) {
strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db), methodName);
}
LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, sqlite3_errmsg(dbInfo->db) );
sleepMillis(10);
break;
case SQLITE_OK:
iRetry = numRetry; /* We're done */
LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
break;
default:
LOG __FILE__, "SQL error #%d %s in %s()", rc, sqlite3_errmsg(dbInfo->db), methodName);
strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] SQL error #%d %s in %s()", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db), methodName);
iRetry = numRetry; /* We're done */
stateOk = false;
break;
}
}
}
if (*ppVm == 0) stateOk = false;
return stateOk;
}
/**
* For each SQL result row parse it into a QueueEntry.
* No parameters are checked, they must be valid
* Implements a ParseDataFp (function pointer)
* @param queueP The 'this' pointer
* @param currIndex
* @param userP
* @param pazValue
* @param pazColName
* @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
* @return false on error and exception->errorCode is not null
*/
static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
const char **pazValue, const char **pazColName, ExceptionStruct *exception)
{
bool doContinue = true;
int numAssigned;
bool stateOk = true;
int decodeSize = 0;
QueueEntry *queueEntry = 0;
QueueEntryArr *queueEntryArr;
TmpHelper *helper = (TmpHelper*)userP;
QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
if (currIndex == 0) {
helper->currEntries = 0;
helper->currBytes = 0;
}
if (*queueEntryArrPP == 0) {
*queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
if (helper->maxNumOfEntries == 0) {
doContinue = false;
return doContinue;
}
}
queueEntryArr = *queueEntryArrPP;
if (queueEntryArr->len == 0) {
queueEntryArr->len = 10;
queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
}
else if (currIndex >= queueEntryArr->len) {
queueEntryArr->len += 10;
queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
}
queueEntry = &queueEntryArr->queueEntryArr[currIndex];
memset(queueEntry, 0, sizeof(QueueEntry));
stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
if (!stateOk) {
LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
"[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
doContinue = false;
return doContinue;
}
LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
/* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
if (numAssigned != 1) {
LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
queueEntry->priority = 4;
}
strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
{
int64_t ival = 0;
stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
queueEntry->embeddedBlob.dataLen = (size_t)ival;
}
/* TODO!!! in Java the length is the size in RAM and not strlen(data) */
/* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */
// TODO!!!!!!!!!!!!
# ifdef SQLITE2
queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
*(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0;
LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
" but expected decoded len=%d: '%s'",
int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
}
# endif
helper->currEntries += 1;
helper->currBytes += queueEntry->embeddedBlob.dataLen;
/* Limit the number of entries */
if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
(helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
/* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
doContinue = false;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -