⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectionshandler.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 3 页
字号:
   }   return false;}PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit){   if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH);   if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml());   Lock lock(publishMutex_);   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH);   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH);   if (putToQueue()) return queuePublish(msgUnit);   try {      // fill in the sender absolute name      if (!connectReturnQos_.isNull()) {         msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());      }      return connection_->publish(msgUnit);   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      if (putToQueue() && isRecoverable(&ex)) {         log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage());         return queuePublish(msgUnit);      }      else {         log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage());         throw ex;      }   }}void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr){   if (log_.call()) log_.call(ME, "publishOneway");   Lock lock(publishMutex_);   // fill in the sender absolute name   if (!connectReturnQos_.isNull()) {      for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {         msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());      }   }   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway");   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway");   if (putToQueue()) {      for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);   }   try {      connection_->publishOneway(msgUnitArr);   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      if (putToQueue() && isRecoverable(&ex)) {         for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);      }      else         throw ex;   }}vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr){   if (log_.call()) log_.call(ME, "publishArr");   Lock lock(publishMutex_);   // fill in the sender absolute name   if (!connectReturnQos_.isNull()) {      for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {         msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());      }   }   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr");   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr");   if (putToQueue()) {      vector<PublishReturnQos> retQos;      for (size_t i=0; i < msgUnitArr.size(); i++) {         retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));      }      return retQos;   }   try {      return connection_->publishArr(msgUnitArr);   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      if (putToQueue() && isRecoverable(&ex)) {         vector<PublishReturnQos> retQos;         for (size_t i=0; i < msgUnitArr.size(); i++) {            retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));         }         return retQos;      }      else throw ex;   }}vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos){   if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE);   if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml());   if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml());   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE);   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE);   if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE);   try {      return connection_->erase(key, qos);   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      throw ex;   }}void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems){//   Lock lock(connectionMutex_);   if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0));   connectionProblemsListener_ = connectionProblems;}// If recoverable we queue a msgUnit, else we throw an exceptionbool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason){	// TODO: Authorization could also be recoverable (by a server admin)	//       Such decision must be left to the user (we need a callback to the user here)	// As a default all communication problems are assumed to be recoverable	if (reason == 0)		return true;	bool ret = reason->isCommunication();    if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret));	return ret;}void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason){   if (reason == 0)	   return;   if (!reason->isCommunication())	  return;	      if (log_.call()) log_.call(ME, "toPollingOrDead");      enum States oldState = status_;   if (!isFailsafe()) {      log_.info(ME, "going into DEAD status since not in failsafe mode. "                    "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" +                    ((reason != 0) ? (": " + reason->getMessage()) : ""));      status_ = DEAD;      connection_->shutdown();      if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);      return;   }   log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : ""));   status_ = POLLING;   currentRetry_ = 0;   /*   try {      DisconnectQos discQos(global_);      connection_->disconnect(discQos);   }   catch (...) {      log_.warn(ME, "exception when trying to disconnect");   }   */   connection_->shutdown();   if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this);   startPinger(true);}void ConnectionsHandler::timeout(void * /*userData*/){                                                      Lock lock(connectMutex_);   pingPollTimerKey_ = 0;   if (doStopPing_) return; // then it must stop   if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" );   if (status_ == ALIVE) { // then I am pinging      if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'");      try {         if (connection_) {            connection_->ping("<qos/>");            if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'");            startPinger(false);         }      }      catch (XmlBlasterException& ex) {         if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString());         toPollingOrDead(&ex);      }      return;   }    if (status_ == POLLING) {      if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'");      try {         if (connection_ && !connectQos_.isNull()) {            if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection");             string lastSessionId = connectQos_->getSessionQos().getSecretSessionId();            connectReturnQos_ = connection_->connect(*connectQos_);            if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml());            string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId();            log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() +                          "' after " + lexical_cast<string>(currentRetry_) + " attempts");            connectQos_->getSessionQos().setSecretSessionId(sessionId);             if ( log_.trace() ) {               log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml());            }             bool doFlush = true;            enum States oldState = status_;            status_ = ALIVE;            if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this);             Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence)            if (sessionId != lastSessionId) {               log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'");            }             if (doFlush) {               try {                  flushQueueUnlocked(queue_, true);               }               catch (const XmlBlasterException &ex) {                  log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage());               }               catch (...) {                  log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue");               }            }            startPinger(false);         }      }      catch (XmlBlasterException ex) {         if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage());         currentRetry_++;         if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll            startPinger(false);         }         else {            enum States oldState = status_;            status_ = DEAD;            if ( connectionProblemsListener_ ) {               connectionProblemsListener_->reachedDead(oldState, this);               // stopping            }         }      }      return;   }    // if it comes here it will stop }SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos){   if (!queue_) {      if (connectQos_.isNull()) {         throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)");      }      if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ...");      queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());      if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue");   }   SubscribeReturnQos retQos(global_);   SubscribeQos& q = const_cast<SubscribeQos&>(qos);   SessionNameRef sessionName = global_.getSessionName();   std::string subscriptionId = q.generateSubscriptionId(sessionName, key);   retQos.getData().setSubscriptionId(subscriptionId);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -