📄 sqlite3queue.c.todo
字号:
return doContinue;
}
/**
 * Execute a prepared statement and hand every result row to a callback.
 *
 * The statement is stepped with sqlite3_step() until it reports SQLITE_DONE,
 * an error occurs, or the callback asks to stop. SQLITE_BUSY is retried after
 * a 10 ms sleep, without an upper retry limit.
 * No parameters are checked, they must be valid.
 *
 * For each row the column values and column names are collected with
 * sqlite3_column_text() / sqlite3_column_name() into two arrays which are
 * only valid during the callback invocation.
 * NOTE(review): BLOB columns are therefore passed as the raw bytes returned
 * by sqlite3_column_text() -- confirm the parseDataFp implementations expect that.
 *
 * @param queueP The this pointer
 * @param methodName The method called, used for logging only
 * @param pVm The prepared sqlite3 statement
 * @param parseDataFp The function which is called for each SQL result row
 *                    or 0 if no function shall be called
 * @param userP The pointer which is passed to parseDataFp
 * @param finalize true to call sqlite3_finalize() which deletes the statement,
 *                 false to call sqlite3_reset() to reuse the prepared query
 * @param exception Inspected after each parseDataFp call: *exception->errorCode != 0
 *                  aborts the loop. SQL errors detected here are logged but
 *                  do not fill the exception.
 * @return < 0 on error (the negated sqlite result code, never 0),
 *         otherwise the number of rows processed
 * @todo For INSERT and DELETE return the number of touched entries !!!
 */
static int32_t getResultRows(I_Queue *queueP, const char *methodName,
                             xb_sqlite_stmt *pVm,
                             ParseDataFp parseDataFp, void *userP,
                             bool finalize,
                             ExceptionStruct *exception)
{
   int32_t currIndex = 0;
   int numCol = 0;
   const char **pazValue = 0;    /* Column values of the current row, allocated on first row */
   const char **pazColName = 0;  /* Column names, second half of the pazValue allocation */
   bool done = false;
   bool stateOk = true;
   int rc = SQLITE_OK;
   int failRc = SQLITE_ERROR;    /* Result code reported to the caller on failure */

   while (!done) {
      rc = sqlite3_step(pVm);
      switch (rc) {
         case SQLITE_DONE:
            done = true;
            break;
         case SQLITE_BUSY:
            LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
            sleepMillis(10);
            break;
         case SQLITE_ROW:
         {
            bool doContinue = true;
            if (parseDataFp) {
               if (pazValue == 0) {
                  /* The column count is fixed for a statement: allocate both arrays once */
                  size_t slots;
                  numCol = sqlite3_column_count(pVm);
                  if (numCol < 0) numCol = 0;
                  slots = (numCol > 0) ? (size_t)numCol : 1;
                  pazValue = (const char **)calloc(2 * slots, sizeof(const char *));
                  if (pazValue != 0) pazColName = pazValue + slots;
               }
               if (pazValue == 0) {
                  strncpy0(exception->errorCode, "resource.outOfMemory", EXCEPTIONSTRUCT_ERRORCODE_LEN);
                  SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                           "[%.100s:%d] %.50s() ERROR: Out of memory while reading a result row.",
                           __FILE__, __LINE__, methodName);
                  stateOk = false;
               }
               else {
                  int iCol;
                  for (iCol = 0; iCol < numCol; iCol++) {
                     /* SQL NULL is delivered as a 0 pointer */
                     pazValue[iCol] = (const char *)sqlite3_column_text(pVm, iCol);
                     pazColName[iCol] = sqlite3_column_name(pVm, iCol);
                  }
                  /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
                  doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
                  stateOk = *exception->errorCode == 0;
               }
            }
            currIndex++;
            if (!stateOk || !doContinue) done = true;
         }
         break;
         case SQLITE_ERROR: /* If exists already */
            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
            done = true;
            stateOk = false;
            break;
         case SQLITE_MISUSE:
         default:
            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite3_errmsg(getDbInfo(queueP)->db));
            done = true;
            stateOk = false;
            break;
      }
   }
   free((void *)pazValue);

   LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);

   /* Remember the step result before sqlite3_reset() may replace it by SQLITE_OK */
   if (!stateOk && rc != SQLITE_OK) failRc = rc;

   if (finalize) {
      sqlite3_finalize(pVm);
      if (rc != SQLITE_OK && rc != SQLITE_DONE) {
         LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled.", rc );
      }
   }
   else { /* Reset prepared statement */
      rc = sqlite3_reset(pVm);
      if (rc == SQLITE_SCHEMA) {
         LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled", rc );
      }
      if (!stateOk && rc != SQLITE_OK) failRc = rc; /* sqlite3_reset() reports the specific error code */
   }

   if (stateOk) return currIndex;
   return (failRc > 0) ? (int32_t)(-failRc) : (int32_t)-1; /* Never report 0 for a failure */
}
/**
 * Access queue entries without removing them.
 *
 * Selects all entries of this queue having the highest priority currently
 * stored, ordered by ascending dataId. The prepared statement is compiled on
 * first use, cached in dbInfo->pVm_peekWithSamePriority and reset (not
 * finalized) after each call.
 *
 * @param queueP The this pointer
 * @param maxNumOfEntries Passed to the row parser (parseQueueEntryArr) as limit
 * @param maxNumOfBytes Passed to the row parser (parseQueueEntryArr) as limit
 * @param exception Filled on error
 * @return The entries found. 0 if the arguments are invalid or the statement
 *         could not be prepared or bound. If no row matched an empty,
 *         zero-initialized QueueEntryArr is returned. If the query fails
 *         after rows were parsed, the struct is returned with len == 0 and a
 *         0 entry pointer.
 *         NOTE(review): presumably the caller owns and frees the result -- confirm.
 */
static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
{
   int rc = 0;
   bool stateOk = true;
   DbInfo *dbInfo;
   QueueEntryArr *queueEntryArr = 0;

   if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;

   LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));

   dbInfo = getDbInfo(queueP);

   if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared query */
      char queryString[LEN512];
      /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
      SNPRINTF(queryString, LEN512,
           "SELECT * FROM %.20sENTRIES where queueName=?"
           " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
           " ORDER BY dataId ASC",
           dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
      stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
                    &dbInfo->pVm_peekWithSamePriority , queryString, exception);
   }

   if (stateOk) { /* set prepared statement tokens */
      int index = 0;
      const int len = -1; /* Let sqlite3 determine the length up to the terminating zero */
      /* Both placeholders take the queue name. SQLITE_STATIC: the string is not copied,
         it lives in dbInfo just like the prepared statement which references it */
      rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, SQLITE_STATIC);
      if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, SQLITE_STATIC);
      switch (rc) {
         case SQLITE_OK:
            LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
            break;
         default:
            LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite3_errmsg(dbInfo->db));
            strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
            SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                     "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db));
            stateOk = false;
            break;
      }
   }

   if (stateOk) { /* start the query */
      TmpHelper helper;
      int32_t currIndex = 0;
      helper.queueEntryArrPP = &queueEntryArr;
      helper.maxNumOfEntries = maxNumOfEntries;
      helper.maxNumOfBytes = maxNumOfBytes;
      currIndex = getResultRows(queueP, "peekWithSamePriority",
                                dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,
                                &helper, false, exception);
      stateOk = currIndex >= 0;
      if (!stateOk) {
         if (queueEntryArr) {
            free(queueEntryArr->queueEntryArr);
            queueEntryArr->queueEntryArr = 0; /* Don't hand a dangling pointer to the caller */
            queueEntryArr->len = 0;
         }
      }
      else {
         if (!queueEntryArr) {
            queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
         }
         else if ((size_t)currIndex < queueEntryArr->len) {
            /* Fewer rows delivered than slots reserved: trim the array */
            if (currIndex == 0) {
               free(queueEntryArr->queueEntryArr);
               queueEntryArr->queueEntryArr = 0;
            }
            else {
               QueueEntry *shrunk = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, (size_t)currIndex * sizeof(QueueEntry));
               /* If shrinking fails the old, larger block remains valid and is kept */
               if (shrunk != 0) queueEntryArr->queueEntryArr = shrunk;
            }
            queueEntryArr->len = (size_t)currIndex;
         }
      }
   }

   LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
   return queueEntryArr;
}
/**
 * Removes the given entries from persistence.
 *
 * Builds one statement of the form
 * "DELETE FROM <prefix>ENTRIES WHERE queueName='<name>' AND dataId in ( id1,id2,... )",
 * executes it and updates the cached numOfEntries / numOfBytes in dbInfo.
 * If the number of deleted rows differs from the number of entries passed,
 * the cache is reloaded with fillCache().
 *
 * @param queueP The this pointer
 * @param queueEntryArr The entries to remove, identified by their uniqueId.
 *                      0 or an empty array removes nothing.
 * @param exception Filled on error
 * @return The number of removed entries as reported by sqlite3_changes(),
 *         0 on invalid arguments or on error
 */
static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
{
   bool stateOk = true;
   int64_t numOfBytes = 0;
   int32_t countDeleted = 0;
   xb_sqlite_stmt *pVm = 0;
   DbInfo *dbInfo;

   if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
                 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
      return 0;

   LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);

   dbInfo = getDbInfo(queueP);

   {
      size_t i;
      size_t used;  /* Number of characters written to queryString so far */
      const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
      char *queryString = (char *)calloc(qLen, sizeof(char));
      if (queryString == 0) {
         strncpy0(exception->errorCode, "resource.outOfMemory", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] randomRemove() ERROR: Out of memory, can't allocate %lu bytes for the query.",
                  __FILE__, __LINE__, (unsigned long)qLen);
         return 0;
      }
      /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
      SNPRINTF(queryString, qLen,
           "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
           " AND dataId in ( ",
           dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
      queryString[qLen-1] = 0; /* Terminated even if the formatting was truncated */
      used = strlen(queryString);

      /* Append the ids at the tracked offset instead of rescanning the string with strcat() */
      for (i=0; i<queueEntryArr->len; i++) {
         const QueueEntry *entry = &queueEntryArr->queueEntryArr[i];
         const char *idStr = int64ToStr(int64Str, entry->uniqueId);
         const size_t idLen = strlen(idStr);
         if (used + idLen + 4 > qLen) { /* Need room for the id, a ",", the closing " )" and the terminating zero */
            stateOk = false;
            break;
         }
         memcpy(queryString + used, idStr, idLen);
         used += idLen;
         if (i + 1 < queueEntryArr->len) queryString[used++] = ',';
         numOfBytes += ((entry->sizeInBytes > 0) ? entry->sizeInBytes : entry->embeddedBlob.dataLen);
      }

      if (!stateOk) {
         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] randomRemove() ERROR: The query for %lu entries does not fit into %lu bytes.",
                  __FILE__, __LINE__, (unsigned long)queueEntryArr->len, (unsigned long)qLen);
         free(queryString);
         return 0;
      }

      memcpy(queryString + used, " )", 3); /* Copies the terminating zero as well */
      stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
      free(queryString);
   }

   if (stateOk) { /* start the query */
      int32_t currIndex = getResultRows(queueP, "randomRemove",
                                        pVm, 0, 0, true, exception);
      stateOk = currIndex >= 0;
   }

   if (stateOk) {
      /* Number of rows deleted directly by the statement above; changes caused by triggers are not counted */
      countDeleted = (int32_t)sqlite3_changes(dbInfo->db);
      if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
         fillCache(queueP, exception); /* calculate numOfBytes again */
      }
      else {
         dbInfo->numOfEntries -= queueEntryArr->len;
         dbInfo->numOfBytes -= numOfBytes;
      }
   }

   return countDeleted;
}
/**
 * Destroy all entries in queue and releases all resources in memory and on HD.
 *
 * Shuts the queue down, deletes the database file named by
 * dbInfo->prop.dbName with unlink() and finally calls freeQueue().
 * freeQueue() is called even if the file could not be removed.
 *
 * @param queuePP Address of the queue pointer, passed on to
 *                shutdownInternal() and freeQueue()
 * @param exception Filled with "resource.db.unknown" if unlink() fails
 * @return true if the database file was removed, false if the arguments
 *         are rejected by checkArgs() or unlink() failed
 */
static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
{
   I_Queue *queueP = 0;
   const char *dbName;
   bool removed;

   if (queuePP != 0) queueP = *queuePP;
   if (!checkArgs(queueP, "destroy", false, exception)) return false;

   shutdownInternal(queuePP, exception);

   /* The file name lives inside dbInfo, so it is used before freeQueue() is called */
   dbName = getDbInfo(queueP)->prop.dbName;
   removed = (unlink(dbName) == 0); /* Delete old db file */
   if (!removed) {
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
   }

   freeQueue(queuePP);
   return removed;
}
/**
 * Destroy all entries in queue.
 *
 * Deletes the rows belonging to this queue (matched by queueName, as done by
 * the other statements of this file) and resets the cached numOfEntries and
 * numOfBytes to 0. Rows of other queues stored in the same table are kept.
 *
 * @param queueP The this pointer
 * @param exception Filled on error
 * @return true if the entries were deleted, false on invalid arguments or
 *         SQL error
 */
static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
{
   bool stateOk = true;
   char queryString[LEN256];
   xb_sqlite_stmt *pVm = 0;
   DbInfo *dbInfo;

   if (checkArgs(queueP, "clear", true, exception) == false) return false;
   dbInfo = getDbInfo(queueP);

   SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES WHERE queueName=?", dbInfo->prop.tablePrefix);
   stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);

   if (stateOk) { /* Bind the queue name, this avoids any quoting problem */
      const int rc = sqlite3_bind_text(pVm, 1, dbInfo->prop.queueName, -1, SQLITE_STATIC);
      if (rc != SQLITE_OK) {
         LOG __FILE__, "clear() SQL error: %d %s", rc, sqlite3_errmsg(dbInfo->db));
         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] clear() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db));
         sqlite3_finalize(pVm); /* getResultRows() is not reached, so release the statement here */
         pVm = 0;
         stateOk = false;
      }
   }

   if (stateOk) {
      int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);
      stateOk = currIndex >= 0;
   }

   if (stateOk) {
      dbInfo->numOfEntries = 0;
      dbInfo->numOfBytes = 0;
   }

   LOG __FILE__, "clear() %s", stateOk ? "done" : "failed");
   return stateOk;
}
/**
 * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
 * and store the two numbers in dbInfo->numOfEntries and dbInfo->numOfBytes.
 *
 * For an empty queue SQLite delivers NULL for sum(), which is taken as 0 bytes.
 *
 * @param queueP The this pointer
 * @param currIndex Not used
 * @param userP Not used
 * @param pazValue The column values: the count at index XB_ENTRIES_DATA_ID,
 *                 the sum at index 1
 * @param pazColName The column names, used for error messages only
 * @param exception Filled with "resource.db.unknown" if a value can't be parsed
 * @return true on success, false if a value can't be parsed to a number
 */
static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,
                           const char **pazValue, const char **pazColName, ExceptionStruct *exception)
{
   int64_t ival = 0;
   bool stateOk;
   DbInfo *dbInfo = getDbInfo(queueP);
   const char *countStr = pazValue[XB_ENTRIES_DATA_ID];
   const char *sumStr = pazValue[1];

   (void)currIndex; /* Just to avoid compiler warning about unused variable */
   (void)userP;

   stateOk = (countStr != 0) && strToInt64(&ival, countStr);
   if (!stateOk) {
      const char *colName = pazColName[XB_ENTRIES_DATA_ID];
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__,
               colName ? colName : "NULL", countStr ? countStr : "NULL");
      return false;
   }
   dbInfo->numOfEntries = (int32_t)ival;

   if (sumStr == 0) { /* sum() over no rows is NULL: the queue holds no bytes */
      dbInfo->numOfBytes = 0;
      return true;
   }

   stateOk = strToInt64(&dbInfo->numOfBytes, sumStr);
   if (!stateOk) {
      const char *colName = pazColName[1];
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__,
               colName ? colName : "NULL", sumStr);
      return false;
   }
   return true;
}
/**
* Reload cached information from database.
* @param queueP The this pointer
* @param exception Returns error
* @return false on error
*/
static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
{
bool stateOk = true;
DbInfo *dbInfo = 0;
char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -