⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketdriver.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 3 页
字号:
}

/**
 * Called on polling, must be synchronized from outside,
 * throws an exception on failure
 */
void SocketDriver::reconnectOnIpLevel(void)
{
   log_.trace(ME, "Trying to reconnect to server");

   freeResources(true); // Cleanup if old connection exists

   // Give a chance to new configuration settings
   if (argsStructP_ != 0) {
      global_.freeArgs(*argsStructP_);
      delete argsStructP_;
      argsStructP_ = 0;
   }
   argsStructP_ = new ArgsStruct_T;
   global_.fillArgs(*argsStructP_);

   ::ExceptionStruct socketException;

   try {
      connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
      connection_->userObject = this; // Transports us to the myUpdate() method
      connection_->log = myLogger;    // Register our own logging function
      connection_->logUserP = this;   // Pass SocketDriver to myLogger()
   } catch_MACRO("::Constructor", true)
   
   try {
      if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
      if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
         if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message);
         throw socketException;
      }
      registerProgressListener(this->progressListener_); // Re-register
      if (log_.trace()) log_.trace(ME, "After createCallbackServer");
   } catch_MACRO("::initialize", true)
}

/**
 * Destructor, releases everything by calling freeResources(true).
 * An exception raised during cleanup is logged and swallowed so that
 * nothing escapes the destructor.
 */
SocketDriver::~SocketDriver()
{
   if (log_.call()) {
      log_.call(ME, "~SocketDriver()");
   }

   try {
      freeResources(true);
   }
   catch (...) {
      // Never let an exception leave the destructor
      log_.error(ME, "Unexpected catch in ~SocketDriver()");
   }
}

/**
 * Common failure path of myUpdate().
 * Logs the reason, dumps a size limited XML representation of every
 * message of the array and transfers error code and text into the
 * exception structure of the C layer.
 * @param log The logger of the SocketDriver
 * @param me The logging instance name
 * @param msgUnitArr The messages which are handled as not delivered
 * @param exception The C exception structure to fill
 * @param errorCode Copied to exception->errorCode
 * @param message Logged and copied to exception->message
 * @return Always 0 (false), to be returned to the C layer
 */
static XMLBLASTER_C_bool rejectUpdate(I_Log& log, const string& me,
                     ::MsgUnitArr *msgUnitArr, ::ExceptionStruct *exception,
                     const string& errorCode, const string& message)
{
   log.error(me, message);
   for (size_t i=0; i<msgUnitArr->len; i++) {
      char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
      if (xml != 0) { // Never hand a NULL pointer to the logger
         log.error(me, xml);
         xmlBlasterFree(xml);
      }
   }
   strncpy0(exception->errorCode, errorCode.c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
   strncpy0(exception->message, message.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
   return (XMLBLASTER_C_bool)0;
}

/**
 * Callback function registered at the C client library,
 * it is invoked with the messages arriving asynchronously.
 * <p>
 * Each message is forwarded to the I_Callback client registered at the
 * SocketDriver and the returned QoS is stored as responseQos of the message.
 * If no client is registered the message is ignored and acknowledged
 * with Constants::RET_OK.
 * </p>
 * @param msgUnitArr The received messages
 * @param userData The XmlBlasterAccessUnparsed handle, its userObject
 *        is the SocketDriver (set in reconnectOnIpLevel())
 * @param exception Is filled if a C++ exception is caught
 * @return 1 if all messages are processed, 0 if an exception was caught,
 *         in which case the complete array is reported as not delivered
 */
XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
                     ::ExceptionStruct *exception)
{
   XmlBlasterAccessUnparsed *xa = static_cast<XmlBlasterAccessUnparsed *>(userData);
   SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject);
   Global& global = socketDriver->getGlobal();
   I_Log& log = socketDriver->getLog();
   const string &ME = socketDriver->me();

   try {
      for (size_t i=0; i<msgUnitArr->len; i++) {
         if (log.trace()) log.trace(ME, "Received callback message");
         ::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i];
         I_Callback* cb = socketDriver->getCallbackClient();
         if (cb != 0) {
            UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key)));
            UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos)));
            std::string retQos = cb->update(msgUnitArr->secretSessionId,
                          updateKey, (const unsigned char*)msgUnit.content,
                          msgUnit.contentLen, updateQos);
            msgUnit.responseQos = strcpyAlloc(retQos.c_str());
         }
         else { /* Return QoS: Everything is OK */
            log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key);
            msgUnit.responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>");
         }
      }
   }
   catch (XmlBlasterException &e) {
      string tmp = "Exception caught in C++ update(), " +
                   lexical_cast<std::string>(msgUnitArr->len) +
                   " messages are handled as not delivered: " +
                   e.getMessage();
      return rejectUpdate(log, ME, msgUnitArr, exception, e.getErrorCodeStr(), tmp);
   }
   catch(...) {
      string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered";
      return rejectUpdate(log, ME, msgUnitArr, exception, "user.update.error", tmp);
   }
   return (XMLBLASTER_C_bool)1;
}

/**
 * Access the callback client remembered by initialize().
 * @return The registered client, myUpdate() treats 0 as
 *         "client has not registered a callback"
 */
I_Callback* SocketDriver::getCallbackClient()
{
   I_Callback* const registeredClient = callbackClient_;
   return registeredClient;
}

/** Enforced by I_CallbackServer */
void SocketDriver::initialize(const string& name, I_Callback &client)
{
   ::ExceptionStruct socketException;
   ME = string("SocketDriver-") + instanceName_ + "-" + name;
   if (log_.call()) log_.call(ME, "initialize() callback server");
   callbackClient_ = &client;
   Lock lock(mutex_);
   if (connection_ == 0) {
      if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null");
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en",
                       global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
   }
   try {
      if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
      if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
         log_.warn(ME, "Connection to xmlBlaster failed,"
                " please start the server or check your configuration\n");
         freeResources(true);
      }
      if (log_.trace()) log_.trace(ME, "After createCallbackServer");
   } catch_MACRO("::initialize", true)
}

/**
 * @return The protocol type of the callback server, Constants::SOCKET
 */
string SocketDriver::getCbProtocol()
{
   string protocolName(Constants::SOCKET); // "SOCKET"
   return protocolName;
}

/**
 * Builds the address of the callback server.
 * @return "socket://host:port" taken from the callback part of the
 *         connection handle, or "socket://:" if there is no connection
 *         or no callback part
 */
string SocketDriver::getCbAddress()
{
   Lock lock(mutex_);

   const bool hasCallback = (connection_ != 0) && (connection_->callbackP != 0);
   if (!hasCallback) {
      return string("socket://:");
   }

   try {
      string address("socket://");
      address += string(connection_->callbackP->hostCB);
      address += ":";
      address += lexical_cast<std::string>(connection_->callbackP->portCB);
      return address;
   } catch_MACRO("::getCbAddress", false)
}

/**
 * Shuts down the callback part of the connection handle.
 * @return true if the shutdown function was called, false if there is
 *         no connection or no callback part
 */
bool SocketDriver::shutdownCb()
{
   Lock lock(mutex_);

   if (connection_ != 0 && connection_->callbackP != 0) {
      connection_->callbackP->shutdown(connection_->callbackP);
      return true;
   }
   return false;
}

/**
 * Login to the server.
 * <p>
 * If there is no connection handle yet, reconnectOnIpLevel() is called
 * first, a known secretSessionId_ is reused and, for a callback address
 * of type Constants::SOCKET, the address is replaced by the current
 * host and port of the callback part of the handle.
 * </p>
 * @param qos The connect QoS, its user id is remembered as login name
 * @return The parsed QoS returned by the server, its secret session id
 *         is remembered for later reconnects
 * @throws An exception (produced by catch_MACRO) if the C layer reports
 *         an error or delivers no return QoS
 */
ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause
{
   if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") +
                              ", secretSessionId_="+secretSessionId_);
                              //+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_))));
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   try {
      loginName_ = qos->getUserId();
      if (connection_ == 0) {
         reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure
         if (secretSessionId_ != "") {
            qos->getSessionQos().setSecretSessionId(secretSessionId_);
         }
         if (connection_ != 0 && connection_->callbackP != 0) {
            ConnectQos *qq = const_cast<ConnectQos*>(&(*qos));
            if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) {
               // Force callback address, it could have changed on reconnect (checked to cb not be a delegate)
               string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" +
                      lexical_cast<std::string>(connection_->callbackP->portCB);
               qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr);
               log_.trace(ME, "Setting callback address to " + addr);
            }
         }
      }

      char *retQos = connection_->connect(connection_, qos->toXml().c_str(),
                                          myUpdate, &socketException);
      // Same failure condition as in ping(): no result or an error code
      if (retQos == 0 || *socketException.errorCode != 0) {
         if (retQos != 0) xmlBlasterFree(retQos); // Don't leak the buffer on failure
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }

      // Copy and release the C buffer first, so it can't leak if parsing throws
      const string retQosXml(retQos);
      xmlBlasterFree(retQos);

      ConnectQosFactory factory(global_);
      ConnectReturnQosRef connectReturnQos = factory.readObject(retQosXml);
      secretSessionId_ = connectReturnQos->getSecretSessionId();
      return connectReturnQos;
   } catch_MACRO("::connect", false)
}

/**
 * Logout from the server.
 * @param qos The disconnect QoS, sent as XML
 * @return false if there is no connection handle, otherwise the
 *         result of the disconnect() call of the handle
 * @throws An exception (produced by catch_MACRO) if the C layer
 *         reports an error code
 */
bool SocketDriver::disconnect(const DisconnectQos& qos)
{
   if (log_.call()) log_.call(ME, "disconnect()");
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   // Check the handle under the lock as shutdown(), ping() and isLoggedIn() do,
   // otherwise it could be freed between the check and its usage
   if (connection_ == 0) return false;
   try {
      bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      return ret;
   } catch_MACRO("::disconnect", false)
   return true;
}

/**
 * @return The protocol type of this driver, Constants::SOCKET
 */
string SocketDriver::getProtocol()
{
   string protocolName(Constants::SOCKET); // "SOCKET"
   return protocolName;
}

/** Called when going to POLLING mode */
bool SocketDriver::shutdown()
{
   if (log_.call()) log_.call(ME, "shutdown()");
   Lock lock(mutex_);
   if (connection_ == 0) return false;
   freeResources(true);
   return true;
}

/**
 * @return The login name, it is taken from the user id
 *         of the connect QoS when connect() is called
 */
string SocketDriver::getLoginName()
{
   string currentLogin(loginName_);
   return currentLogin;
}

/**
 * @return true if a connection handle exists and its
 *         isConnected() function reports a connection
 */
bool SocketDriver::isLoggedIn()
{
   Lock lock(mutex_);

   if (connection_ == 0) {
      return false;
   }
   return connection_->isConnected(connection_) ? true : false;
}

/**
 * Sends a ping over the connection handle.
 * @param qos The QoS passed to the ping() function of the handle
 * @return The QoS string delivered by the handle
 * @throws XmlBlasterException if there is no connection handle,
 *         or an exception (produced by catch_MACRO) if the C layer
 *         reports an error code or delivers no result
 */
string SocketDriver::ping(const string& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en",
                       global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
   }
   try {
      char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException);
      if (retQosP == 0 || *socketException.errorCode != 0) {
         // A buffer may be delivered together with an error code, don't leak it
         if (retQosP != 0) xmlBlasterFree(retQosP);
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      string retQos(retQosP);
      xmlBlasterFree(retQosP);
      return retQos;
   } catch_MACRO("::ping", false)
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -