⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeueplugin.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 2 页
字号:
            QueryKeyData queryKeyData = queryKeyFactory_.readObject(string(msgUnit.key));            SubscribeKey subscribeKey(global_, queryKeyData);            QueryQosData queryQosData = queryQosFactory_.readObject(string(msgUnit.qos));            SubscribeQos subscribeQos(global_, queryQosData);            SubscribeQueueEntry *pq = new SubscribeQueueEntry(global_, subscribeKey, subscribeQos,                                           queueEntryC.priority, queueEntryC.uniqueId);            if (log_.trace()) log_.trace(ME, "Got SubscribeQueueEntry from queue");            ret.insert(ret.end(), EntryType(*pq));            if (log_.trace()) log_.trace(ME, "SubscribeQueueEntry is reference countet");         }         */         else {  // TODO: How to handle: throw exception or remove the buggy entry?            log_.error(ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + queueEntryC.embeddedType + "' methodName='" + methodName + "' is not supported, we ignore it.");         }      }      freeMsgUnitArr(msgUnitArrC);   }   freeQueueEntryArr(entriesC);   return ret;}void SQLiteQueuePlugin::parseEmbeddedType(const string& embeddedType, string &type, string &methodName){   string::size_type pos = embeddedType.find("|");   if (pos == string::npos) {      type = embeddedType;      methodName = "";      return;   }   type = embeddedType.substr(0, pos);   if (pos < embeddedType.size())      methodName = embeddedType.substr(pos+1);   // No trim(): we assume no white spaces}long SQLiteQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) {   Lock lock(accessMutex_);   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, randomRemove() failed");   if (start == end || queueP_->empty(queueP_)) return 0;   ::QueueEntryArr queueEntryArr;   memset(&queueEntryArr, 0, sizeof(QueueEntryArr));   {      vector<EntryType>::const_iterator iter = start;      while (iter != end) {         iter++;         queueEntryArr.len++;      }   }   if (queueEntryArr.len < 1) return 0;   queueEntryArr.queueEntryArr = (QueueEntry *)calloc(queueEntryArr.len, sizeof(QueueEntry));   vector<EntryType>::const_iterator iter = start;   for (int currIndex=0; iter != end; ++iter, currIndex++) {      const EntryType &entryType = (*iter);      const MsgQueueEntry &entry = *entryType;      ::QueueEntry &queueEntry = queueEntryArr.queueEntryArr[currIndex];      // Copy C++ to C struct ...      queueEntry.priority = entry.getPriority();      queueEntry.isPersistent = entry.isPersistent();      queueEntry.uniqueId = entry.getUniqueId();      queueEntry.sizeInBytes = entry.getSizeInBytes();      strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);  // "MSG_RAW|publish" "MSG_RAW|connect"      queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;      /*      const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();      if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");      queueEntry.embeddedBlob.data = blob->data;      queueEntry.embeddedBlob.dataLen = blob->dataLen;      */      if (log_.dump()) {         char *dumpP = ::queueEntryToXml(&queueEntry, 200);         log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);         xmlBlasterFree(dumpP);      }   }   ::ExceptionStruct exception;   int32_t numRemoved = queueP_->randomRemove(queueP_, &queueEntryArr, &exception);   freeQueueEntryArrInternal(&queueEntryArr);   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);   return (long)numRemoved;}long SQLiteQueuePlugin::getNumOfEntries() const{   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");   return queueP_->getNumOfEntries(queueP_);}long SQLiteQueuePlugin::getMaxNumOfEntries() const{   if (queueP_ == 0) return property_.getMaxEntries(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed");   return queueP_->getMaxNumOfEntries(queueP_);}int64_t SQLiteQueuePlugin::getNumOfBytes() const{   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfBytes() failed");   return queueP_->getNumOfBytes(queueP_);}int64_t SQLiteQueuePlugin::getMaxNumOfBytes() const{     if (queueP_ == 0) return property_.getMaxBytes(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getMaxNumOfBytes() failed");   return queueP_->getMaxNumOfBytes(queueP_);}void SQLiteQueuePlugin::clear(){   Lock lock(accessMutex_);   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, clear() failed");   ::ExceptionStruct exception;   queueP_->clear(queueP_, &exception);}bool SQLiteQueuePlugin::empty() const{   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, empty() failed");   return queueP_->empty(queueP_);}void SQLiteQueuePlugin::destroy(){   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, destroy() failed");   ::ExceptionStruct exception;   queueP_->destroy(&queueP_, &exception);   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);}// Exception conversion ....org::xmlBlaster::util::XmlBlasterException SQLiteQueuePlugin::convertFromQueueException(const ::ExceptionStruct *ex) const{   return org::xmlBlaster::util::XmlBlasterException(            (*ex->errorCode=='\0')?string("internal.unknown"):string(ex->errorCode),            string(""),            ME,            "en",            string(ex->message),            global_.getVersion() + " " + global_.getBuildTimestamp());}string SQLiteQueuePlugin::usage(){   std::string text = string("");   text += string("\nThe SQLite persistent queue plugin configuration:");   text += string("\n   -queue/connection/url [xmlBlasterClientCpp.db]");   text += string("\n                       The database file name (incl. path), defaults to the current directory.");   text += string("\n   -queue/connection/tableNamePrefix [XB_]");   text += string("\n                       The prefix for all tables in the database.");   text += ClientQueueProperty::usage();   return text;}}}}} // namespace/** * Customized logging output is handled by this method.  * We redirect logging output from the C implementation to our C++ logging plugin. * <p> * Please compile with <code>XMLBLASTER_PERSISTENT_QUEUE</code> defined. * </p> * @param queueP * @param currLevel The actual log level of the client * @param level The level of this log entry * @param location A string describing the code place * @param fmt The formatting string * @param ... Other variables to log, corresponds to 'fmt' * @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default *      implementation */static void myLogger(void *logUserP,                      XMLBLASTER_LOG_LEVEL currLevel,                     XMLBLASTER_LOG_LEVEL level,                     const char *location, const char *fmt, ...){   /* Guess we need no more than 200 bytes. */   int n, size = 200;   char *p = 0;   va_list ap;   ::I_Queue *queueP = (::I_Queue *)logUserP;   //org::xmlBlaster::util::queue::SQLiteQueuePlugin *pluginP =   //      (org::xmlBlaster::util::queue::SQLiteQueuePlugin *)queueP->userObject;   //org::xmlBlaster::util::I_Log& log = pluginP->getLog();   if (queueP->userObject == 0) {      std::cout << "myLogger not initialized" << std::endl;      return;   }   org::xmlBlaster::util::I_Log& log = *((org::xmlBlaster::util::I_Log*)queueP->userObject);   if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */      return;   }   if ((p = (char *)malloc (size)) == NULL)      return;   for (;;) {      /* Try to print in the allocated space. */      va_start(ap, fmt);      n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */      va_end(ap);      /* If that worked, print the string to console. */      if (n > -1 && n < size) {         if (level == XMLBLASTER_LOG_INFO)            log.info(location, p);         else if (level == XMLBLASTER_LOG_WARN)            log.warn(location, p);         else if (level == XMLBLASTER_LOG_ERROR)            log.error(location, p);         else            log.trace(location, p);         free(p);         return;      }      /* Else try again with more space. */      if (n > -1)    /* glibc 2.1 */         size = n+1; /* precisely what is needed */      else           /* glibc 2.0 */         size *= 2;  /* twice the old size */      if ((p = (char *)realloc (p, size)) == NULL) {         return;      }   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -