📄 connectionshandler.cpp
字号:
} return false;}PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit){ if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH); if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml()); Lock lock(publishMutex_); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH); if (putToQueue()) return queuePublish(msgUnit); try { // fill in the sender absolute name if (!connectReturnQos_.isNull()) { msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); } return connection_->publish(msgUnit); } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); if (putToQueue() && isRecoverable(&ex)) { log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage()); return queuePublish(msgUnit); } else { log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage()); throw ex; } }}void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr){ if (log_.call()) log_.call(ME, "publishOneway"); Lock lock(publishMutex_); // fill in the sender absolute name if (!connectReturnQos_.isNull()) { for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) { msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); } } if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway"); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway"); if (putToQueue()) { for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]); } try { connection_->publishOneway(msgUnitArr); } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); if (putToQueue() && isRecoverable(&ex)) { for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]); } else throw ex; }}vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr){ if (log_.call()) log_.call(ME, "publishArr"); Lock lock(publishMutex_); // fill in the sender absolute name if (!connectReturnQos_.isNull()) { for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) { msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName()); } } if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr"); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr"); if (putToQueue()) { vector<PublishReturnQos> retQos; for (size_t i=0; i < msgUnitArr.size(); i++) { retQos.insert(retQos.end(), queuePublish(msgUnitArr[i])); } return retQos; } try { return connection_->publishArr(msgUnitArr); } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); if (putToQueue() && isRecoverable(&ex)) { vector<PublishReturnQos> retQos; for (size_t i=0; i < msgUnitArr.size(); i++) { retQos.insert(retQos.end(), queuePublish(msgUnitArr[i])); } return retQos; } else throw ex; }}vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos){ if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE); if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml()); if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml()); if (status_ == START) throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE); if (status_ == DEAD) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE); if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE); try { return connection_->erase(key, qos); } catch (XmlBlasterException& ex) { toPollingOrDead(&ex); throw ex; }}void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems){// Lock lock(connectionMutex_); if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0)); connectionProblemsListener_ = connectionProblems;}// If recoverable we queue a msgUnit, else we throw an exceptionbool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason){ // TODO: Authorization could also be recoverable (by a server admin) // Such decision must be left to the user (we need a callback to the user here) // As a default all communication problems are assumed to be recoverable if (reason == 0) return true; bool ret = reason->isCommunication(); if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret)); return ret;}void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason){ if (reason == 0) return; if (!reason->isCommunication()) return; if (log_.call()) log_.call(ME, "toPollingOrDead"); enum States oldState = status_; if (!isFailsafe()) { log_.info(ME, "going into DEAD status since not in failsafe mode. " "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" + ((reason != 0) ? (": " + reason->getMessage()) : "")); status_ = DEAD; connection_->shutdown(); if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this); return; } log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : "")); status_ = POLLING; currentRetry_ = 0; /* try { DisconnectQos discQos(global_); connection_->disconnect(discQos); } catch (...) { log_.warn(ME, "exception when trying to disconnect"); } */ connection_->shutdown(); if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this); startPinger(true);}void ConnectionsHandler::timeout(void * /*userData*/){ Lock lock(connectMutex_); pingPollTimerKey_ = 0; if (doStopPing_) return; // then it must stop if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" ); if (status_ == ALIVE) { // then I am pinging if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'"); try { if (connection_) { connection_->ping("<qos/>"); if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'"); startPinger(false); } } catch (XmlBlasterException& ex) { if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString()); toPollingOrDead(&ex); } return; } if (status_ == POLLING) { if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'"); try { if (connection_ && !connectQos_.isNull()) { if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection"); string lastSessionId = connectQos_->getSessionQos().getSecretSessionId(); connectReturnQos_ = connection_->connect(*connectQos_); if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml()); string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId(); log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() + "' after " + lexical_cast<string>(currentRetry_) + " attempts"); connectQos_->getSessionQos().setSecretSessionId(sessionId); if ( log_.trace() ) { log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml()); } bool doFlush = true; enum States oldState = status_; status_ = ALIVE; if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this); Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence) if (sessionId != lastSessionId) { log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'"); } if (doFlush) { try { flushQueueUnlocked(queue_, true); } catch (const XmlBlasterException &ex) { log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage()); } catch (...) { log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue"); } } startPinger(false); } } catch (XmlBlasterException ex) { if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage()); currentRetry_++; if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll startPinger(false); } else { enum States oldState = status_; status_ = DEAD; if ( connectionProblemsListener_ ) { connectionProblemsListener_->reachedDead(oldState, this); // stopping } } } return; } // if it comes here it will stop }SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos){ if (!queue_) { if (connectQos_.isNull()) { throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)"); } if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ..."); queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue"); } SubscribeReturnQos retQos(global_); SubscribeQos& q = const_cast<SubscribeQos&>(qos); SessionNameRef sessionName = global_.getSessionName(); std::string subscriptionId = q.generateSubscriptionId(sessionName, key); retQos.getData().setSubscriptionId(subscriptionId);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -