📄 sqlitequeueplugin.cpp
字号:
QueryKeyData queryKeyData = queryKeyFactory_.readObject(string(msgUnit.key)); SubscribeKey subscribeKey(global_, queryKeyData); QueryQosData queryQosData = queryQosFactory_.readObject(string(msgUnit.qos)); SubscribeQos subscribeQos(global_, queryQosData); SubscribeQueueEntry *pq = new SubscribeQueueEntry(global_, subscribeKey, subscribeQos, queueEntryC.priority, queueEntryC.uniqueId); if (log_.trace()) log_.trace(ME, "Got SubscribeQueueEntry from queue"); ret.insert(ret.end(), EntryType(*pq)); if (log_.trace()) log_.trace(ME, "SubscribeQueueEntry is reference countet"); } */ else { // TODO: How to handle: throw exception or remove the buggy entry? log_.error(ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + queueEntryC.embeddedType + "' methodName='" + methodName + "' is not supported, we ignore it."); } } freeMsgUnitArr(msgUnitArrC); } freeQueueEntryArr(entriesC); return ret;}void SQLiteQueuePlugin::parseEmbeddedType(const string& embeddedType, string &type, string &methodName){ string::size_type pos = embeddedType.find("|"); if (pos == string::npos) { type = embeddedType; methodName = ""; return; } type = embeddedType.substr(0, pos); if (pos < embeddedType.size()) methodName = embeddedType.substr(pos+1); // No trim(): we assume no white spaces}long SQLiteQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) { Lock lock(accessMutex_); if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, randomRemove() failed"); if (start == end || queueP_->empty(queueP_)) return 0; ::QueueEntryArr queueEntryArr; memset(&queueEntryArr, 0, sizeof(QueueEntryArr)); { vector<EntryType>::const_iterator iter = start; while (iter != end) { iter++; queueEntryArr.len++; } } if (queueEntryArr.len < 1) return 0; queueEntryArr.queueEntryArr = (QueueEntry *)calloc(queueEntryArr.len, sizeof(QueueEntry)); vector<EntryType>::const_iterator iter = start; for (int currIndex=0; iter != end; ++iter, currIndex++) { const EntryType &entryType = (*iter); const MsgQueueEntry &entry = *entryType; ::QueueEntry &queueEntry = queueEntryArr.queueEntryArr[currIndex]; // Copy C++ to C struct ... queueEntry.priority = entry.getPriority(); queueEntry.isPersistent = entry.isPersistent(); queueEntry.uniqueId = entry.getUniqueId(); queueEntry.sizeInBytes = entry.getSizeInBytes(); strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN); // "MSG_RAW|publish" "MSG_RAW|connect" queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0; /* const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject(); if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject"); queueEntry.embeddedBlob.data = blob->data; queueEntry.embeddedBlob.dataLen = blob->dataLen; */ if (log_.dump()) { char *dumpP = ::queueEntryToXml(&queueEntry, 200); log_.dump(ME+".put()", string("Put blob to queue:") + dumpP); xmlBlasterFree(dumpP); } } ::ExceptionStruct exception; int32_t numRemoved = queueP_->randomRemove(queueP_, &queueEntryArr, &exception); freeQueueEntryArrInternal(&queueEntryArr); if (*exception.errorCode != 0) throw convertFromQueueException(&exception); return (long)numRemoved;}long SQLiteQueuePlugin::getNumOfEntries() const{ if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed"); return queueP_->getNumOfEntries(queueP_);}long SQLiteQueuePlugin::getMaxNumOfEntries() const{ if (queueP_ == 0) return property_.getMaxEntries(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfEntries() failed"); return queueP_->getMaxNumOfEntries(queueP_);}int64_t SQLiteQueuePlugin::getNumOfBytes() const{ if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getNumOfBytes() failed"); return queueP_->getNumOfBytes(queueP_);}int64_t SQLiteQueuePlugin::getMaxNumOfBytes() const{ if (queueP_ == 0) return property_.getMaxBytes(); // throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, getMaxNumOfBytes() failed"); return queueP_->getMaxNumOfBytes(queueP_);}void SQLiteQueuePlugin::clear(){ Lock lock(accessMutex_); if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, clear() failed"); ::ExceptionStruct exception; queueP_->clear(queueP_, &exception);}bool SQLiteQueuePlugin::empty() const{ if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, empty() failed"); return queueP_->empty(queueP_);}void SQLiteQueuePlugin::destroy(){ if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, destroy() failed"); ::ExceptionStruct exception; queueP_->destroy(&queueP_, &exception); if (*exception.errorCode != 0) throw convertFromQueueException(&exception);}// Exception conversion ....org::xmlBlaster::util::XmlBlasterException SQLiteQueuePlugin::convertFromQueueException(const ::ExceptionStruct *ex) const{ return org::xmlBlaster::util::XmlBlasterException( (*ex->errorCode=='\0')?string("internal.unknown"):string(ex->errorCode), string(""), ME, "en", string(ex->message), global_.getVersion() + " " + global_.getBuildTimestamp());}string SQLiteQueuePlugin::usage(){ std::string text = string(""); text += string("\nThe SQLite persistent queue plugin configuration:"); text += string("\n -queue/connection/url [xmlBlasterClientCpp.db]"); text += string("\n The database file name (incl. path), defaults to the current directory."); text += string("\n -queue/connection/tableNamePrefix [XB_]"); text += string("\n The prefix for all tables in the database."); text += ClientQueueProperty::usage(); return text;}}}}} // namespace/** * Customized logging output is handled by this method. * We redirect logging output from the C implementation to our C++ logging plugin. * <p> * Please compile with <code>XMLBLASTER_PERSISTENT_QUEUE</code> defined. * </p> * @param queueP * @param currLevel The actual log level of the client * @param level The level of this log entry * @param location A string describing the code place * @param fmt The formatting string * @param ... Other variables to log, corresponds to 'fmt' * @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default * implementation */static void myLogger(void *logUserP, XMLBLASTER_LOG_LEVEL currLevel, XMLBLASTER_LOG_LEVEL level, const char *location, const char *fmt, ...){ /* Guess we need no more than 200 bytes. */ int n, size = 200; char *p = 0; va_list ap; ::I_Queue *queueP = (::I_Queue *)logUserP; //org::xmlBlaster::util::queue::SQLiteQueuePlugin *pluginP = // (org::xmlBlaster::util::queue::SQLiteQueuePlugin *)queueP->userObject; //org::xmlBlaster::util::I_Log& log = pluginP->getLog(); if (queueP->userObject == 0) { std::cout << "myLogger not initialized" << std::endl; return; } org::xmlBlaster::util::I_Log& log = *((org::xmlBlaster::util::I_Log*)queueP->userObject); if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */ return; } if ((p = (char *)malloc (size)) == NULL) return; for (;;) { /* Try to print in the allocated space. */ va_start(ap, fmt); n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */ va_end(ap); /* If that worked, print the string to console. */ if (n > -1 && n < size) { if (level == XMLBLASTER_LOG_INFO) log.info(location, p); else if (level == XMLBLASTER_LOG_WARN) log.warn(location, p); else if (level == XMLBLASTER_LOG_ERROR) log.error(location, p); else log.trace(location, p); free(p); return; } /* Else try again with more space. */ if (n > -1) /* glibc 2.1 */ size = n+1; /* precisely what is needed */ else /* glibc 2.0 */ size *= 2; /* twice the old size */ if ((p = (char *)realloc (p, size)) == NULL) { return; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -