📄 connectionshandler.cpp
字号:
retQos.getData().setState(Constants::STATE_OK); retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED" qos.setSubscriptionId(subscriptionId); SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority()); queue_->put(entry); //if (log_.trace()) log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() + "' has been queued with client side generated subscriptionId=" + subscriptionId); return retQos;}PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit){ if (log_.call()) log_.call(ME, "queuePublish"); if (!queue_) { if (connectQos_.isNull()) { throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)"); } if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ..."); queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue"); } if (log_.trace()) log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued"); PublishReturnQos retQos(global_); retQos.setKeyOid(msgUnit.getKey().getOid()); retQos.setState(Constants::STATE_OK); retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED" PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority()); queue_->put(entry); return retQos;}ConnectReturnQosRef& ConnectionsHandler::queueConnect(){ if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'"); long tmp = connectQos_->getSessionQos().getPubSessionId(); if ( tmp <= 0) { if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp)); throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId"); } if (!queue_) { if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue"); queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); } if (log_.trace()) log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued"); connectReturnQos_ = new ConnectReturnQos(*connectQos_); /* Michele thinks we should not queue the ConnectQos ConnectQueueEntry entry(global_, *connectQos_); queue_->put(entry); */ enum States oldState = status_; status_ = POLLING; if ( connectionProblemsListener_ ) { connectionProblemsListener_->reachedPolling(oldState, this); // stopping } startPinger(true); return connectReturnQos_;}I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) { I_PostSendListener* old = postSendListener_; postSendListener_ = listener; return old;}/** * Flushes all entries in the queue, i.e. the entries of the queue are sent to xmlBlaster. * If the queue is empty or NULL, then 0 is returned. If the state is in POLLING or DEAD, or the * connection is not established yet (i.e. connection_ = NULL), then -1 is * returned.. This method blocks until all entries in the queue have been sent. */long ConnectionsHandler::flushQueue(){ if (log_.call()) log_.call(ME, "flushQueue"); // Lock lock(connectionMutex_); if (!queue_) { if (connectQos_.isNull()) { log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)"); } if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ..."); queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); if (queue_->getNumOfEntries() < 1) { if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "], it is empty, nothing to do."); return 0; } log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " + lexical_cast<string>(queue_->getNumOfEntries()) + " entries."); } return flushQueueUnlocked(queue_, true);} long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove){ if ( log_.call() ) log_.call(ME, "flushQueueUnlocked"); if (!queueToFlush || queueToFlush->empty()) return 0; if (status_ != ALIVE || connection_ == NULL) return -1; long ret = 0; if (!queueToFlush->empty()) { log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " + lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server"); } while (!queueToFlush->empty()) { long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries)); const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries); vector<EntryType>::const_iterator iter = entries.begin(); while (iter != entries.end()) { try { if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + (*iter)->toXml()); const EntryType entry = (*iter); const MsgQueueEntry &entry2 = *entry; { MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2); entry3.setSender(connectReturnQos_->getSessionQos().getSessionName()); } entry2.send(*this); // entry2 contains the PublishReturnQos after calling send if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent"); I_PostSendListener *p = postSendListener_; if (p) { p->postSend(entry2); } } catch (XmlBlasterException &ex) { if (ex.isCommunication()) toPollingOrDead(&ex); log_.warn(ME, "flushQueueUnlocked: can't send queued message to server: " + ex.getMessage()); //if (doRemove) queueToFlush->randomRemove(entries.begin(), iter); throw ex; } iter++; } if (doRemove) { //log_.trace(ME, "remove send message from client queue"); ret += queueToFlush->randomRemove(entries.begin(), entries.end()); } } return ret;}I_Queue* ConnectionsHandler::getQueue(){ if (!queue_) { if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ..."); queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty()); log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " + lexical_cast<string>(queue_->getNumOfEntries()) + " entries."); } return queue_;}bool ConnectionsHandler::isFailsafe() const{ if (connectQos_.isNull()) return false; return connectQos_->getAddress()->getDelay() > 0;}// pinger or pollerbool ConnectionsHandler::startPinger(bool withInitialPing){ if (log_.call()) log_.call(ME, "startPinger"); if (doStopPing_) return false; if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)"); if (pingPollTimerKey_ != 0 && !withInitialPing) { if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread"); return false; } long delay = 10000; long pingInterval = 0; if (connectQos_.isNull()) { ConnectQos tmp(global_); delay = tmp.getAddress()->getDelay(); pingInterval = tmp.getAddress()->getPingInterval(); } else { delay = connectQos_->getAddress()->getDelay(); pingInterval = connectQos_->getAddress()->getPingInterval(); } if (log_.trace()) { log_.trace(ME, string("startPinger(status=") + getStatusString() + "): parameters are: delay '" + lexical_cast<std::string>(delay) + "' and pingInterval '" + lexical_cast<std::string>(pingInterval) + " withInitialPing=" + lexical_cast<string>(withInitialPing)); } if (delay > 0 && pingInterval > 0) { long delta = delay; if (status_ == ALIVE) delta = pingInterval; if (withInitialPing) delta = 400; pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_); } return true;}string ConnectionsHandler::getStatusString() const{ if (status_ == ALIVE) return "ALIVE"; else if (status_ == POLLING) return "POLLING"; else if (status_ == DEAD) return "DEAD"; else if (status_ == START) return "START"; return "END";;}bool ConnectionsHandler::isConnected() const{ return status_ == ALIVE || status_ == POLLING;}bool ConnectionsHandler::isAlive() const{ return status_ == ALIVE;}bool ConnectionsHandler::isPolling() const{ return status_ == POLLING;}bool ConnectionsHandler::isDead() const{ return status_ == DEAD;}ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos){ if (log_.call()) log_.call(ME, "::connectRaw"); connectReturnQos_ = connection_->connect(connectQos); connectQos_ = connectQos; log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'"); connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId()); return connectReturnQos_;}I_XmlBlasterConnection& ConnectionsHandler::getConnection() const{ if (!connection_) { throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made"); } return *connection_;}ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos(){ return connectReturnQos_;}ConnectQosRef ConnectionsHandler::getConnectQos(){ return connectReturnQos_; // contains everything and is typedef on ConnectQos}/*void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos){ if (connectReturnQos_) { delete connectReturnQos_; connectReturnQos_ = NULL; } connectReturnQos_ = new ConnectReturnQos(retQos);}*/}}}} // namespaces
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -