⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite3queue.c.todo

📁 java开源的企业总线.xmlBlaster
💻 TODO
📖 第 1 页 / 共 4 页
字号:
 * @return true on success
 */
static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
{
   /*
    * Creates the <tablePrefix>ENTRIES table and afterwards its index on the prio column.
    * execSilent() treats an "already exists" error as success, so running this
    * against an already initialized database does not fail.
    */
   char queryString[LEN512];
   const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;

   SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
           tablePrefix);
   if (!execSilent(queueP, queryString, "Creating ENTRIES table", exception)) {
      /* Report the table failure; previously the index result below overwrote it */
      return false;
   }

   SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
           tablePrefix, tablePrefix);
   return execSilent(queueP, queryString, "Creating PRIO index", exception);
}

/**
 * Execute SQL through sqlite3_exec() without a result callback.
 * An error whose text contains "already exists", or an SQLITE_CONSTRAINT
 * error whose text contains " not unique", is logged and reported as success.
 * @param queueP Is not checked, must not be 0
 * @param queryString The SQL to execute
 * @param comment Short description used in the log and exception texts
 * @param exception Out parameter: errorCode and message are filled on failure
 * @return true on success (including the two tolerated errors), false otherwise
 */
static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
{
   DbInfo *dbInfo = getDbInfo(queueP);
   char *sqliteErr = 0;   /* Allocated by sqlite3_exec() on error, released below */
   bool succeeded = true;
   const int rc = sqlite3_exec(dbInfo->db, queryString, NULL, NULL, &sqliteErr);

   if (rc == SQLITE_OK) {
      LOG __FILE__, "SQL '%s' success", comment);
   }
   else if (sqliteErr != 0 && strstr(sqliteErr, "already exists") != 0) {
      /* Table or index is there already: nothing to do */
      LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), sqliteErr);
   }
   else if (rc == SQLITE_CONSTRAINT && sqliteErr != 0 && strstr(sqliteErr, " not unique") != 0) {
      /* The row is there already: nothing to do */
      LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), sqliteErr);
   }
   else {
      /* Any other error is handed to the caller; sqliteErr may be 0 here */
      const char *detail = (sqliteErr == 0) ? "" : sqliteErr;
      LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite3_errmsg(dbInfo->db), detail);
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite3_errmsg(dbInfo->db), detail);
      succeeded = false;
   }

   if (sqliteErr != 0) xb_sqlite_free(sqliteErr);
   return succeeded;
}

/*
 * This is the callback routine that the SQLite library
 * invokes for each row of a query result.
static int callback(void *pArg, int nArg, char **azArg, char **azCol){
   int i;
   struct callback_data *p = (struct callback_data*)pArg;
   int w = 5;
   if (p==0) {} // Suppress compiler warning
   if( azArg==0 ) return 0;
   for(i=0; i<nArg; i++){
      int len = strlen(azCol[i]);
      if( len>w ) w = len;
   }
   printf("\n");
   for(i=0; i<nArg; i++){
      printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
   }
  return 0;
}
*/

/**
 * Insert one entry into the persistent queue.
 * The arguments are validated, the configured entry and byte limits are checked,
 * the INSERT statement (compiled on first use) is bound and executed, and on
 * success the cached numOfEntries / numOfBytes counters are increased.
 * @param queueP The queue instance
 * @param queueEntry The entry, uniqueId and embeddedType must be set; if
 *        embeddedBlob.dataLen > 0 then embeddedBlob.data must not be 0
 * @param exception On failure errorCode and message are filled.
 *        The documented contract is *exception->errorCode==0 on success; this function
 *        itself never clears it (presumably checkArgs() does, it is not visible here).
 */
static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
{
   int rc = 0;
   bool stateOk = true;
   DbInfo *dbInfo;
   char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */

   if (checkArgs(queueP, "put", true, exception) == false ) return;
   if (queueEntry == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
      return;
   }
   if (queueEntry->uniqueId == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
      return;
   }
   if (*queueEntry->embeddedType == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
      return;
   }
   /* Work on a bounded local copy of the type string */
   strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);

   if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
      return;
   }

   dbInfo = getDbInfo(queueP);

   /* Refuse the entry if one of the configured limits is reached already */
   if (dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
      strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
      return;
   }
   if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
      strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
      return;
   }


   if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */
      /* Placeholder order as in the CREATE TABLE of createTables(): dataId, queueName, prio, flag, durable, byteSize, blob */
      char queryString[LEN256];
      SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
      stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
   }

   if (stateOk) { /* set prepared statement tokens */
#  ifdef SQLITE2
      char intStr[INT64_STRLEN_MAX];
      int index = 0;
      const int len = -1; /* Calculated by sqlite_bind */
      rc = SQLITE_OK;

      int64ToStr(intStr, queueEntry->uniqueId);
      /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
      SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
      SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
      if (rc == SQLITE_OK) {
         /* As SQLite does only store strings we encode our blob to a string */
         size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
         unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
         if (out == 0) {
            rc = SQLITE_NOMEM; /* Reported by the common error handling below */
         }
         else {
            int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
                                 (int)queueEntry->embeddedBlob.dataLen, out);
            rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
            free(out); /* sqlite_bind() was told to make its own copy */
         }
      }
#  else /* SQLite 3: typed bindings, the blob is stored unencoded */
      int index = 0;

      /*
       * sqlite3_bind_*() returns SQLITE_MISUSE on a statement which was stepped and
       * not reset. Whether getResultRows() resets the statement is not visible here,
       * so reset defensively; on a fresh or already reset statement this does nothing.
       */
      (void)sqlite3_reset(dbInfo->pVm_put);

      /* queueName and the "T"/"F" literals outlive the statement execution: SQLITE_STATIC.
         embeddedType is a stack buffer and the blob belongs to the caller: SQLITE_TRANSIENT. */
      rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, queueEntry->uniqueId);
      if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, -1, SQLITE_STATIC);
      if (rc == SQLITE_OK) rc = sqlite3_bind_int(dbInfo->pVm_put, ++index, (int)queueEntry->priority);
      if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, embeddedType, -1, SQLITE_TRANSIENT);
      if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", -1, SQLITE_STATIC);
      if (rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, (int64_t)queueEntry->embeddedBlob.dataLen);
      if (rc == SQLITE_OK) rc = sqlite3_bind_blob(dbInfo->pVm_put, ++index, queueEntry->embeddedBlob.data,
                                                  (int)queueEntry->embeddedBlob.dataLen, SQLITE_TRANSIENT);
#  endif /* SQLITE2 */

      if (rc != SQLITE_OK) {
         LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite3_errmsg(dbInfo->db));
         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite3_errmsg(dbInfo->db));
         stateOk = false;
      }
   }

   if (stateOk) { /* start the query, process results */
      int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
      stateOk = countRows >= 0; /* A negative count is treated as failure */
   }

   if (stateOk) {
      /* Keep the cached counters in sync; sizeInBytes wins over the blob length if it is set */
      dbInfo->numOfEntries += 1;
      dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
   }

   LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
}


/**
 * Compile a prepared query.
 * No parameters are checked, they must be valid.
 * Nothing is compiled if *ppVm is set already. As long as sqlite3_prepare()
 * returns SQLITE_BUSY the compilation is retried, at most 100 attempts with
 * sleepMillis(10) between them.
 * @param queueP The queue instance to use
 * @param methodName A nice string for logging
 * @param ppVm The virtual machine will be initialized if still 0
 * @param queryString The SQL text to compile
 * @param exception errorCode and message are filled if sqlite3_prepare() fails
 *        or the database stays busy for all attempts
 * @return true if *ppVm holds a statement, false if compiling failed or
 *         *ppVm is still 0 afterwards
 */
static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
                    xb_sqlite_stmt **ppVm, const char *queryString, ExceptionStruct *exception)
{
   int iRetry, numRetry=100;
   int rc = 0;
   const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */
   bool stateOk = true;
   DbInfo *dbInfo = getDbInfo(queueP);

   if (*ppVm == 0) {  /* Compile prepared  query */
      for (iRetry = 0; iRetry < numRetry; iRetry++) {
         rc = sqlite3_prepare(dbInfo->db, queryString, (int)strlen(queryString), ppVm, &pzTail);
         switch (rc) {
            case SQLITE_BUSY:
               if (iRetry == (numRetry-1)) {
                  /* Last attempt: give up. The format had one '%s' more than arguments (undefined behaviour) */
                  LOG __FILE__, "SQL error #%d %s in %s()", rc, sqlite3_errmsg(dbInfo->db), methodName);
                  strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
                  SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                           "[%.100s:%d] SQL error #%d %s in %s()", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db), methodName);
                  stateOk = false;
               }
               else {
                  /* Only wait if another attempt follows */
                  LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, sqlite3_errmsg(dbInfo->db) );
                  sleepMillis(10);
               }
               break;
            case SQLITE_OK:
               iRetry = numRetry; /* We're done */
               LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
               break;
            default:
               LOG __FILE__, "SQL error #%d %s in %s()", rc, sqlite3_errmsg(dbInfo->db), methodName);
               strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
               SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                        "[%.100s:%d] SQL error #%d %s in %s()", __FILE__, __LINE__, rc, sqlite3_errmsg(dbInfo->db), methodName);
               iRetry = numRetry; /* We're done */
               stateOk = false;
               break;
         }
      }
   }
   /* Without a statement the caller can't continue, whatever happened above */
   if (*ppVm == 0) stateOk = false;
   return stateOk;
}

/**
 * For each SQL result row parse it into a QueueEntry. 
 * No parameters are checked, they must be valid
 * Implements a ParseDataFp (function pointer)
 * @param queueP The 'this' pointer
 * @param currIndex
 * @param userP
 * @param pazValue
 * @param pazColName
 * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 * @return false on error and exception->errorCode is not null
 */
static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
                               const char **pazValue, const char **pazColName, ExceptionStruct *exception)
{
   bool doContinue = true;
   int numAssigned;
   bool stateOk = true;
   int decodeSize = 0;
   QueueEntry *queueEntry = 0;
   QueueEntryArr *queueEntryArr;
   TmpHelper *helper = (TmpHelper*)userP;
   QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;

   if (currIndex == 0) {
      helper->currEntries = 0;
      helper->currBytes = 0;
   }

   if (*queueEntryArrPP == 0) {
      *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
      if (helper->maxNumOfEntries == 0) {
         doContinue = false;
         return doContinue;
      }
   }
   queueEntryArr = *queueEntryArrPP;

   if (queueEntryArr->len == 0) {
      queueEntryArr->len = 10;
      queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
   }
   else if (currIndex >= queueEntryArr->len) {
      queueEntryArr->len += 10;
      queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
   }
   queueEntry = &queueEntryArr->queueEntryArr[currIndex];
   memset(queueEntry, 0, sizeof(QueueEntry));

   stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
   if (!stateOk) {
      LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
               "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
      doContinue = false;
      return doContinue;
   }

   LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
   /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
   numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
   if (numAssigned != 1) {
      LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
      queueEntry->priority = 4;
   }
   strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
   queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
   {
      int64_t ival = 0;
      stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
      queueEntry->embeddedBlob.dataLen = (size_t)ival;
   }

   /* TODO!!! in Java the length is the size in RAM and not strlen(data) */
   /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */

    // TODO!!!!!!!!!!!!

#  ifdef SQLITE2
   queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
   decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
   if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
      *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; 
      LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
                        " but expected decoded len=%d: '%s'",
                    int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
                    queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
   }
#  endif

   helper->currEntries += 1;
   helper->currBytes += queueEntry->embeddedBlob.dataLen;

   /* Limit the number of entries */
   if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
       (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
      /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
      doContinue = false;
   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -