⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ramqueueplugin.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
字号:
/*------------------------------------------------------------------------------
Name:      RamQueuePlugin.cpp
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
------------------------------------------------------------------------------*/
#include <util/queue/RamQueuePlugin.h>
#include <util/XmlBlasterException.h>
#include <util/Global.h>

using namespace std;
using namespace org::xmlBlaster::util;
using namespace org::xmlBlaster::util::thread;
using namespace org::xmlBlaster::util::qos::storage;

namespace org { namespace xmlBlaster { namespace util { namespace queue {

/**
 * Creates an empty queue bound to the given global context and queue property.
 * The byte counter starts at zero and the creation is logged at info level.
 *
 * @param global   Context from which the logger "org.xmlBlaster.util.queue" is obtained.
 * @param property Supplies the limits read later via getMaxEntries() / getMaxBytes().
 */
RamQueuePlugin::RamQueuePlugin(Global& global, const ClientQueueProperty& property)
   : ME("RamQueuePlugin"),
     global_(global),
     log_(global.getLog("org.xmlBlaster.util.queue")),
     property_(property),
     storage_(),
     accessMutex_()
{
   numOfBytes_ = 0;
   log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
}

/**
 * Copy constructor: copies the property, the storage and the byte counter of
 * 'queue'; the new instance gets its own, freshly constructed mutex.
 *
 * NOTE(review): unlike operator=, the source's accessMutex_ is not held while
 * its storage is copied here -- confirm that callers never copy a queue that
 * is being modified concurrently.
 */
RamQueuePlugin::RamQueuePlugin(const RamQueuePlugin& queue)
   : ME("RamQueuePlugin"),
     global_(queue.global_),
     log_(queue.log_),
     property_(queue.property_),
     storage_(queue.storage_),
     accessMutex_()
{
   numOfBytes_ = queue.numOfBytes_;
}

/**
 * Assigns property, storage and byte counter from 'queue' while holding the
 * source queue's accessMutex_. global_, log_ and the own mutex are untouched.
 *
 * @return *this
 */
RamQueuePlugin& RamQueuePlugin::operator =(const RamQueuePlugin& queue)
{
   // Self-assignment leaves the object unchanged, so skip locking and copying.
   if (this == &queue) return *this;

   Lock lock(queue.accessMutex_);
   property_   = queue.property_;
   storage_    = queue.storage_;
   numOfBytes_ = queue.numOfBytes_;
   return *this;
}

/**
 * Destructor: erases any entries still held in the storage (under the mutex).
 */
RamQueuePlugin::~RamQueuePlugin()
{
   if (log_.call()) log_.call(ME, "destructor");
   if (!storage_.empty()) {
      Lock lock(accessMutex_);
      storage_.erase(storage_.begin(), storage_.end());
   }
}

/**
 * Inserts a clone of 'entry' into the queue and adds its size to the byte counter.
 *
 * @param entry The entry to store; entry.getClone() is what actually gets inserted.
 * @throws XmlBlasterException RESOURCE_OVERFLOW_QUEUE_BYTES if the current byte
 *         count plus the entry size would exceed property_.getMaxBytes().
 * @throws XmlBlasterException RESOURCE_OVERFLOW_QUEUE_ENTRIES if the storage
 *         already holds property_.getMaxEntries() (or more) entries.
 * @throws XmlBlasterException INTERNAL_UNKNOWN if cloning or inserting throws.
 */
void RamQueuePlugin::put(const MsgQueueEntry &entry)
{
   if (log_.call()) log_.call(ME, "::put");
   if (log_.dump()) log_.dump(ME, string("::put, the entry is: ")  + entry.toXml());

   Lock lock(accessMutex_);
   if (numOfBytes_ + entry.getSizeInBytes() > static_cast<size_t>(property_.getMaxBytes())) {
      throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_BYTES, ME + "::put", "client queue");
   }

   if (storage_.size() >= static_cast<size_t>(property_.getMaxEntries())) {
      throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME + "::put", "client queue");
   }
   try {
      // NOTE(review): presumably EntryType takes ownership of the clone -- confirm
      // against the EntryType declaration, which is not visible in this file.
      const EntryType help(*entry.getClone());
      storage_.insert(help);
      numOfBytes_ += entry.getSizeInBytes();
      // add the sizeInBytes_ here ...
   }
   catch (const exception& ex) {
      throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", ex.what());
   }
   catch (...) {
      throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", "the original type of this exception is unknown");
   }
}

/**
 * Returns, in storage iteration order, the leading entries that all share the
 * priority of the first stored entry, without removing them.
 *
 * Iteration stops as soon as including the current entry would exceed one of
 * the limits, or the current entry has a different priority than the first.
 *
 * @param maxNumOfEntries Upper bound on returned entries; no bound is applied
 *                        when the value is not greater than -1.
 * @param maxNumOfBytes   Upper bound on the summed getSizeInBytes() of the
 *                        returned entries; no bound when not greater than -1.
 * @return The collected entries (empty when the storage is empty).
 */
const vector<EntryType> RamQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
{
   Lock lock(accessMutex_);
   vector<EntryType> ret;
   if (storage_.empty()) return ret;

   StorageType::const_iterator iter = storage_.begin();
   long numOfEntries = 0;
   long numOfBytes = 0;
   int referencePriority = (**iter).getPriority();
   while (iter != storage_.end()) {
      // The counters include the candidate entry before the limits are tested,
      // so an entry that would overshoot a limit is not returned.
      numOfBytes += (**iter).getSizeInBytes();
      numOfEntries++;
      if (numOfBytes > maxNumOfBytes && maxNumOfBytes > -1) break;
      if (numOfEntries > maxNumOfEntries && maxNumOfEntries > -1) break;
      if ((**iter).getPriority() != referencePriority) break;
      EntryType entry = (*iter);
      ret.insert(ret.end(), entry);
      ++iter;
   }
   return ret;
}

/**
 * Erases from the storage every entry referenced by the range [start, end)
 * and subtracts the size of each erased entry from the byte counter.
 *
 * @param start First entry to remove.
 * @param end   One past the last entry to remove.
 * @return The number of entries actually erased from the storage.
 */
long RamQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end)
{
   Lock lock(accessMutex_);
   if (start == end || storage_.empty()) return 0;

   vector<EntryType>::const_iterator iter = start;
   long count = 0;
   while (iter != end) {
      // Nothing left to erase: stop, but still report what was removed so far.
      // (Returning 0 here would hide removals that already changed numOfBytes_.)
      if (storage_.empty()) break;
      long entrySize = (*iter)->getSizeInBytes();
      string::size_type help = storage_.erase(*iter);
      if (help > 0) {
         count += help;
         numOfBytes_ -= help * entrySize;
      }
      ++iter;
   }
   return count;
}

/** @return The number of entries currently held in the storage. */
long RamQueuePlugin::getNumOfEntries() const
{
   return storage_.size();
}

/** @return The entry limit, as reported by property_.getMaxEntries(). */
long RamQueuePlugin::getMaxNumOfEntries() const
{
   return property_.getMaxEntries();
}

/** @return The current value of the byte counter maintained by put/randomRemove/clear. */
int64_t RamQueuePlugin::getNumOfBytes() const
{
   return numOfBytes_;
}

/** @return The byte limit, as reported by property_.getMaxBytes(). */
int64_t RamQueuePlugin::getMaxNumOfBytes() const
{
   return property_.getMaxBytes();
}

/** Erases all entries (under the mutex) and resets the byte counter to zero. */
void RamQueuePlugin::clear()
{
   Lock lock(accessMutex_);
   storage_.erase(storage_.begin(), storage_.end());
   numOfBytes_ = 0;
}

/** @return true if the storage currently holds no entries. */
bool RamQueuePlugin::empty() const
{
   return storage_.empty();
}

}}}} // namespace

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -