⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite3queue.c.todo

📁 java开源的企业总线.xmlBlaster
💻 TODO
📖 第 1 页 / 共 4 页
字号:
   return doContinue;
}

/**
 * Execute the query and get the query result. 
 * No parameters are checked, they must be valid
 * @param queueP  The this pointer
 * @param methodName The method called
 * @param pVm sqlite virtual machine
 * @param parseDataFp The function which is called for each SQL result row
 *                    or 0 if no function shall be called
 * @param userP The pointer which is passed to parseDataFp
 * @param finalize true to call sqlite_finalize which deletes the virtual machine,
 *                 false to call  sqlite_reset to reuse the prepared query
 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 * @return < 0 on error and exception->errorCode is not null
 *         otherwise the number of successfully parsed rows is returned
 * @todo For INSERT and DELETE return the number of touched entries !!!
 */
static int32_t getResultRows(I_Queue *queueP, const char *methodName,
                             xb_sqlite_stmt *pVm, 
                             ParseDataFp parseDataFp, void *userP,
                             bool finalize,
                             ExceptionStruct *exception)
{
   int32_t currIndex = 0;
   int numCol = 0;
   const char **pazValue = 0;
   const char **pazColName = 0;
   bool done = false;
   bool stateOk = true;
   int rc;
   while (!done) {

      rc = sqlite3_step(pVm);
        // TODO
#  	ifdef SQLITE2
      rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);
      switch( rc ){
         case SQLITE_DONE:
            done = true;
         break;
         case SQLITE_BUSY:
            LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
            sleepMillis(10);
         break;
         case SQLITE_ROW:
         {
            bool doContinue = true;
            if (parseDataFp) {
               /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
               doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
               stateOk = *exception->errorCode == 0;
            }
            else {
               /*
               printf("RESULT[%d]\n", iRow);
               for (iCol = 0; iCol < numCol; iCol++) {
                  printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);
               }
               */
            }
            currIndex++;
            if (!stateOk || !doContinue) done = true;
         }
         break;
         case SQLITE_ERROR:   /* If exists already */
            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
            done = true;
            stateOk = false;
         break;
         case SQLITE_MISUSE:
         default:
            LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));
            done = true;
            stateOk = false;
         break;
      }
#     endif // end of SQLITE2 code
   }
   LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);

   if (finalize) {
      sqlite3_finalize(pVm);
      if (rc != SQLITE_OK && rc != SQLITE_DONE) {
/*        LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled.", rc, sqlite_errmsg( )); */
          LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled.", rc );
      }
   }
   else { /* Reset prepared statement */
      rc = sqlite3_reset(pVm);
      if (rc == SQLITE_SCHEMA) {
/*         LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled", rc, sqlite_error_string(rc) ); */
         LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled", rc );
      }
   }

   return stateOk ? currIndex : (-1)*rc;
}

/**
 * Access queue entries without removing them. 
 */
static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
{
   int rc = 0;
   bool stateOk = true;
   DbInfo *dbInfo;
   QueueEntryArr *queueEntryArr = 0;

   if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;

   LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));

   dbInfo = getDbInfo(queueP);

   if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared  query */
      char queryString[LEN512];
      /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
      SNPRINTF(queryString, LEN512,
           "SELECT * FROM %.20sENTRIES where queueName=?"
           " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
           " ORDER BY dataId ASC",
           dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
      stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
                    &dbInfo->pVm_peekWithSamePriority , queryString, exception);
   }

   if (stateOk) { /* set prepared statement tokens */
      int index = 0;
      int len = -1; /* Calculated by sqlite_bind */
      rc = SQLITE_OK;

      // TODO !!!!!!!!!!

#     ifdef SQLITE2
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
#     endif

      switch (rc) {
         case SQLITE_OK:
            LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
            break;
         default:
            LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite3_errmsg(dbInfo->db));
            strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
            SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                     "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db));
            stateOk = false;
            break;
      }
   }

   if (stateOk) { /* start the query */
      TmpHelper helper;
      int32_t currIndex = 0;
      helper.queueEntryArrPP = &queueEntryArr;
      helper.maxNumOfEntries = maxNumOfEntries;
      helper.maxNumOfBytes = maxNumOfBytes;
      currIndex = getResultRows(queueP, "peekWithSamePriority",
                        dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,
                        &helper, false, exception);
      stateOk = currIndex >= 0;
      if (!stateOk) {
         if (queueEntryArr) {
            free(queueEntryArr->queueEntryArr);
            queueEntryArr->len = 0;
         }
      }
      else {
         if (!queueEntryArr)
            queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
         else if ((size_t)currIndex < queueEntryArr->len) {
            queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
            queueEntryArr->len = currIndex; 
         }
      }
   }

   LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
   return queueEntryArr;
}

/**
 * Removes the given entries from persistence. 
 * @return The number of removed entries
 */
static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
{
   bool stateOk = true;
   int64_t numOfBytes = 0;
   int32_t countDeleted = 0;
   xb_sqlite_stmt *pVm = 0;
   DbInfo *dbInfo;
   if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
                 queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
      return 0;

   LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);

   dbInfo = getDbInfo(queueP);

   {
      size_t i;
      const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
      char *queryString = (char *)calloc(qLen, sizeof(char));
      /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
      SNPRINTF(queryString, qLen, 
           "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
           " AND dataId in ( ",
           dbInfo->prop.tablePrefix, dbInfo->prop.queueName);

      for (i=0; i<queueEntryArr->len; i++) {
         strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId));
         if (i<(queueEntryArr->len-1)) strcat(queryString, ",");
         numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
      }
      strcat(queryString, " )");
      stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
      free(queryString);
   }


   if (stateOk) { /* start the query */
      int32_t currIndex = getResultRows(queueP, "randomRemove",
                              pVm, 0, 0, true, exception);
      stateOk = currIndex >= 0;
   }

   if (stateOk) {
      countDeleted = (int32_t)sqlite3_changes(dbInfo->db); // This function returns the number of database rows that were changed (or inserted or deleted) by the most recently completed
                                                           // INSERT, UPDATE, or DELETE statement.
                                                           // Only changes that are directly specified by the INSERT, UPDATE, or DELETE statement are counted.
                                                           // Auxiliary changes caused by triggers are not counted.
                                                           // Use the sqlite3_total_changes() function to find the total number of changes including changes caused by triggers.
      if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
         fillCache(queueP, exception); /* calculate numOfBytes again */
      }
      else {
         dbInfo->numOfEntries -= queueEntryArr->len;
         dbInfo->numOfBytes -= numOfBytes;
      }
   }

   return countDeleted;
}

/**
 * Destroy all entries in queue and releases all resources in memory and on HD. 
 */
static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
{
   bool stateOk = true;
   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
   if (checkArgs(queueP, "destroy", false, exception) == false ) return false;

   shutdownInternal(queuePP, exception);

   {
      DbInfo *dbInfo = getDbInfo(queueP);
      const char *dbName = dbInfo->prop.dbName;
      stateOk = unlink(dbName) == 0; /* Delete old db file */
      if (!stateOk) {
         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
      }
   }

   freeQueue(queuePP);
   
   return stateOk;
}

/**
 * Destroy all entries in queue. 
 */
static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
{
   int stateOk = true;
   char queryString[LEN256];
   xb_sqlite_stmt *pVm = 0;
   DbInfo *dbInfo;
   if (checkArgs(queueP, "clear", true, exception) == false) return false;
   dbInfo = getDbInfo(queueP);

   SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
   stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);

   if (stateOk) {
      int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);
      stateOk = currIndex >= 0;
   }

   if (stateOk) {
      dbInfo->numOfEntries = 0;
      dbInfo->numOfBytes = 0;
   }

   LOG __FILE__, "clear() done");
   return stateOk;
}

/**
 * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
 */
static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,
                           const char **pazValue, const char **pazColName, ExceptionStruct *exception)
{
   int64_t ival = 0;
   bool stateOk;
   DbInfo *dbInfo = getDbInfo(queueP);

   stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]);
   if (!stateOk) {
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]);
      return false;
   }
   dbInfo->numOfEntries = (int32_t)ival;

   stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]);
   if (!stateOk) {
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]);
      if (currIndex) {} /* Just to avoid compiler warning about unused variable */
      if (userP) {};
      return false;
   }

   return true;
}

/**
 * Reload cached information from database. 
 * @param queueP The this pointer
 * @param exception Returns error
 * @return false on error
 */
static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
{
   bool stateOk = true;
   DbInfo *dbInfo = 0;

   char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -