⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectionshandler.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 3 页
字号:
   retQos.getData().setState(Constants::STATE_OK);   retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"   qos.setSubscriptionId(subscriptionId);   SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority());   queue_->put(entry);   //if (log_.trace())       log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() +                     "' has been queued with client side generated subscriptionId=" + subscriptionId);   return retQos;}PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit){   if (log_.call()) log_.call(ME, "queuePublish");   if (!queue_) {      if (connectQos_.isNull()) {         throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)");      }      if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ...");      queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());      if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue");   }   if (log_.trace())       log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued");   PublishReturnQos retQos(global_);   retQos.setKeyOid(msgUnit.getKey().getOid());   retQos.setState(Constants::STATE_OK);   retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"   PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority());   queue_->put(entry);   return retQos;}ConnectReturnQosRef& ConnectionsHandler::queueConnect(){   if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'");   long tmp = connectQos_->getSessionQos().getPubSessionId();    if ( tmp <= 0) {      if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp));      throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId");   }   if (!queue_) {      if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue");      queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());   }   if (log_.trace())       log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued");   connectReturnQos_ = new ConnectReturnQos(*connectQos_);   /* Michele thinks we should not queue the ConnectQos   ConnectQueueEntry entry(global_, *connectQos_);   queue_->put(entry);   */   enum States oldState = status_;   status_ = POLLING;   if ( connectionProblemsListener_ ) {      connectionProblemsListener_->reachedPolling(oldState, this);      // stopping   }   startPinger(true);   return connectReturnQos_;}I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) {   I_PostSendListener* old = postSendListener_;    postSendListener_ = listener;   return old;}/** * Flushes all entries in the queue, i.e. the entries of the queue are sent to xmlBlaster. * If the queue is empty or NULL, then 0 is returned. If the state is in POLLING or DEAD, or the  * connection is not established yet (i.e. connection_ = NULL),  then -1 is * returned.. This method blocks until all entries in the queue have been sent. */long ConnectionsHandler::flushQueue(){   if (log_.call()) log_.call(ME, "flushQueue");   //   Lock lock(connectionMutex_);   if (!queue_) {      if (connectQos_.isNull()) {         log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)");      }      if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ...");      queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());      if (queue_->getNumOfEntries() < 1) {         if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() +                                                        "], it is empty, nothing to do.");         return 0;      }      log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +                    lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");   }   return flushQueueUnlocked(queue_, true);}     long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove){   if ( log_.call() ) log_.call(ME, "flushQueueUnlocked");           if (!queueToFlush || queueToFlush->empty()) return 0;   if (status_ != ALIVE || connection_ == NULL) return -1;   long ret = 0;   if (!queueToFlush->empty()) {      log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " +                  lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server");   }   while (!queueToFlush->empty()) {       long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg      if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries));      const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries);      vector<EntryType>::const_iterator iter = entries.begin();      while (iter != entries.end()) {         try {            if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + (*iter)->toXml());            const EntryType entry = (*iter);            const MsgQueueEntry &entry2 = *entry;            {               MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2);               entry3.setSender(connectReturnQos_->getSessionQos().getSessionName());            }            entry2.send(*this); // entry2 contains the PublishReturnQos after calling send            if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent");            I_PostSendListener *p = postSendListener_;            if (p) {                p->postSend(entry2);            }         }         catch (XmlBlasterException &ex) {           if (ex.isCommunication()) toPollingOrDead(&ex);           log_.warn(ME, "flushQueueUnlocked: can't send queued message to server: " + ex.getMessage());           //if (doRemove) queueToFlush->randomRemove(entries.begin(), iter);           throw ex;         }         iter++;      }      if (doRemove) {          //log_.trace(ME, "remove send message from client queue");          ret += queueToFlush->randomRemove(entries.begin(), entries.end());      }   }   return ret;}I_Queue* ConnectionsHandler::getQueue(){   if (!queue_) {      if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ...");      queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());      log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +                    lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");   }   return queue_;}bool ConnectionsHandler::isFailsafe() const{   if (connectQos_.isNull()) return false;   return connectQos_->getAddress()->getDelay() > 0;}// pinger or pollerbool ConnectionsHandler::startPinger(bool withInitialPing){   if (log_.call()) log_.call(ME, "startPinger");   if (doStopPing_) return false;   if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)");   if (pingPollTimerKey_ != 0 && !withInitialPing) {      if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread");      return false;     }   long delay        = 10000;   long pingInterval = 0;   if (connectQos_.isNull()) {      ConnectQos tmp(global_);      delay        = tmp.getAddress()->getDelay();      pingInterval = tmp.getAddress()->getPingInterval();   }   else {      delay        = connectQos_->getAddress()->getDelay();      pingInterval = connectQos_->getAddress()->getPingInterval();   }   if (log_.trace()) {      log_.trace(ME, string("startPinger(status=") +                getStatusString() +               "): parameters are: delay '" + lexical_cast<std::string>(delay) +               "' and pingInterval '" + lexical_cast<std::string>(pingInterval) +               " withInitialPing=" + lexical_cast<string>(withInitialPing));   }   if (delay > 0 && pingInterval > 0) {      long delta = delay;      if (status_ == ALIVE) delta = pingInterval;      if (withInitialPing) delta = 400;      pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_);   }   return true;}string ConnectionsHandler::getStatusString() const{   if (status_ == ALIVE) return "ALIVE";   else if (status_ == POLLING) return "POLLING";   else if (status_ == DEAD) return "DEAD";   else if (status_ == START) return "START";   return "END";;}bool ConnectionsHandler::isConnected() const{   return status_ == ALIVE || status_ == POLLING;}bool ConnectionsHandler::isAlive() const{   return status_ == ALIVE;}bool ConnectionsHandler::isPolling() const{   return status_ == POLLING;}bool ConnectionsHandler::isDead() const{   return status_ == DEAD;}ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos){   if (log_.call()) log_.call(ME, "::connectRaw");   connectReturnQos_ = connection_->connect(connectQos);   connectQos_ = connectQos;   log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");   connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());   return connectReturnQos_;}I_XmlBlasterConnection& ConnectionsHandler::getConnection() const{   if (!connection_) {      throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made");   }   return *connection_;}ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos(){   return connectReturnQos_;}ConnectQosRef ConnectionsHandler::getConnectQos(){   return connectReturnQos_; // contains everything and is typedef on ConnectQos}/*void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos){   if (connectReturnQos_)  {      delete connectReturnQos_;      connectReturnQos_ = NULL;   }   connectReturnQos_ = new ConnectReturnQos(retQos);}*/}}}} // namespaces

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -