⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeue.c

📁 java开源的企业总线.xmlBlaster
💻 C
📖 第 1 页 / 共 4 页
字号:
 * @param parseDataFp The function which is called for each SQL result row *                    or 0 if no function shall be called * @param userP The pointer which is passed to parseDataFp * @param finalize true to call sqlite_finalize which deletes the virtual machine, *                 false to call  sqlite_reset to reuse the prepared query * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return < 0 on error and exception->errorCode is not null *         otherwise the number of successfully parsed rows is returned * @todo For INSERT and DELETE return the number of touched entries !!! */static int32_t getResultRows(I_Queue *queueP, const char *methodName,                             sqlite_vm *pVm,                              ParseDataFp parseDataFp, void *userP,                             bool finalize,                             ExceptionStruct *exception){   char *errMsg = 0;   int32_t currIndex = 0;   int numCol = 0;   const char **pazValue = 0;   const char **pazColName = 0;   bool done = false;   bool stateOk = true;   int rc;   while (!done) {      rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);      switch( rc ){         case SQLITE_DONE:            done = true;         break;         case SQLITE_BUSY:            LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);            sleepMillis(10);         break;         case SQLITE_ROW:         {            bool doContinue = true;            if (parseDataFp) {               /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */               doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);               stateOk = *exception->errorCode == 0;            }            else {               /*               printf("RESULT[%d]\n", iRow);               for (iCol = 0; iCol < numCol; iCol++) {                  printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);               }               */            }            currIndex++;            if (!stateOk || !doContinue) done = true;         }         break;         case SQLITE_ERROR:   /* If exists already */            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);            done = true;            stateOk = false;         break;         case SQLITE_MISUSE:         default:            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));            done = true;            stateOk = false;         break;      }   }   LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);   if (finalize) {      sqlite_finalize(pVm, &errMsg);      if (rc != SQLITE_OK && rc != SQLITE_DONE) {         LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);      }      if (errMsg != 0) sqlite_freemem(errMsg);   }   else { /* Reset prepared statement */      rc = sqlite_reset(pVm, &errMsg);      if (rc == SQLITE_SCHEMA) {         LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);      }      if (errMsg != 0) sqlite_freemem(errMsg);   }   return stateOk ? currIndex : (-1)*rc;}/** * Access queue entries without removing them.  */static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception){   int rc = 0;   bool stateOk = true;   DbInfo *dbInfo;   QueueEntryArr *queueEntryArr = 0;   if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;   LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));   dbInfo = getDbInfo(queueP);   if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared  query */      char queryString[LEN512];      /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/      SNPRINTF(queryString, LEN512,           "SELECT * FROM %.20sENTRIES where queueName=?"           " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"           " ORDER BY dataId ASC",           dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);      stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",                    &dbInfo->pVm_peekWithSamePriority , queryString, exception);   }   if (stateOk) { /* set prepared statement tokens */      int index = 0;      int len = -1; /* Calculated by sqlite_bind */      rc = SQLITE_OK;      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);      switch (rc) {         case SQLITE_OK:            LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);            break;         default:            LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc));            strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);            SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                     "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc));            stateOk = false;            break;      }   }   if (stateOk) { /* start the query */      TmpHelper helper;      int32_t currIndex = 0;      helper.queueEntryArrPP = &queueEntryArr;      helper.maxNumOfEntries = maxNumOfEntries;      helper.maxNumOfBytes = maxNumOfBytes;      currIndex = getResultRows(queueP, "peekWithSamePriority",                        dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,                        &helper, false, exception);      stateOk = currIndex >= 0;      if (!stateOk) {         if (queueEntryArr) {            free(queueEntryArr->queueEntryArr);            queueEntryArr->len = 0;         }      }      else {         if (!queueEntryArr)            queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));         else if ((size_t)currIndex < queueEntryArr->len) {            queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));            queueEntryArr->len = currIndex;          }      }   }   LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");   return queueEntryArr;}/** * Removes the given entries from persistence.  * @return The number of removed entries */static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception){   bool stateOk = true;   int64_t numOfBytes = 0;   int32_t countDeleted = 0;   sqlite_vm *pVm = 0;   DbInfo *dbInfo;   if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||                 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)      return 0;   LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);   dbInfo = getDbInfo(queueP);   {      size_t i;      const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);      char *queryString = (char *)calloc(qLen, sizeof(char));      /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */      SNPRINTF(queryString, qLen,            "DELETE FROM %.20sENTRIES WHERE queueName='%s'"           " AND dataId in ( ",           dbInfo->prop.tablePrefix, dbInfo->prop.queueName);      for (i=0; i<queueEntryArr->len; i++) {         strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId));         if (i<(queueEntryArr->len-1)) strcat(queryString, ",");         numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);      }      strcat(queryString, " )");      stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);      free(queryString);   }   if (stateOk) { /* start the query */      int32_t currIndex = getResultRows(queueP, "randomRemove",                              pVm, 0, 0, true, exception);      stateOk = currIndex >= 0;   }   if (stateOk) {      countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db);      if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {         fillCache(queueP, exception); /* calculate numOfBytes again */      }      else {         dbInfo->numOfEntries -= queueEntryArr->len;         dbInfo->numOfBytes -= numOfBytes;      }   }   return countDeleted;}/** * Destroy all entries in queue and releases all resources in memory and on HD.  */static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception){   bool stateOk = true;   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;   if (checkArgs(queueP, "destroy", false, exception) == false ) return false;   shutdownInternal(queuePP, exception);   {      DbInfo *dbInfo = getDbInfo(queueP);      const char *dbName = dbInfo->prop.dbName;      stateOk = unlink(dbName) == 0; /* Delete old db file */      if (!stateOk) {         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                  "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);      }   }   freeQueue(queuePP);      return stateOk;}/** * Destroy all entries in queue.  */static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception){   int stateOk = true;   char queryString[LEN256];   sqlite_vm *pVm = 0;   DbInfo *dbInfo;   if (checkArgs(queueP, "clear", true, exception) == false) return false;   dbInfo = getDbInfo(queueP);   SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);   stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);   if (stateOk) {      int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);      stateOk = currIndex >= 0;   }   if (stateOk) {      dbInfo->numOfEntries = 0;      dbInfo->numOfBytes = 0;   }   LOG __FILE__, "clear() done");   return stateOk;}/** * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'", */static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,                           const char **pazValue, const char **pazColName, ExceptionStruct *exception){   int64_t ival = 0;   bool stateOk;   DbInfo *dbInfo = getDbInfo(queueP);   stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]);   if (!stateOk) {      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]);      return false;   }   dbInfo->numOfEntries = (int32_t)ival;   stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]);   if (!stateOk) {      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]);      if (currIndex) {} /* Just to avoid compiler warning about unused variable */      if (userP) {};      return false;   }   return true;}/** * Reload cached information from database.  * @param queueP The this pointer * @param exception Returns error * @return false on error */static bool fillCache(I_Queue *queueP, ExceptionStruct *exception){   bool stateOk = true;   DbInfo *dbInfo = 0;   char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */   if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;   dbInfo = getDbInfo(queueP);   SNPRINTF(queryString, LEN512,             "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",            dbInfo->prop.tablePrefix, dbInfo->prop.queueName);   stateOk = compilePreparedQuery(queueP, "fillCache",                  &dbInfo->pVm_fillCache, queryString, exception);   if (stateOk) { /* start the query, calls parseCacheInfo() */      int32_t currIndex = getResultRows(queueP, "fillCache",                              dbInfo->pVm_fillCache, parseCacheInfo,                              0, false, exception);      stateOk = currIndex > 0;   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -