⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sqlitequeueplugin.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      SQLiteQueuePlugin.cppProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileAuthor:    xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/#include <util/queue/SQLiteQueuePlugin.h>#include <util/XmlBlasterException.h>#include <util/Global.h>#include <stdarg.h>           // va_start for logging#include <stdio.h>            // vsnprintf for g++ 2.9x only#include <util/lexical_cast.h>#include <util/MessageUnit.h>#include <util/queue/ConnectQueueEntry.h>#include <util/queue/SubscribeQueueEntry.h>#include <util/queue/UnSubscribeQueueEntry.h>#include <util/queue/PublishQueueEntry.h>#include <socket/xmlBlasterSocket.h> // C xmlBlaster client library: for msgUnit serialize#include <util/queue/QueueInterface.h> // The C implementation interfaceusing namespace std;using namespace org::xmlBlaster::util;using namespace org::xmlBlaster::util::thread;using namespace org::xmlBlaster::util::qos::storage;using namespace org::xmlBlaster::util::key;using namespace org::xmlBlaster::util::qos;using namespace org::xmlBlaster::client::qos;using namespace org::xmlBlaster::client::key;//static ::XmlBlasterLogging loggingFp = ::xmlBlasterDefaultLogging;static void myLogger(void *logUserP,                      XMLBLASTER_LOG_LEVEL currLevel,                     XMLBLASTER_LOG_LEVEL level,                     const char *location, const char *fmt, ...);namespace org { namespace xmlBlaster { namespace util { namespace queue {SQLiteQueuePlugin::SQLiteQueuePlugin(Global& global, const ClientQueueProperty& property)   : ME("SQLiteQueuePlugin"),      global_(global),      log_(global.getLog("org.xmlBlaster.util.queue")),      property_(property),      queueP_(0),      connectQosFactory_(global_),     statusQosFactory_(global_),     msgKeyFactory_(global_),     msgQosFactory_(global_),     accessMutex_(){   if (log_.call()) log_.call(ME, "Constructor queue [" + getType() + "][" + getVersion() + "] ...");   /*    TODO: Pass basic configuration from plugin key/values similar to (see xmlBlaster.properties)     QueuePlugin[SQLite][1.0]=SQLiteQueuePlugin,         url=/${user.home}${file.separator}tmp${file.separator}$_{xmlBlaster_uniqueId}.db,\         user=sqlite,\         password=,\         connectionPoolSize=1,\         connectionBusyTimeout=90000,\         maxWaitingThreads=300,\         tableNamePrefix=XB_,\         entriesTableName=ENTRIES,\         dbAdmin=true   */   const std::string classRelating = "QueuePlugin["+getType()+"]["+getVersion()+"]"; // "QueuePlugin[SQLite][1.0]"   const std::string instanceRelating = property.getPropertyPrefix();                // == "connection"   // Should it be "queue/connection/tableNamePrefix" or "queue/QueuePlugin[SQLite][1.0]/tableNamePrefix"   // The first allows different instances with changing "connection" to e.g. "tailback" etc.   if (global_.getProperty().propertyExists(classRelating, true)) {      log_.warn(ME, "Your setting of property '" + classRelating + "' is not supported");   }   std::string defaultPath = ""; // for example: "/home/joe/tmp/" or "C:\Documents and Settings\marcel\tmp"   if (global_.getProperty().get("user.home", "") != "")      defaultPath = global_.getProperty().get("user.home", "") +                    global_.getProperty().get("file.separator", "");                    //+ "tmp" +                                     // We currently can't create missing directories, TODO!!!                    //global_.getProperty().get("file.separator", "");   const std::string url = global_.getProperty().get("queue/"+instanceRelating+"/url", defaultPath+"xmlBlasterClientCpp.db");  // "queue/connection/url"   const std::string queueName = global_.getProperty().get("queue/"+instanceRelating+"/queueName", instanceRelating + "_" + global_.getStrippedImmutableId()); // "connection_clientJoe2"   const std::string tableNamePrefix = global_.getProperty().get("queue/"+instanceRelating+"/tableNamePrefix", "XB_");// "queue/connection/tableNamePrefix"   ::ExceptionStruct exception;   ::QueueProperties queueProperties;   memset(&queueProperties, 0, sizeof(QueueProperties));   strncpy0(queueProperties.dbName, url.c_str(), QUEUE_DBNAME_MAX);   strncpy0(queueProperties.queueName, queueName.c_str(), QUEUE_ID_MAX);   strncpy0(queueProperties.tablePrefix, tableNamePrefix.c_str(), QUEUE_PREFIX_MAX);   queueProperties.maxNumOfEntries = (int32_t)property.getMaxEntries();   queueProperties.maxNumOfBytes = property.getMaxBytes();   queueProperties.logFp = myLogger;   queueProperties.logLevel = (log_.call() || log_.trace()) ? XMLBLASTER_LOG_TRACE : XMLBLASTER_LOG_INFO;   queueProperties.userObject = &log_;   queueP_ = createQueue(&queueProperties, &exception); // &log_ Used in myLogger(), see above   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);   log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "], queue/"+instanceRelating+"/url='" +                 queueProperties.dbName + "', queue/"+instanceRelating+"/queueName='" + queueProperties.queueName +                 "', queue/"+instanceRelating+"/maxEntries=" + lexical_cast<string>(queueProperties.maxNumOfEntries));}/*SQLiteQueuePlugin::SQLiteQueuePlugin(const SQLiteQueuePlugin& queue)   : ME("SQLiteQueuePlugin"),      global_(queue.global_),      log_(queue.log_),      property_(queue.property_),      queueP_(queue.queueP_),      accessMutex_(){}SQLiteQueuePlugin& SQLiteQueuePlugin::operator =(const SQLiteQueuePlugin& queue){   Lock lock(queue.accessMutex_);   property_   = queue.property_;   queueP_    = queue.queueP_;   return *this;}*/SQLiteQueuePlugin::~SQLiteQueuePlugin(){   if (log_.call()) log_.call(ME, "destructor");   if (queueP_) {      Lock lock(accessMutex_);      ::ExceptionStruct exception;      queueP_->shutdown(&queueP_, &exception); // NULLs the queueP_      if (*exception.errorCode != 0) {         const int ERRORSTR_LEN = 1024;         char errorString[ERRORSTR_LEN];         log_.warn(ME, string("Ignoring problem during shutdown: ") + getExceptionStr(errorString, ERRORSTR_LEN, &exception));      }   }} void SQLiteQueuePlugin::put(const MsgQueueEntry &entry){   if (log_.call()) log_.call(ME, "::put");   if (log_.dump()) log_.dump(ME+".put()", string("The msg entry is: ")  + entry.toXml());   Lock lock(accessMutex_);   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, put() failed");   ::ExceptionStruct exception;   ::QueueEntry queueEntry;   // Copy C++ to C struct ...   queueEntry.priority = entry.getPriority();   queueEntry.isPersistent = entry.isPersistent();   queueEntry.uniqueId = entry.getUniqueId();   queueEntry.sizeInBytes = entry.getSizeInBytes();   strncpy0(queueEntry.embeddedType, entry.getEmbeddedType().c_str(), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);  // "MSG_RAW|publish"   queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;   // dump MsgQueueEntry with SOCKET protocol into C ::MsgUnit ...      const BlobHolder *blob = (const BlobHolder *)entry.getEmbeddedObject();   if (blob == 0) throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME, "put() failed, the entry " + entry.getLogId() + " returned NULL for embeddedObject");   queueEntry.embeddedBlob.data = blob->data;   queueEntry.embeddedBlob.dataLen = blob->dataLen;   if (log_.dump()) {      char *dumpP = blobDump(&queueEntry.embeddedBlob);      log_.dump(ME+".put()", string("Put blob to queue:") + dumpP);      ::xmlBlasterFree(dumpP);   }   // Push into C persistent queue ...   queueP_->put(queueP_, &queueEntry, &exception);   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);}const vector<EntryType> SQLiteQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const{   Lock lock(accessMutex_);   if (queueP_ == 0) throw XmlBlasterException(RESOURCE_DB_UNAVAILABLE, ME, "Sorry, no persistent queue is available, peekWithSamePriority() failed");   vector<EntryType> ret;   if (queueP_->empty(queueP_)) return ret;   if (log_.call()) log_.call(ME, "peekWithSamePriority maxNumOfEntries=" + lexical_cast<std::string>(maxNumOfEntries) + " maxNumOfBytes=" + lexical_cast<std::string>(maxNumOfBytes));   ::ExceptionStruct exception;   ::QueueEntryArr *entriesC = queueP_->peekWithSamePriority(queueP_, (int32_t)maxNumOfEntries, maxNumOfBytes, &exception);   if (*exception.errorCode != 0) throw convertFromQueueException(&exception);   // Now we need to copy the C results into C++ classes ...   for (size_t ii=0; ii<entriesC->len; ii++) {      ::QueueEntry &queueEntryC = entriesC->queueEntryArr[ii];      string type, methodName;      parseEmbeddedType(queueEntryC.embeddedType, type, methodName);      if (type != Constants::ENTRY_TYPE_MSG_RAW) {         string embedded = queueEntryC.embeddedType;         freeQueueEntryArr(entriesC);         throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::peekWithSamePriority", string("The queue entry embeddedType '") + embedded + "' type='" + type + "' is not supported");      }      if (log_.dump()) {         char *dumpP = blobDump(&queueEntryC.embeddedBlob);         log_.dump(ME+".peekWithSamePriority()", string("Retrieved blob from queue:") + dumpP);         ::xmlBlasterFree(dumpP);      }      ::MsgUnitArr *msgUnitArrC = ::parseMsgUnitArr(queueEntryC.embeddedBlob.dataLen, queueEntryC.embeddedBlob.data);      for (size_t j=0; msgUnitArrC!=0 && j<msgUnitArrC->len; j++) { // TODO: Collect a PUBLISH_ARR !!! (currently we transform it to single publish()es)         ::MsgUnit &msgUnit = msgUnitArrC->msgUnitArr[j];         if (log_.dump()) {            char *dumpP = ::messageUnitToXmlLimited(&msgUnit, 128);            log_.dump(ME+".peekWithSamePriority()", string("Retrieved and parsed C message from queue:") + dumpP);            ::xmlBlasterFree(dumpP);         }         if (methodName == MethodName::PUBLISH) {            MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnit.key));            MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnit.qos));            MessageUnit messageUnit(msgKeyData, msgUnit.contentLen, (const unsigned char*)msgUnit.content, msgQosData);            PublishQueueEntry *pq = new PublishQueueEntry(global_, messageUnit,                                           queueEntryC.priority, queueEntryC.uniqueId);            if (log_.trace()) log_.trace(ME, "Got PublishQueueEntry from queue");            ret.insert(ret.end(), EntryType(*pq));            if (log_.trace()) log_.trace(ME, "PublishQueueEntry is reference countet");         }         else if (methodName == MethodName::CONNECT) {            ConnectQosRef connectQos = connectQosFactory_.readObject(string(msgUnit.qos));            ConnectQueueEntry *pq = new ConnectQueueEntry(global_, connectQos,                                           queueEntryC.priority, queueEntryC.uniqueId);            if (log_.trace()) log_.trace(ME, "Got ConnectQueueEntry from queue");            ret.insert(ret.end(), EntryType(*pq));            if (log_.trace()) log_.trace(ME, "ConnectQueueEntry is reference countet");         }         /* TODO: queryKeyFactory and queryQosFactory!         else if (methodName == MethodName::SUBSCRIBE) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -