⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlite3queue.c.todo

📁 java开源的企业总线.xmlBlaster
💻 TODO
📖 第 1 页 / 共 4 页
字号:
   if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
   dbInfo = getDbInfo(queueP);

   SNPRINTF(queryString, LEN512, 
            "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
            dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
   stateOk = compilePreparedQuery(queueP, "fillCache",
                  &dbInfo->pVm_fillCache, queryString, exception);

   if (stateOk) { /* start the query, calls parseCacheInfo() */
      int32_t currIndex = getResultRows(queueP, "fillCache",
                              dbInfo->pVm_fillCache, parseCacheInfo,
                              0, false, exception);
      stateOk = currIndex > 0;
   }

   LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
   return stateOk;
}

static bool persistentQueueEmpty(I_Queue *queueP)
{
   return getNumOfEntries(queueP) <= 0;
}

static int32_t getNumOfEntries(I_Queue *queueP)
{
   DbInfo *dbInfo;
   bool stateOk = true;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   if (dbInfo->numOfEntries == -1) {
      stateOk = fillCache(queueP, &exception);
   }
   return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
}

static int32_t getMaxNumOfEntries(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   return dbInfo->prop.maxNumOfEntries;
}

static int64_t getNumOfBytes(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   bool stateOk = true;
   if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   if (dbInfo->numOfBytes == -1) {
      stateOk = fillCache(queueP, &exception);
   }
   return (stateOk) ? dbInfo->numOfBytes : -1;
}

static int64_t getMaxNumOfBytes(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   return dbInfo->prop.maxNumOfBytes;
}

/**
 * Shutdown without destroying any entry. 
 * Clears all open DB resources.
 */
static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
{
   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
   if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
   shutdownInternal(queuePP, exception);
   freeQueue(queuePP);
}

/**
 * Shutdown used internally without calling freeQueue(). 
 */
static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
{
   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
   if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
   {
      DbInfo *dbInfo = getDbInfo(queueP);
      queueP->isInitialized = false;
      if(dbInfo) {
         if (dbInfo->pVm_put) {
		      sqlite3_finalize(dbInfo->pVm_put);
            dbInfo->pVm_put = 0;
         }
         if (dbInfo->pVm_peekWithSamePriority) {
		      sqlite3_finalize(dbInfo->pVm_peekWithSamePriority);
            dbInfo->pVm_peekWithSamePriority = 0;
         }
         if (dbInfo->pVm_fillCache) {
		      sqlite3_finalize(dbInfo->pVm_fillCache);
           dbInfo->pVm_fillCache = 0;
         }
         if (dbInfo->db) {
            sqlite3_close(dbInfo->db);
            dbInfo->db = 0;
         }
         LOG __FILE__, "shutdown() done");
      }
   }
}

/**
 * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself
 * @param queueEntryArr The struct to free, passing NULL is OK
 */
Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
{
   if (queueEntryArr == (QueueEntryArr *)0) return;
   freeQueueEntryArrInternal(queueEntryArr);
   free(queueEntryArr);
}

/**
 * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself
 * @param queueEntryArr The struct internals to free, passing NULL is OK
 */
Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
{
   size_t i;
   if (queueEntryArr == (QueueEntryArr *)0) return;
   for (i=0; i<queueEntryArr->len; i++) {
      freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
   }
   free(queueEntryArr->queueEntryArr);
   queueEntryArr->len = 0;
}

/**
 * Does not free the queueEntry itself
 */
static void freeQueueEntryData(QueueEntry *queueEntry)
{
   if (queueEntry == (QueueEntry *)0) return;
   if (queueEntry->embeddedBlob.data != 0) {
      free((char *)queueEntry->embeddedBlob.data);
      queueEntry->embeddedBlob.data = 0;
   }
   queueEntry->embeddedBlob.dataLen = 0;
}

/**
 * Frees the internal blob and the queueEntry itself. 
 * @param queueEntry Its memory is freed, it is not usable anymore after this call
 */
Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
{
   if (queueEntry == (QueueEntry *)0) return;
   freeQueueEntryData(queueEntry);
   free(queueEntry);
}

/**
 * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
 *
 * @param queueEntry The data to put to the queue
 * @param maxContentDumpLen for -1 get the complete content, else limit the
 *        content to the given number of bytes
 * @return A ASCII XML formatted entry or NULL if out of memory
 */
Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
{
   if (queueEntry == (QueueEntry *)0) return 0;
   {
   char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
   const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
   const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
   char *xml = (char *)calloc(len, sizeof(char));
   if (xml == 0) {
      free(contentStr);
      return 0;
   }
   if (maxContentDumpLen == 0)
      *contentStr = 0;
   else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
            (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
      strcpy(contentStr+maxContentDumpLen, " ...");

   SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
                      "\n  <content size='%lu'><![CDATA[%s]]></content>"
                      "\n <QueueEntry>",
                        int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
                        queueEntry->isPersistent?"true":"false",
                        queueEntry->embeddedType,
                        (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
   free(contentStr);
   return xml;
   }
}

Dll_Export void freeEntryDump(char *entryDump)
{
   if (entryDump) free(entryDump);
}

/**
 * Checks the given arguments to be valid.
 * @param queueP The queue instance
 * @param methodName For logging
 * @param checkIsConnected If true does check the connection state as well 
 * @param exception Transporting errors 
 * @return false if the parameters are not usable,
 *         in this case 'exception' is filled with detail informations
 */
static bool checkArgs(I_Queue *queueP, const char *methodName,
                      bool checkIsConnected, ExceptionStruct *exception)
{
   if (queueP == 0) {
      if (exception == 0) {
         printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
                  __FILE__, __LINE__, methodName);
      }
      else {
         strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
                   __FILE__, __LINE__, methodName);
         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
      }
      return false;
   }

   if (exception == 0) {
      LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
      return false;
   }

   if (checkIsConnected) {
      if (queueP->privateObject==0 ||
          ((DbInfo *)(queueP->privateObject))->db==0 ||
          !queueP->isInitialized) {
         strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] Not connected to database, %s() failed",
                   __FILE__, __LINE__, methodName);
         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
         return false;
      }
   }

   initializeExceptionStruct(exception);

   LOG __FILE__, "%s() entering ...", methodName);

   return true;
}

/*=================== TESTCODE =======================*/
# ifdef QUEUE_MAIN
#include <stdio.h>
static void testRun(int argc, char **argv) {
   ExceptionStruct exception;
   QueueEntryArr *entries = 0;
   QueueProperties queueProperties;
   I_Queue *queueP = 0;

   memset(&queueProperties, 0, sizeof(QueueProperties));
   strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
   strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
   strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
   queueProperties.maxNumOfEntries = 10000000L;
   queueProperties.maxNumOfBytes = 1000000000LL;
   queueProperties.logFp = xmlBlasterDefaultLogging;
   queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
   queueProperties.userObject = 0;

   queueP = createQueue(&queueProperties, &exception);
   /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
   if (argc || argv) {} /* to avoid compiler warning */

   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");

   {
      int64_t idArr[] =   { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
      int16_t prioArr[] = { 5                    , 1                    , 5 };
      char *data[] =      { "Hello"              , " World"             , "!!!" };
      size_t i;
      for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
         QueueEntry queueEntry;
         memset(&queueEntry, 0, sizeof(QueueEntry));
         queueEntry.priority = prioArr[i];
         queueEntry.isPersistent = true;
         queueEntry.uniqueId = idArr[i];
         strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
         queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
         queueEntry.embeddedBlob.data = data[i];
         queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);

         queueP->put(queueP, &queueEntry, &exception);
         if (*exception.errorCode != 0) {
            LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
         }
      }
   }
   
   entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
   if (*exception.errorCode != 0) {
      LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
   }
   if (entries != 0) {
      size_t i;
      printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
      for (i=0; i<entries->len; i++) {
         QueueEntry *queueEntry = &entries->queueEntryArr[i];
         char *dump = queueEntryToXml(queueEntry, 200);
         printf("%s\n", dump);
         free(dump);
      }
   }

   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
   queueP->randomRemove(queueP, entries, &exception);
   if (*exception.errorCode != 0) {
      LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
   }

   freeQueueEntryArr(entries);
   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
   
   queueP->clear(queueP, &exception);
   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");

   queueP->shutdown(&queueP, &exception);
}

int main(int argc, char **argv) {
   int i;
   for (i=0; i<1; i++) {
      testRun(argc, argv);
   }
   return 0;
}
#endif /*QUEUE_MAIN*/
/*=================== TESTCODE =======================*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -