📄 sqlitequeueplugin.cpp
字号:
/*------------------------------------------------------------------------------
Name:      SQLiteQueuePlugin.cpp
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Author:    xmlBlaster@marcelruff.info
------------------------------------------------------------------------------*/
#include <util/queue/SQLiteQueuePlugin.h>
#include <util/XmlBlasterException.h>
#include <util/Global.h>
#include <stdarg.h> // va_start for logging
#include <stdio.h> // vsnprintf for g++ 2.9x only
#include <util/lexical_cast.h>
#include <util/MessageUnit.h>
#include <util/queue/ConnectQueueEntry.h>
#include <util/queue/SubscribeQueueEntry.h>
#include <util/queue/UnSubscribeQueueEntry.h>
#include <util/queue/PublishQueueEntry.h>
#include <socket/xmlBlasterSocket.h> // C xmlBlaster client library: for msgUnit serialize
#include <util/queue/QueueInterface.h> // The C implementation interface

using namespace std;
using namespace org::xmlBlaster::util;
using namespace org::xmlBlaster::util::thread;
using namespace org::xmlBlaster::util::qos::storage;
using namespace org::xmlBlaster::util::key;
using namespace org::xmlBlaster::util::qos;
using namespace org::xmlBlaster::client::qos;
using namespace org::xmlBlaster::client::key;

//static ::XmlBlasterLogging loggingFp = ::xmlBlasterDefaultLogging;

// Logging callback handed to the C queue implementation (see queueProperties.logFp below).
static void myLogger(void *logUserP,
                     XMLBLASTER_LOG_LEVEL currLevel,
                     XMLBLASTER_LOG_LEVEL level,
                     const char *location, const char *fmt, ...);

namespace org { namespace xmlBlaster { namespace util { namespace queue {

/**
 * Creates the plugin and opens the underlying C queue via createQueue().
 *
 * The database url, the queue name and the table name prefix are looked up in the
 * global properties below "queue/<propertyPrefix>/..." with the defaults coded here.
 *
 * @param global   The global instance; supplies the logger and the properties.
 * @param property The queue property; supplies the property prefix, maxEntries and maxBytes.
 * @throws The exception produced by convertFromQueueException() if the C layer
 *         reports an error code after createQueue().
 */
SQLiteQueuePlugin::SQLiteQueuePlugin(Global& global, const ClientQueueProperty& property)
   : ME("SQLiteQueuePlugin"),
     global_(global),
     log_(global.getLog("org.xmlBlaster.util.queue")),
     property_(property),
     queueP_(0),
     connectQosFactory_(global_),
     statusQosFactory_(global_),
     msgKeyFactory_(global_),
     msgQosFactory_(global_),
     accessMutex_()
{
   if (log_.call()) log_.call(ME, "Constructor queue [" + getType() + "][" + getVersion() + "] ...");

   /*
    TODO: Pass basic configuration from plugin key/values
          similar to (see xmlBlaster.properties)
      QueuePlugin[SQLite][1.0]=SQLiteQueuePlugin,
            url=/${user.home}${file.separator}tmp${file.separator}$_{xmlBlaster_uniqueId}.db,\
            user=sqlite,\
            password=,\
            connectionPoolSize=1,\
            connectionBusyTimeout=90000,\
            maxWaitingThreads=300,\
            tableNamePrefix=XB_,\
            entriesTableName=ENTRIES,\
            dbAdmin=true
   */
   const std::string classRelating = "QueuePlugin["+getType()+"]["+getVersion()+"]"; // "QueuePlugin[SQLite][1.0]"
   const std::string instanceRelating = property.getPropertyPrefix();               // == "connection"

   // Should it be "queue/connection/tableNamePrefix" or "queue/QueuePlugin[SQLite][1.0]/tableNamePrefix"
   // The first allows different instances with changing "connection" to e.g. "tailback" etc.
   if (global_.getProperty().propertyExists(classRelating, true)) {
      log_.warn(ME, "Your setting of property '" + classRelating + "' is not supported");
   }

   std::string defaultPath = ""; // for example: "/home/joe/tmp/" or "C:\Documents and Settings\marcel\tmp"
   if (global_.getProperty().get("user.home", "") != "")
      defaultPath = global_.getProperty().get("user.home", "") +
                    global_.getProperty().get("file.separator", "");
                    //+ "tmp" +                                      // We currently can't create missing directories, TODO!!!
                    //global_.getProperty().get("file.separator", "");

   const std::string url = global_.getProperty().get("queue/"+instanceRelating+"/url", defaultPath+"xmlBlasterClientCpp.db"); // "queue/connection/url"
   const std::string queueName = global_.getProperty().get("queue/"+instanceRelating+"/queueName", instanceRelating + "_" + global_.getStrippedImmutableId()); // "connection_clientJoe2"
   const std::string tableNamePrefix = global_.getProperty().get("queue/"+instanceRelating+"/tableNamePrefix", "XB_");// "queue/connection/tableNamePrefix"

   ::ExceptionStruct exception;
   ::QueueProperties queueProperties;
   // Start from an all-zero C struct, only the fields assigned below carry settings
   memset(&queueProperties, 0, sizeof(QueueProperties));

   strncpy0(queueProperties.dbName, url.c_str(), QUEUE_DBNAME_MAX);
   strncpy0(queueProperties.queueName, queueName.c_str(), QUEUE_ID_MAX);
   strncpy0(queueProperties.tablePrefix, tableNamePrefix.c_str(), QUEUE_PREFIX_MAX);
   queueProperties.maxNumOfEntries = (int32_t)property.getMaxEntries();
   queueProperties.maxNumOfBytes = property.getMaxBytes();
   queueProperties.logFp = myLogger;
   queueProperties.logLevel = (log_.call() || log_.trace()) ? XMLBLASTER_LOG_TRACE : XMLBLASTER_LOG_INFO;
   queueProperties.userObject = &log_;

   queueP_ = createQueue(&queueProperties, &exception); // &log_ Used in myLogger(), see above
   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);

   log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "], queue/"+instanceRelating+"/url='" +
                 queueProperties.dbName + "', queue/"+instanceRelating+"/queueName='" + queueProperties.queueName +
                 "', queue/"+instanceRelating+"/maxEntries=" + lexical_cast<string>(queueProperties.maxNumOfEntries));
}

/*
SQLiteQueuePlugin::SQLiteQueuePlugin(const SQLiteQueuePlugin& queue)
   : ME("SQLiteQueuePlugin"),
     global_(queue.global_),
     log_(queue.log_),
     property_(queue.property_),
     queueP_(queue.queueP_),
     accessMutex_()
{
}

SQLiteQueuePlugin& SQLiteQueuePlugin::operator =(const SQLiteQueuePlugin& queue)
{
   Lock lock(queue.accessMutex_);
   property_ = queue.property_;
   queueP_ = queue.queueP_;
   return *this;
}
*/

/**
 * Shuts down the C queue if one is open.
 *
 * An error code reported by shutdown() is only logged as a warning.
 */
SQLiteQueuePlugin::~SQLiteQueuePlugin()
{
   if (log_.call()) log_.call(ME, "destructor");
   if (queueP_) {
      Lock lock(accessMutex_);
      ::ExceptionStruct exception;
      queueP_->shutdown(&queueP_, &exception); // NULLs the queueP_
      if (*exception.errorCode != 0) {
         const int ERRORSTR_LEN = 1024;
         char errorString[ERRORSTR_LEN];
         log_.warn(ME, string("Ignoring problem during shutdown: ") + getExceptionStr(errorString, ERRORSTR_LEN, &exception));
      }
   }
}

/**
 * Pushes one entry into the C persistent queue.
 *
 * Priority, persistence flag, unique id, size and embedded type of the entry are
 * copied into a C ::QueueEntry; the blob's data pointer and length are passed on
 * as they are (no copy is made here).
 *
 * @param entry The entry to store; its getEmbeddedObject() must not return NULL.
 * @throws XmlBlasterException RESOURCE_DB_UNAVAILABLE if no queue is open,
 *         INTERNAL_ILLEGALARGUMENT if the entry has no embedded object, or the
 *         exception produced by convertFromQueueException() if the C put() reports an error code.
 */
void SQLiteQueuePlugin::put(const MsgQueueEntry &entry)
{
   if (log_.call()) log_.call(ME, "::put");
   if (log_.dump()) log_.dump(ME+".put()", string("The msg entry is: ") + entry.toXml());

   Lock lock(accessMutex_);
   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, put() failed");

   ::ExceptionStruct exception;
   ::QueueEntry queueEntry;
   // Zero the C struct first (as the constructor does for ::QueueProperties),
   // so no field reaches the C layer with an indeterminate value
   memset(&queueEntry, 0, sizeof(queueEntry));

   // Copy C++ to C struct ...

   queueEntry.priority = entry.getPriority();
   queueEntry.isPersistent = entry.isPersistent();
   queueEntry.uniqueId = entry.getUniqueId();
   queueEntry.sizeInBytes = entry.getSizeInBytes();
   strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);  // "MSG_RAW|publish"
   queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;

   // dump MsgQueueEntry with SOCKET protocol into C ::MsgUnit ...

   const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();
   if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");
   queueEntry.embeddedBlob.data = blob->data;
   queueEntry.embeddedBlob.dataLen = blob->dataLen;

   if (log_.dump()) {
      char *dumpP = blobDump(&queueEntry.embeddedBlob);
      log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);
      ::xmlBlasterFree(dumpP);
   }

   // Push into C persistent queue ...

   queueP_->put(queueP_, &queueEntry, &exception);

   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);
}

const vector<EntryType> SQLiteQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
{
   Lock lock(accessMutex_);
   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, peekWithSamePriority() failed");

   vector<EntryType> ret;
   if (queueP_->empty(queueP_)) return ret;
   if (log_.call()) log_.call(ME, "peekWithSamePriority maxNumOfEntries=" + lexical_cast<std::string>(maxNumOfEntries) + " maxNumOfBytes=" + lexical_cast<std::string>(maxNumOfBytes));

   ::ExceptionStruct exception;
   ::QueueEntryArr *entriesC = queueP_->peekWithSamePriority(queueP_, (int32_t)maxNumOfEntries, maxNumOfBytes, &exception);
   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);

   // Now we need to copy the C results into C++ classes ...
for (size_t ii=0; ii<entriesC->len; ii++) { ::QueueEntry &queueEntryC = entriesC->queueEntryArr[ii]; string type, methodName; parseEmbeddedType(queueEntryC.embeddedType, type, methodName); if (type != Constants::ENTRY_TYPE_MSG_RAW) { string embedded = queueEntryC.embeddedType; freeQueueEntryArr(entriesC); throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + embedded + "' type='" + type + "' is not supported"); } if (log_.dump()) { char *dumpP = blobDump(&queueEntryC.embeddedBlob); log_.dump(ME+".peekWithSamePriority()", string("Retrieved blob from queue:") + dumpP); ::xmlBlasterFree(dumpP); } ::MsgUnitArr *msgUnitArrC = ::parseMsgUnitArr(queueEntryC.embeddedBlob.dataLen, queueEntryC.embeddedBlob.data); for (size_t j=0; msgUnitArrC!=0 && j<msgUnitArrC->len; j++) { // TODO: Collect a PUBLISH_ARR !!! (currently we transform it to single publish()es) ::MsgUnit &msgUnit = msgUnitArrC->msgUnitArr[j]; if (log_.dump()) { char *dumpP = ::messageUnitToXmlLimited(&msgUnit, 128); log_.dump(ME+".peekWithSamePriority()", string("Retrieved and parsed C message from queue:") + dumpP); ::xmlBlasterFree(dumpP); } if (methodName == MethodName::PUBLISH) { MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnit.key)); MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnit.qos)); MessageUnit messageUnit(msgKeyData, msgUnit.contentLen, (const unsigned char*)msgUnit.content, msgQosData); PublishQueueEntry *pq = new PublishQueueEntry(global_, messageUnit, queueEntryC.priority, queueEntryC.uniqueId); if (log_.trace()) log_.trace(ME, "Got PublishQueueEntry from queue"); ret.insert(ret.end(), EntryType(*pq)); if (log_.trace()) log_.trace(ME, "PublishQueueEntry is reference countet"); } else if (methodName == MethodName::CONNECT) { ConnectQosRef connectQos = connectQosFactory_.readObject(string(msgUnit.qos)); ConnectQueueEntry *pq = new ConnectQueueEntry(global_, connectQos, queueEntryC.priority, 
queueEntryC.uniqueId); if (log_.trace()) log_.trace(ME, "Got ConnectQueueEntry from queue"); ret.insert(ret.end(), EntryType(*pq)); if (log_.trace()) log_.trace(ME, "ConnectQueueEntry is reference countet"); } /* TODO: queryKeyFactory and queryQosFactory! else if (methodName == MethodName::SUBSCRIBE) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -