⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cachequeueplugin.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
字号:
/*------------------------------------------------------------------------------Name:      CacheQueuePlugin.cppProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/#include <util/queue/CacheQueuePlugin.h>#include <util/queue/QueueFactory.h>#include <util/XmlBlasterException.h>#include <util/Global.h>#ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows#  include <util/queue/SQLiteQueuePlugin.h> // temporary for usage -> remove again#endifusing namespace std;using namespace org::xmlBlaster::util;using namespace org::xmlBlaster::util::thread;using namespace org::xmlBlaster::util::qos::storage;namespace org { namespace xmlBlaster { namespace util { namespace queue {CacheQueuePlugin::CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property)   : ME("CacheQueuePlugin"),      global_(global),      log_(global.getLog("org.xmlBlaster.util.queue")),      property_(property),      transientQueueP_(0),      persistentQueueP_(0),      accessMutex_(){   // TODO: type/version should be set from outside!!!   transientQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "RAM", "1.0");   try {      persistentQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "SQLite", "1.0");      // Note: On startup we can only load the highest priority in a bulk, peekWithSamePriority() does not support to get all!      reloadFromPersistentStore();   }   catch (const XmlBlasterException &e) {      log_.warn(ME, "No persistent queue is available, we continue RAM based. Reason: " + e.getMessage());   }   log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");}/*CacheQueuePlugin::CacheQueuePlugin(const CacheQueuePlugin& queue)   : ME("CacheQueuePlugin"),      global_(queue.global_),      log_(queue.log_),      property_(queue.property_),      storage_(queue.storage_),      accessMutex_(){   numOfBytes_ = queue.numOfBytes_;}CacheQueuePlugin& CacheQueuePlugin::operator =(const CacheQueuePlugin& queue){   Lock lock(queue.accessMutex_);   property_   = queue.property_;   storage_    = queue.storage_;   numOfBytes_ = queue.numOfBytes_;   return *this;}*/CacheQueuePlugin::~CacheQueuePlugin(){   if (log_.call()) log_.call(ME, "destructor");   QueueFactory::getFactory().releasePlugin(transientQueueP_);   if (persistentQueueP_) QueueFactory::getFactory().releasePlugin(persistentQueueP_);} void CacheQueuePlugin::put(const MsgQueueEntry &entry){   if (log_.call()) log_.call(ME, "::put");   Lock lock(accessMutex_);   transientQueueP_->put(entry);   if (persistentQueueP_) {      if (entry.isPersistent()) {         try {           persistentQueueP_->put(entry);         }         catch (const XmlBlasterException &e) {            log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());         }      }   }}long CacheQueuePlugin::reloadFromPersistentStore() const{   if (persistentQueueP_ && transientQueueP_->getNumOfEntries() == 0 && persistentQueueP_->getNumOfEntries() > 0) {      // On startup shuffle them to the transient queue (only the highest priority is accessible with our I_Queue API)      const vector<EntryType> vec = persistentQueueP_->peekWithSamePriority(-1, -1);      long count = 0;      vector<EntryType>::const_iterator iter = vec.begin();      for (; iter != vec.end(); ++iter) {         const EntryType &entryType = (*iter);         transientQueueP_->put(*entryType);         count++;      }      return count;   }   return 0;}const vector<EntryType> CacheQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const{   Lock lock(accessMutex_);   vector<EntryType> vec = transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);   if (vec.size() == 0) {      long count = reloadFromPersistentStore();      if (count > 0) {         return transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);      }   }   return vec;}long CacheQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) {   Lock lock(accessMutex_);   long count = transientQueueP_->randomRemove(start, end);   if (persistentQueueP_) {      vector<EntryType> persistents;      vector<EntryType>::const_iterator iter = start;      while (iter != end) {         const EntryType &entryType = (*iter);         if (entryType->isPersistent()) {            persistents.push_back(entryType);         }         iter++;      }      try {         persistentQueueP_->randomRemove(persistents.begin(), persistents.end());      }      catch (const XmlBlasterException &e) {         log_.warn(ME, "Ignoring problem to remove entry from persistent queue, we remove it from the transient queue only: " + e.getMessage());      }   }   return count;}long CacheQueuePlugin::getNumOfEntries() const{   return transientQueueP_->getNumOfEntries();}long CacheQueuePlugin::getMaxNumOfEntries() const{   return transientQueueP_->getMaxNumOfEntries();}int64_t CacheQueuePlugin::getNumOfBytes() const{   return transientQueueP_->getNumOfBytes();}int64_t CacheQueuePlugin::getMaxNumOfBytes() const{   return transientQueueP_->getMaxNumOfBytes();}void CacheQueuePlugin::clear(){   Lock lock(accessMutex_);   transientQueueP_->clear();   if (persistentQueueP_) {      try {         persistentQueueP_->clear();      }      catch (const XmlBlasterException &e) {         log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());      }   }}bool CacheQueuePlugin::empty() const{   return transientQueueP_->empty();}void CacheQueuePlugin::destroy(){   transientQueueP_->destroy();   if (persistentQueueP_) {      try {         persistentQueueP_->destroy();      }      catch (const XmlBlasterException &e) {         log_.warn(ME, "Ignoring problem to destroy the persistent queue: " + e.getMessage());      }   }}string CacheQueuePlugin::usage(){   std::string text = string("");   text += string("\nThe CACHE queue plugin configuration:");#ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows   text += SQLiteQueuePlugin::usage();   // TODO: depending on persistency#else   text += ClientQueueProperty::usage();#endif   return text;}}}}} // namespace

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -