📄 sqlitequeue.c
字号:
tablePrefix); retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception); SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);", tablePrefix, tablePrefix); retOk = execSilent(queueP, queryString, "Creating PRIO index", exception); return retOk;}/** * Invoke SQL query. * @param queueP Is not checked, must not be 0 * @param queryString The SQL to execute * @param comment For logging or exception text * @param exception Can contain error information (out parameter) * @return true on success */static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception){ int rc = 0; char *errMsg = 0; bool retOk; DbInfo *dbInfo = getDbInfo(queueP); rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg); switch (rc) { case SQLITE_OK: LOG __FILE__, "SQL '%s' success", comment); retOk = true; break; default: if (errMsg && strstr(errMsg, "already exists")) { LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); retOk = true; } else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) { LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); retOk = true; } else { LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg); retOk = false; } break; } if (errMsg != 0) sqlite_freemem(errMsg); return retOk;}/* * This is the callback routine that the SQLite library * invokes for each row of a query result.static int callback(void *pArg, int nArg, char **azArg, char **azCol){ int i; struct callback_data *p = (struct callback_data*)pArg; int w = 5; if (p==0) {} // Suppress compiler warning if( azArg==0 ) return 0; for(i=0; i<nArg; i++){ int len = strlen(azCol[i]); if( len>w ) w = len; } printf("\n"); for(i=0; i<nArg; i++){ printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL"); } return 0;}*//** * @param queueP The queue instance * @param queueEntry The entry * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 */static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception){ int rc = 0; bool stateOk = true; DbInfo *dbInfo; char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */ if (checkArgs(queueP, "put", true, exception) == false ) return; if (queueEntry == 0) { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__); return; } if (queueEntry->uniqueId == 0) { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__); return; } if (*queueEntry->embeddedType == 0) { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__); return; } strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN); if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) { strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__); return; } dbInfo = getDbInfo(queueP); if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) { strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries); return; } if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) { strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes)); return; } if (dbInfo->pVm_put == 0) { /* Compile prepared query only once */ char queryString[LEN256]; /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/ SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix); stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception); } if (stateOk) { /* set prepared statement tokens */ char intStr[INT64_STRLEN_MAX]; int index = 0; const int len = -1; /* Calculated by sqlite_bind */ rc = SQLITE_OK; int64ToStr(intStr, queueEntry->uniqueId); /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/ if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false); SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false); SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true); if (rc == SQLITE_OK) { /* As SQLite does only store strings we encode our blob to a string */ size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254; unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char)); int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data, (int)queueEntry->embeddedBlob.dataLen, out); rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true); free(out); } if (rc != SQLITE_OK) { LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc)); strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc)); stateOk = false; } } if (stateOk) { /* start the query, process results */ int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception); stateOk = countRows >= 0; } if (stateOk) { dbInfo->numOfEntries += 1; dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen); } LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");}/** * Compile a prepared query. * No parameters are checked, they must be valid * @param queueP The queue instance to use * @param methodName A nice string for logging * @param ppVm The virtual machine will be initialized if still 0 * @param queryString * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return false on error and exception->errorCode is not null */static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception){ int iRetry, numRetry=100; char *errMsg = 0; int rc = 0; const char *pzTail = 0; /* OUT: uncompiled tail of zSql */ bool stateOk = true; DbInfo *dbInfo = getDbInfo(queueP); if (*ppVm == 0) { /* Compile prepared query */ for (iRetry = 0; iRetry < numRetry; iRetry++) { rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg); switch (rc) { case SQLITE_BUSY: if (iRetry == (numRetry-1)) { strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); } LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg); if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } sleepMillis(10); break; case SQLITE_OK: iRetry = numRetry; /* We're done */ LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString); if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } break; default: LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg); iRetry = numRetry; /* We're done */ if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; } stateOk = false; break; } } } if (*ppVm == 0) stateOk = false; return stateOk;}/** * For each SQL result row parse it into a QueueEntry. * No parameters are checked, they must be valid * Implements a ParseDataFp (function pointer) * @param queueP The 'this' pointer * @param currIndex * @param userP * @param pazValue * @param pazColName * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return false on error and exception->errorCode is not null */static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP, const char **pazValue, const char **pazColName, ExceptionStruct *exception){ bool doContinue = true; int numAssigned; bool stateOk = true; int decodeSize = 0; QueueEntry *queueEntry = 0; QueueEntryArr *queueEntryArr; TmpHelper *helper = (TmpHelper*)userP; QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP; if (currIndex == 0) { helper->currEntries = 0; helper->currBytes = 0; } if (*queueEntryArrPP == 0) { *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));; if (helper->maxNumOfEntries == 0) { doContinue = false; return doContinue; } } queueEntryArr = *queueEntryArrPP; if (queueEntryArr->len == 0) { queueEntryArr->len = 10; queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry)); } else if (currIndex >= queueEntryArr->len) { queueEntryArr->len += 10; queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry)); } queueEntry = &queueEntryArr->queueEntryArr[currIndex]; memset(queueEntry, 0, sizeof(QueueEntry)); stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]); if (!stateOk) { LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]); strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]); doContinue = false; return doContinue; } LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex); /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */ numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority); if (numAssigned != 1) { LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]); queueEntry->priority = 4; } strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN); queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false; { int64_t ival = 0; stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]); queueEntry->embeddedBlob.dataLen = (size_t)ival; } /* TODO!!! in Java the length is the size in RAM and not strlen(data) */ /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */ queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */ decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data); if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) { *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d" " but expected decoded len=%d: '%s'", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize, queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data); } helper->currEntries += 1; helper->currBytes += queueEntry->embeddedBlob.dataLen; /* Limit the number of entries */ if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) || (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) { /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */ doContinue = false; } return doContinue;}/** * Execute the query and get the query result. * No parameters are checked, they must be valid * @param queueP The this pointer * @param methodName The method called * @param pVm sqlite virtual machine
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -