📄 connectionshandler.cpp
字号:
/*------------------------------------------------------------------------------Name: ConnectionsHandler.cppProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Handles the I_XmlBlasterConnections ------------------------------------------------------------------------------*/#include <util/dispatch/ConnectionsHandler.h>#include <util/Global.h>#include <util/Timeout.h>#include <util/Timestamp.h>#include <util/Constants.h>#include <util/lexical_cast.h>#include <util/queue/QueueFactory.h>#include <util/queue/PublishQueueEntry.h>#include <util/queue/ConnectQueueEntry.h>#include <util/queue/SubscribeQueueEntry.h>namespace org { namespace xmlBlaster { namespace util { namespace dispatch {using namespace std;using namespace org::xmlBlaster::client::protocol;using namespace org::xmlBlaster::client;using namespace org::xmlBlaster::util;using namespace org::xmlBlaster::util::qos;using namespace org::xmlBlaster::util::thread;using namespace org::xmlBlaster::util::qos::storage;using namespace org::xmlBlaster::util::queue;using namespace org::xmlBlaster::client::qos;using namespace org::xmlBlaster::client::key;ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global, const string& instanceName) : ME(string("ConnectionsHandler-") + instanceName), connectQos_((ConnectQos*)0), connectReturnQos_((ConnectReturnQos*)0), status_(START), global_(global), log_(global.getLog("org.xmlBlaster.util.dispatch")), connectMutex_(), publishMutex_(), postSendListener_(0), instanceName_(instanceName){ ClientQueueProperty prop(global_, ""); connectionProblemsListener_ = NULL; connection_ = NULL; queue_ = NULL; retries_ = -1; currentRetry_ = 0; pingPollTimerKey_ = 0; doStopPing_ = false; if (log_.call()) log_.call(ME, "constructor");}ConnectionsHandler::~ConnectionsHandler(){ if (log_.call()) log_.call(ME, "destructor"); if (pingPollTimerKey_ != 0) { global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_); pingPollTimerKey_ = 0; } doStopPing_ = true; /* while (pingIsStarted_) { Thread::sleep(200); } */ Lock lock(connectMutex_); string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET" string version = "1.0"; // currently hardcoded if (connection_) { global_.getDispatchManager().releasePlugin(instanceName_, type, version); connection_ = NULL; } if ( queue_ ) { delete queue_; queue_ = NULL; } if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos"); status_ = END; if (log_.trace()) log_.trace(ME, "destructor ended");} ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos){ if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'"); if (qos.isNull()) { throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null"); } if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml()); Lock lock(connectMutex_); if (isConnected()) { log_.warn(ME, "connect: you are already connected"); return connectReturnQos_; } connectQos_ = qos; global_.setSessionName(connectQos_->getSessionQos().getSessionName()); global_.setImmutableId(connectQos_->getSessionQos().getRelativeName()); global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName()); retries_ = connectQos_->getAddress()->getRetries(); long pingInterval = connectQos_->getAddress()->getPingInterval(); if (log_.trace()) { log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_)); log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval)); } string type = connectQos_->getAddress()->getType(); string version = "1.0"; // currently hardcoded if (!connection_) { connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version)); } try { connectReturnQos_ = connection_->connect(*connectQos_); global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName()); // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33": global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName()); global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName()); //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName()); } catch (XmlBlasterException &ex) { if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) { log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString()); if (pingPollTimerKey_ == 0) startPinger(false); return queueConnect(); } else { if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml()); throw ex; } } log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'"); connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId()); enum States oldState = status_; status_ = ALIVE; if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this); // start the ping if in failsafe, i.e. if delay > 0 startPinger(false); if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml()); flushQueue(); return connectReturnQos_;}bool ConnectionsHandler::disconnect(const DisconnectQos& qos){ Lock lock(connectMutex_); if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT); if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml()); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT); if (status_ == DEAD) { log_.warn(ME, "already disconnected"); return false; } if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT); if (qos.getClearClientQueue() && queue_ != 0) queue_->clear(); bool ret = connection_->disconnect(qos); enum States oldState = status_; status_ = DEAD; if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this); return ret;}string ConnectionsHandler::getProtocol(){ return connection_->getProtocol();}/*string ConnectionsHandler::loginRaw(){ return connection_->loginRaw();}*/bool ConnectionsHandler::shutdown(){ if (connection_) { return connection_->shutdown(); } return false;}string ConnectionsHandler::getLoginName() { return connection_->getLoginName();}bool ConnectionsHandler::isLoggedIn(){ return connection_->isLoggedIn();}string ConnectionsHandler::ping(const string& qos){// Lock lock(connectionMutex_); return connection_->ping(qos);}SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos){ if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE); if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml()); if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml());// Lock lock(connectionMutex_); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE); if (putToQueue()) return queueSubscribe(key, qos); try { SubscribeReturnQos ret = connection_->subscribe(key, qos); return ret; } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); if (putToQueue() && isRecoverable(&ex)) { log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage()); return queueSubscribe(key, qos); } else { log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage()); throw ex; } }}vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos){ if (log_.call()) log_.call(ME, "get"); if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml()); if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml()); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get"); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get"); if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get"); try { return connection_->get(key, qos); } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); throw ex; }}vector<UnSubscribeReturnQos> ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos){ if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml()); if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml()); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE); try { vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos); return ret; } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); throw ex; }}bool ConnectionsHandler::putToQueue() { if (status_ == POLLING) return true; if (queue_ && queue_->getNumOfEntries() > 0) { return true; // guarantee sequence
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -