⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeue.c

📁 java开源的企业总线.xmlBlaster
💻 C
📖 第 1 页 / 共 4 页
字号:
           tablePrefix);   retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);   SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",           tablePrefix, tablePrefix);   retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);   return retOk;}/** * Invoke SQL query.  * @param queueP Is not checked, must not be 0 * @param queryString The SQL to execute * @param comment For logging or exception text * @param exception Can contain error information (out parameter) * @return true on success */static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception){   int rc = 0;   char *errMsg = 0;   bool retOk;   DbInfo *dbInfo = getDbInfo(queueP);   rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);   switch (rc) {      case SQLITE_OK:         LOG __FILE__, "SQL '%s' success", comment);         retOk = true;         break;      default:         if (errMsg && strstr(errMsg, "already exists")) {            LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);            retOk = true;         }         else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {            LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);            retOk = true;         }         else {            LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);            strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);            SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                     "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);            retOk = false;         }         break;   }   if (errMsg != 0) sqlite_freemem(errMsg);   return retOk;}/* * This is the callback routine that the SQLite library * invokes for each row of a query result.static int callback(void *pArg, int nArg, char **azArg, char **azCol){   int i;   struct callback_data *p = (struct callback_data*)pArg;   int w = 5;   if (p==0) {} // Suppress compiler warning   if( azArg==0 ) return 0;   for(i=0; i<nArg; i++){      int len = strlen(azCol[i]);      if( len>w ) w = len;   }   printf("\n");   for(i=0; i<nArg; i++){      printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");   }  return 0;}*//** * @param queueP The queue instance * @param queueEntry The entry * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 */static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception){   int rc = 0;   bool stateOk = true;   DbInfo *dbInfo;   char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */   if (checkArgs(queueP, "put", true, exception) == false ) return;   if (queueEntry == 0) {      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);      return;   }   if (queueEntry->uniqueId == 0) {      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);      return;   }   if (*queueEntry->embeddedType == 0) {      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);      return;   }   strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);   if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {      strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);      return;   }   dbInfo = getDbInfo(queueP);   if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {      strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);      return;   }   if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {      strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));      return;   }   if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */      char queryString[LEN256];    /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/      SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);      stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);   }   if (stateOk) { /* set prepared statement tokens */      char intStr[INT64_STRLEN_MAX];      int index = 0;      const int len = -1; /* Calculated by sqlite_bind */      rc = SQLITE_OK;      int64ToStr(intStr, queueEntry->uniqueId);      /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);      SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);      SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);      if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);      if (rc == SQLITE_OK) {         /* As SQLite does only store strings we encode our blob to a string */         size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;         unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));         int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,                              (int)queueEntry->embeddedBlob.dataLen, out);         rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);         free(out);      }      if (rc != SQLITE_OK) {         LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                  "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));         stateOk = false;      }   }   if (stateOk) { /* start the query, process results */      int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);      stateOk = countRows >= 0;   }   if (stateOk) {      dbInfo->numOfEntries += 1;      dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);   }   LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");}/** * Compile a prepared query.  * No parameters are checked, they must be valid * @param queueP The queue instance to use * @param methodName A nice string for logging * @param ppVm The virtual machine will be initialized if still 0 * @param queryString * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return false on error and exception->errorCode is not null */static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,                    sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception){   int iRetry, numRetry=100;   char *errMsg = 0;   int rc = 0;   const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */   bool stateOk = true;   DbInfo *dbInfo = getDbInfo(queueP);   if (*ppVm == 0) {  /* Compile prepared  query */      for (iRetry = 0; iRetry < numRetry; iRetry++) {         rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg);         switch (rc) {            case SQLITE_BUSY:               if (iRetry == (numRetry-1)) {                  strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);                  SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                           "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);               }               LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg);               if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }               sleepMillis(10);               break;            case SQLITE_OK:               iRetry = numRetry; /* We're done */               LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);               if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }               break;            default:               LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);               strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);               SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                        "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);               iRetry = numRetry; /* We're done */               if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }               stateOk = false;               break;         }      }   }   if (*ppVm == 0) stateOk = false;   return stateOk;}/** * For each SQL result row parse it into a QueueEntry.  * No parameters are checked, they must be valid * Implements a ParseDataFp (function pointer) * @param queueP The 'this' pointer * @param currIndex * @param userP * @param pazValue * @param pazColName * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0 * @return false on error and exception->errorCode is not null */static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,                               const char **pazValue, const char **pazColName, ExceptionStruct *exception){   bool doContinue = true;   int numAssigned;   bool stateOk = true;   int decodeSize = 0;   QueueEntry *queueEntry = 0;   QueueEntryArr *queueEntryArr;   TmpHelper *helper = (TmpHelper*)userP;   QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;   if (currIndex == 0) {      helper->currEntries = 0;      helper->currBytes = 0;   }   if (*queueEntryArrPP == 0) {      *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;      if (helper->maxNumOfEntries == 0) {         doContinue = false;         return doContinue;      }   }   queueEntryArr = *queueEntryArrPP;   if (queueEntryArr->len == 0) {      queueEntryArr->len = 10;      queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));   }   else if (currIndex >= queueEntryArr->len) {      queueEntryArr->len += 10;      queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));   }   queueEntry = &queueEntryArr->queueEntryArr[currIndex];   memset(queueEntry, 0, sizeof(QueueEntry));   stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);   if (!stateOk) {      LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);      strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);      SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,               "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);      doContinue = false;      return doContinue;   }   LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);   /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */   numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);   if (numAssigned != 1) {      LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);      queueEntry->priority = 4;   }   strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);   queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;   {      int64_t ival = 0;      stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);      queueEntry->embeddedBlob.dataLen = (size_t)ival;   }   /* TODO!!! in Java the length is the size in RAM and not strlen(data) */   /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */   queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */   decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);   if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {      *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0;       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"                        " but expected decoded len=%d: '%s'",                    int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,                    queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);   }   helper->currEntries += 1;   helper->currBytes += queueEntry->embeddedBlob.dataLen;   /* Limit the number of entries */   if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||       (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {      /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */      doContinue = false;   }   return doContinue;}/** * Execute the query and get the query result.  * No parameters are checked, they must be valid * @param queueP  The this pointer * @param methodName The method called * @param pVm sqlite virtual machine

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -