⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeue.c

📁 java开源的企业总线.xmlBlaster
💻 C
📖 第 1 页 / 共 4 页
字号:
   LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));   return stateOk;}static bool persistentQueueEmpty(I_Queue *queueP){   return getNumOfEntries(queueP) <= 0;}static int32_t getNumOfEntries(I_Queue *queueP){   DbInfo *dbInfo;   bool stateOk = true;   ExceptionStruct exception;   if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;   dbInfo = getDbInfo(queueP);   if (dbInfo->numOfEntries == -1) {      stateOk = fillCache(queueP, &exception);   }   return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;}static int32_t getMaxNumOfEntries(I_Queue *queueP){   DbInfo *dbInfo;   ExceptionStruct exception;   if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;   dbInfo = getDbInfo(queueP);   return dbInfo->prop.maxNumOfEntries;}static int64_t getNumOfBytes(I_Queue *queueP){   DbInfo *dbInfo;   ExceptionStruct exception;   bool stateOk = true;   if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;   dbInfo = getDbInfo(queueP);   if (dbInfo->numOfBytes == -1) {      stateOk = fillCache(queueP, &exception);   }   return (stateOk) ? dbInfo->numOfBytes : -1;}static int64_t getMaxNumOfBytes(I_Queue *queueP){   DbInfo *dbInfo;   ExceptionStruct exception;   if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;   dbInfo = getDbInfo(queueP);   return dbInfo->prop.maxNumOfBytes;}/** * Shutdown without destroying any entry.  * Clears all open DB resources. */static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception){   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;   if (checkArgs(queueP, "shutdown", false, exception) == false ) return;   shutdownInternal(queuePP, exception);   freeQueue(queuePP);}/** * Shutdown used internally without calling freeQueue().  */static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception){   I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;   if (checkArgs(queueP, "shutdown", false, exception) == false ) return;   {      DbInfo *dbInfo = getDbInfo(queueP);      queueP->isInitialized = false;      if(dbInfo) {         if (dbInfo->pVm_put) {            char *errMsg = 0;            /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg);            if (errMsg != 0) sqlite_freemem(errMsg);            dbInfo->pVm_put = 0;         }         if (dbInfo->pVm_peekWithSamePriority) {            char *errMsg = 0;            sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg);            if (errMsg != 0) sqlite_freemem(errMsg);            dbInfo->pVm_peekWithSamePriority = 0;         }         if (dbInfo->pVm_fillCache) {            char *errMsg = 0;            sqlite_finalize(dbInfo->pVm_fillCache, &errMsg);            if (errMsg != 0) sqlite_freemem(errMsg);            dbInfo->pVm_fillCache = 0;         }         if (dbInfo->db) {            sqlite_close(dbInfo->db);            dbInfo->db = 0;         }         LOG __FILE__, "shutdown() done");      }   }}/** * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself * @param queueEntryArr The struct to free, passing NULL is OK */Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr){   if (queueEntryArr == (QueueEntryArr *)0) return;   freeQueueEntryArrInternal(queueEntryArr);   free(queueEntryArr);}/** * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself * @param queueEntryArr The struct internals to free, passing NULL is OK */Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr){   size_t i;   if (queueEntryArr == (QueueEntryArr *)0) return;   for (i=0; i<queueEntryArr->len; i++) {      freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);   }   free(queueEntryArr->queueEntryArr);   queueEntryArr->len = 0;}/** * Does not free the queueEntry itself */static void freeQueueEntryData(QueueEntry *queueEntry){   if (queueEntry == (QueueEntry *)0) return;   if (queueEntry->embeddedBlob.data != 0) {      free((char *)queueEntry->embeddedBlob.data);      queueEntry->embeddedBlob.data = 0;   }   queueEntry->embeddedBlob.dataLen = 0;}/** * Frees the internal blob and the queueEntry itself.  * @param queueEntry Its memory is freed, it is not usable anymore after this call */Dll_Export void freeQueueEntry(QueueEntry *queueEntry){   if (queueEntry == (QueueEntry *)0) return;   freeQueueEntryData(queueEntry);   free(queueEntry);}/** * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())! * * @param queueEntry The data to put to the queue * @param maxContentDumpLen for -1 get the complete content, else limit the *        content to the given number of bytes * @return A ASCII XML formatted entry or NULL if out of memory */Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen){   if (queueEntry == (QueueEntry *)0) return 0;   {   char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);   const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;   const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;   char *xml = (char *)calloc(len, sizeof(char));   if (xml == 0) {      free(contentStr);      return 0;   }   if (maxContentDumpLen == 0)      *contentStr = 0;   else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&            (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))      strcpy(contentStr+maxContentDumpLen, " ...");   SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"                      "\n  <content size='%lu'><![CDATA[%s]]></content>"                      "\n <QueueEntry>",                        int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,                        queueEntry->isPersistent?"true":"false",                        queueEntry->embeddedType,                        (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);   free(contentStr);   return xml;   }}Dll_Export void freeEntryDump(char *entryDump){   if (entryDump) free(entryDump);}/** * Checks the given arguments to be valid. * @param queueP The queue instance * @param methodName For logging * @param checkIsConnected If true does check the connection state as well  * @param exception Transporting errors  * @return false if the parameters are not usable, *         in this case 'exception' is filled with detail informations */static bool checkArgs(I_Queue *queueP, const char *methodName,                      bool checkIsConnected, ExceptionStruct *exception){   if (queueP == 0) {      if (exception == 0) {         printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",                  __FILE__, __LINE__, methodName);      }      else {         strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                  "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",                   __FILE__, __LINE__, methodName);         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);      }      return false;   }   if (exception == 0) {      LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);      return false;   }   if (checkIsConnected) {      if (queueP->privateObject==0 ||          ((DbInfo *)(queueP->privateObject))->db==0 ||          !queueP->isInitialized) {         strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,                  "[%.100s:%d] Not connected to database, %s() failed",                   __FILE__, __LINE__, methodName);         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);         return false;      }   }   initializeExceptionStruct(exception);   LOG __FILE__, "%s() entering ...", methodName);   return true;}/*=================== TESTCODE =======================*/# ifdef QUEUE_MAIN/* NOTE:    This code may be totally outdated, for current examples please use:    xmlBlaster/testsuite/src/c/TestQueue.c*/#include <stdio.h>static void testRun(int argc, char **argv) {   ExceptionStruct exception;   QueueEntryArr *entries = 0;   QueueProperties queueProperties;   I_Queue *queueP = 0;   memset(&queueProperties, 0, sizeof(QueueProperties));   strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);   strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);   strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);   queueProperties.maxNumOfEntries = 10000000L;   queueProperties.maxNumOfBytes = 1000000000LL;   queueProperties.logFp = xmlBlasterDefaultLogging;   queueProperties.logLevel = XMLBLASTER_LOG_TRACE;   queueProperties.userObject = 0;   queueP = createQueue(&queueProperties, &exception);   /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */   if (argc || argv) {} /* to avoid compiler warning */   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");   {      int64_t idArr[] =   { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };      int16_t prioArr[] = { 5                    , 1                    , 5 };      char *data[] =      { "Hello"              , " World"             , "!!!" };      size_t i;      for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {         QueueEntry queueEntry;         memset(&queueEntry, 0, sizeof(QueueEntry));         queueEntry.priority = prioArr[i];         queueEntry.isPersistent = true;         queueEntry.uniqueId = idArr[i];         strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);         queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;         queueEntry.embeddedBlob.data = data[i];         queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);         queueP->put(queueP, &queueEntry, &exception);         if (*exception.errorCode != 0) {            LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);         }      }   }      entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);   if (*exception.errorCode != 0) {      LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);   }   if (entries != 0) {      size_t i;      printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);      for (i=0; i<entries->len; i++) {         QueueEntry *queueEntry = &entries->queueEntryArr[i];         char *dump = queueEntryToXml(queueEntry, 200);         printf("%s\n", dump);         free(dump);      }   }   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");   queueP->randomRemove(queueP, entries, &exception);   if (*exception.errorCode != 0) {      LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);   }   freeQueueEntryArr(entries);   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");      queueP->clear(queueP, &exception);   printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");   queueP->shutdown(&queueP, &exception);}int main(int argc, char **argv) {   int i;   for (i=0; i<1; i++) {      testRun(argc, argv);   }   return 0;}#endif /*QUEUE_MAIN*//*=================== TESTCODE =======================*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -