📄 sqlitequeue.c
字号:
* @param parseDataFp The function which is called for each SQL result row * or 0 if no function shall be called * @param userP The pointer which is passed to parseDataFp * @param finalize true to call sqlite_finalize which deletes the virtual machine, * false to call sqlite_reset to reuse the prepared query * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return < 0 on error and exception->errorCode is not null * otherwise the number of successfully parsed rows is returned * @todo For INSERT and DELETE return the number of touched entries !!! */static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite_vm *pVm, ParseDataFp parseDataFp, void *userP, bool finalize, ExceptionStruct *exception){ char *errMsg = 0; int32_t currIndex = 0; int numCol = 0; const char **pazValue = 0; const char **pazColName = 0; bool done = false; bool stateOk = true; int rc; while (!done) { rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName); switch( rc ){ case SQLITE_DONE: done = true; break; case SQLITE_BUSY: LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName); sleepMillis(10); break; case SQLITE_ROW: { bool doContinue = true; if (parseDataFp) { /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */ doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception); stateOk = *exception->errorCode == 0; } else { /* printf("RESULT[%d]\n", iRow); for (iCol = 0; iCol < numCol; iCol++) { printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]); } */ } currIndex++; if (!stateOk || !doContinue) done = true; } break; case SQLITE_ERROR: /* If exists already */ LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc); done = true; stateOk = false; break; case SQLITE_MISUSE: default: LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc)); done = true; stateOk = false; break; } } LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex); if (finalize) { sqlite_finalize(pVm, &errMsg); if (rc != SQLITE_OK && rc != SQLITE_DONE) { LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg); } if (errMsg != 0) sqlite_freemem(errMsg); } else { /* Reset prepared statement */ rc = sqlite_reset(pVm, &errMsg); if (rc == SQLITE_SCHEMA) { LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg); } if (errMsg != 0) sqlite_freemem(errMsg); } return stateOk ? currIndex : (-1)*rc;}/** * Access queue entries without removing them. */static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception){ int rc = 0; bool stateOk = true; DbInfo *dbInfo; QueueEntryArr *queueEntryArr = 0; if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0; LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes)); dbInfo = getDbInfo(queueP); if (dbInfo->pVm_peekWithSamePriority == 0) { /* Compile prepared query */ char queryString[LEN512]; /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/ SNPRINTF(queryString, LEN512, "SELECT * FROM %.20sENTRIES where queueName=?" " and prio=(select max(prio) from %.20sENTRIES where queueName=?)" " ORDER BY dataId ASC", dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix); stateOk = compilePreparedQuery(queueP, "peekWithSamePriority", &dbInfo->pVm_peekWithSamePriority , queryString, exception); } if (stateOk) { /* set prepared statement tokens */ int index = 0; int len = -1; /* Calculated by sqlite_bind */ rc = SQLITE_OK; if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false); if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false); switch (rc) { case SQLITE_OK: LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc); break; default: LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc)); strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc)); stateOk = false; break; } } if (stateOk) { /* start the query */ TmpHelper helper; int32_t currIndex = 0; helper.queueEntryArrPP = &queueEntryArr; helper.maxNumOfEntries = maxNumOfEntries; helper.maxNumOfBytes = maxNumOfBytes; currIndex = getResultRows(queueP, "peekWithSamePriority", dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr, &helper, false, exception); stateOk = currIndex >= 0; if (!stateOk) { if (queueEntryArr) { free(queueEntryArr->queueEntryArr); queueEntryArr->len = 0; } } else { if (!queueEntryArr) queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr)); else if ((size_t)currIndex < queueEntryArr->len) { queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry)); queueEntryArr->len = currIndex; } } } LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed"); return queueEntryArr;}/** * Removes the given entries from persistence. * @return The number of removed entries */static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception){ bool stateOk = true; int64_t numOfBytes = 0; int32_t countDeleted = 0; sqlite_vm *pVm = 0; DbInfo *dbInfo; if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 || queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0) return 0; LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len); dbInfo = getDbInfo(queueP); { size_t i; const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6); char *queryString = (char *)calloc(qLen, sizeof(char)); /* DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */ SNPRINTF(queryString, qLen, "DELETE FROM %.20sENTRIES WHERE queueName='%s'" " AND dataId in ( ", dbInfo->prop.tablePrefix, dbInfo->prop.queueName); for (i=0; i<queueEntryArr->len; i++) { strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId)); if (i<(queueEntryArr->len-1)) strcat(queryString, ","); numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen); } strcat(queryString, " )"); stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception); free(queryString); } if (stateOk) { /* start the query */ int32_t currIndex = getResultRows(queueP, "randomRemove", pVm, 0, 0, true, exception); stateOk = currIndex >= 0; } if (stateOk) { countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db); if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) { fillCache(queueP, exception); /* calculate numOfBytes again */ } else { dbInfo->numOfEntries -= queueEntryArr->len; dbInfo->numOfBytes -= numOfBytes; } } return countDeleted;}/** * Destroy all entries in queue and releases all resources in memory and on HD. */static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception){ bool stateOk = true; I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP; if (checkArgs(queueP, "destroy", false, exception) == false ) return false; shutdownInternal(queuePP, exception); { DbInfo *dbInfo = getDbInfo(queueP); const char *dbName = dbInfo->prop.dbName; stateOk = unlink(dbName) == 0; /* Delete old db file */ if (!stateOk) { strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno); } } freeQueue(queuePP); return stateOk;}/** * Destroy all entries in queue. */static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception){ int stateOk = true; char queryString[LEN256]; sqlite_vm *pVm = 0; DbInfo *dbInfo; if (checkArgs(queueP, "clear", true, exception) == false) return false; dbInfo = getDbInfo(queueP); SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix); stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception); if (stateOk) { int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception); stateOk = currIndex >= 0; } if (stateOk) { dbInfo->numOfEntries = 0; dbInfo->numOfBytes = 0; } LOG __FILE__, "clear() done"); return stateOk;}/** * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'", */static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP, const char **pazValue, const char **pazColName, ExceptionStruct *exception){ int64_t ival = 0; bool stateOk; DbInfo *dbInfo = getDbInfo(queueP); stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]); if (!stateOk) { strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]); return false; } dbInfo->numOfEntries = (int32_t)ival; stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]); if (!stateOk) { strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]); if (currIndex) {} /* Just to avoid compiler warning about unused variable */ if (userP) {}; return false; } return true;}/** * Reload cached information from database. * @param queueP The this pointer * @param exception Returns error * @return false on error */static bool fillCache(I_Queue *queueP, ExceptionStruct *exception){ bool stateOk = true; DbInfo *dbInfo = 0; char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */ if (checkArgs(queueP, "fillCache", true, exception) == false ) return true; dbInfo = getDbInfo(queueP); SNPRINTF(queryString, LEN512, "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'", dbInfo->prop.tablePrefix, dbInfo->prop.queueName); stateOk = compilePreparedQuery(queueP, "fillCache", &dbInfo->pVm_fillCache, queryString, exception); if (stateOk) { /* start the query, calls parseCacheInfo() */ int32_t currIndex = getResultRows(queueP, "fillCache", dbInfo->pVm_fillCache, parseCacheInfo, 0, false, exception); stateOk = currIndex > 0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -