⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectionshandler.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      ConnectionsHandler.cppProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Handles the I_XmlBlasterConnections ------------------------------------------------------------------------------*/#include <util/dispatch/ConnectionsHandler.h>#include <util/Global.h>#include <util/Timeout.h>#include <util/Timestamp.h>#include <util/Constants.h>#include <util/lexical_cast.h>#include <util/queue/QueueFactory.h>#include <util/queue/PublishQueueEntry.h>#include <util/queue/ConnectQueueEntry.h>#include <util/queue/SubscribeQueueEntry.h>namespace org { namespace xmlBlaster { namespace util { namespace dispatch {using namespace std;using namespace org::xmlBlaster::client::protocol;using namespace org::xmlBlaster::client;using namespace org::xmlBlaster::util;using namespace org::xmlBlaster::util::qos;using namespace org::xmlBlaster::util::thread;using namespace org::xmlBlaster::util::qos::storage;using namespace org::xmlBlaster::util::queue;using namespace org::xmlBlaster::client::qos;using namespace org::xmlBlaster::client::key;ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global,                                       const string& instanceName)   : ME(string("ConnectionsHandler-") + instanceName),      connectQos_((ConnectQos*)0),     connectReturnQos_((ConnectReturnQos*)0),     status_(START),      global_(global),      log_(global.getLog("org.xmlBlaster.util.dispatch")),     connectMutex_(),     publishMutex_(),     postSendListener_(0),     instanceName_(instanceName){   ClientQueueProperty prop(global_, "");   connectionProblemsListener_ = NULL;   connection_         = NULL;   queue_              = NULL;   retries_            = -1;   currentRetry_       = 0;   pingPollTimerKey_   = 0;   doStopPing_         = false;   if (log_.call()) log_.call(ME, "constructor");}ConnectionsHandler::~ConnectionsHandler(){   if (log_.call()) log_.call(ME, "destructor");   if (pingPollTimerKey_ != 0) {      global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_);      pingPollTimerKey_ = 0;   }   doStopPing_ = true;   /*   while (pingIsStarted_) {      Thread::sleep(200);   }   */   Lock lock(connectMutex_);   string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET"   string version = "1.0"; // currently hardcoded   if (connection_) {      global_.getDispatchManager().releasePlugin(instanceName_, type, version);      connection_ = NULL;   }   if ( queue_ ) {      delete queue_;      queue_ = NULL;   }   if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos");   status_ = END;   if (log_.trace()) log_.trace(ME, "destructor ended");} ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos){   if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'");   if (qos.isNull()) {      throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null");   }   if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml());   Lock lock(connectMutex_);   if (isConnected()) {      log_.warn(ME, "connect: you are already connected");      return connectReturnQos_;   }   connectQos_ = qos;   global_.setSessionName(connectQos_->getSessionQos().getSessionName());   global_.setImmutableId(connectQos_->getSessionQos().getRelativeName());   global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary   //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());   retries_ = connectQos_->getAddress()->getRetries();   long pingInterval = connectQos_->getAddress()->getPingInterval();   if (log_.trace()) {      log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_));      log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval));   }   string type = connectQos_->getAddress()->getType();   string version = "1.0"; // currently hardcoded   if (!connection_) {      connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version));   }   try {      connectReturnQos_ = connection_->connect(*connectQos_);      global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName());      // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33":      global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName());      global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());                //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());   }   catch (XmlBlasterException &ex) {      if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) {         log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString());         if (pingPollTimerKey_ == 0)            startPinger(false);         return queueConnect();      }      else {         if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml());         throw ex;      }   }                                                                                                                                                                                                                                                                                          log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");   connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());   enum States oldState = status_;   status_ = ALIVE;   if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this);   // start the ping if in failsafe, i.e. if delay > 0   startPinger(false);   if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml());   flushQueue();   return connectReturnQos_;}bool ConnectionsHandler::disconnect(const DisconnectQos& qos){   Lock lock(connectMutex_);   if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT);   if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml());   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT);   if (status_ == DEAD) {      log_.warn(ME, "already disconnected");      return false;   }   if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT);   if (qos.getClearClientQueue() && queue_ != 0) queue_->clear();   bool ret = connection_->disconnect(qos);   enum States oldState = status_;   status_ = DEAD;   if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);   return ret;}string ConnectionsHandler::getProtocol(){   return connection_->getProtocol();}/*string ConnectionsHandler::loginRaw(){   return connection_->loginRaw();}*/bool ConnectionsHandler::shutdown(){   if (connection_) {      return connection_->shutdown();   }   return false;}string ConnectionsHandler::getLoginName() {   return connection_->getLoginName();}bool ConnectionsHandler::isLoggedIn(){   return connection_->isLoggedIn();}string ConnectionsHandler::ping(const string& qos){//   Lock lock(connectionMutex_);   return connection_->ping(qos);}SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos){   if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE);   if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml());   if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml());//   Lock lock(connectionMutex_);   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE);   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE);   if (putToQueue()) return queueSubscribe(key, qos);   try {      SubscribeReturnQos ret = connection_->subscribe(key, qos);      return ret;   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      if (putToQueue() && isRecoverable(&ex)) {         log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage());         return queueSubscribe(key, qos);      }      else {         log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage());         throw ex;      }   }}vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos){   if (log_.call()) log_.call(ME, "get");   if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml());   if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml());   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get");   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get");   if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get");   try {      return connection_->get(key, qos);   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      throw ex;   }}vector<UnSubscribeReturnQos>    ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos){   if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);   if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml());   if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml());   if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);   if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);   if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);   try {      vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);      return ret;   }      catch (XmlBlasterException& ex) {      toPollingOrDead(&ex);      throw ex;   }}bool ConnectionsHandler::putToQueue() {   if (status_ == POLLING) return true;   if (queue_ && queue_->getNumOfEntries() > 0) {      return true; // guarantee sequence

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -