⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketdriver.cpp

📁 java开源的企业总线.xmlBlaster
💻 CPP
📖 第 1 页 / 共 3 页
字号:

/**
 * Sends a subscribe request through the C socket connection.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param key Subscription key, handed to the C library as XML string.
 * @param qos Subscription QoS, handed to the C library as XML string.
 * @return The return QoS built from the response string of the C library.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      // The temporaries returned by toXml() live until the end of this full expression
      char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      // Copy the C string and release it immediately, so that an exception
      // thrown while parsing below cannot leak the buffer.
      const string responseXml(response);
      xmlBlasterFree(response);
      return SubscribeReturnQos(global_, statusQosFactory_.readObject(responseXml));
   } catch_MACRO("::subscribe", false)
}

/**
 * Synchronously fetches messages through the C socket connection.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param getKey Key of the request, handed to the C library as XML string.
 * @param getQos QoS of the request, handed to the C library as XML string.
 * @return One MessageUnit per entry of the returned C array; an empty vector
 *         if the C library returned a null array.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      const string key = getKey.toXml();
      const string qos = getQos.toXml();
      // The returned C struct array
      MsgUnitArr *msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      if (msgUnitArr != (MsgUnitArr *)0) {
         vector<MessageUnit> ret;
         try {
            for (size_t i=0; i<msgUnitArr->len; i++) {
               MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(msgUnitArr->msgUnitArr[i].key));
               MsgQosData msgQosData = msgQosFactory_.readObject(string(msgUnitArr->msgUnitArr[i].qos));
               MessageUnit messageUnit(msgKeyData,
                            msgUnitArr->msgUnitArr[i].contentLen,
                            (const unsigned char*)msgUnitArr->msgUnitArr[i].content,
                            msgQosData);
               ret.push_back(messageUnit);
            }
         }
         catch (...) {
            // Do not leak the C array if parsing or copying a message fails;
            // the exception is passed on unchanged to catch_MACRO.
            freeMsgUnitArr(msgUnitArr);
            throw;
         }
         freeMsgUnitArr(msgUnitArr);
         return ret;
      }
   } catch_MACRO("::get", false)
   return vector<MessageUnit>();
}

/**
 * Sends an unSubscribe request through the C socket connection.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param key Key of the request, handed to the C library as XML string.
 * @param qos QoS of the request, handed to the C library as XML string.
 * @return One return QoS per entry of the returned C array; an empty vector
 *         if the C library returned a null array.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
vector<UnSubscribeReturnQos>
SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      vector<UnSubscribeReturnQos> ret;
      if (retC != (QosArr *)0) { // Guard against a null array without an error code
         try {
            for (size_t ii=0; ii<retC->len; ii++) {
               ret.push_back(UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
            }
         }
         catch (...) {
            // Do not leak the C array if parsing a QoS fails
            freeQosArr(retC);
            throw;
         }
         freeQosArr(retC);
      }
      return ret;
   } catch_MACRO("::unSubscribe", false)
   return vector<UnSubscribeReturnQos>();
}

/**
 * Publishes a single message through the C socket connection.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param msgUnit The message to send; key and QoS are passed as XML strings,
 *                the content is passed by pointer without copying.
 * @return The return QoS built from the response string of the C library.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      if (log_.call()) log_.call(ME, "publish");

      // key and qos must stay alive until the C call has returned,
      // msgUnitC only borrows their character buffers.
      const string key = msgUnit.getKey().toXml();
      const string qos = msgUnit.getQos().toXml();

      ::MsgUnit msgUnitC;
      // Zero the struct first, as publishOneway()/publishArr() do via calloc,
      // so no field reaches the C library uninitialized.
      memset(&msgUnitC, 0, sizeof(::MsgUnit));
      msgUnitC.key = key.c_str();
      msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent());
      msgUnitC.contentLen = msgUnit.getContentLen();
      msgUnitC.qos = qos.c_str();

      char* response = connection_->publish(connection_, &msgUnitC, &socketException);

      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }

      //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only
      if (log_.trace()) log_.trace(ME, "successfully published");

      // Copy the C string and release it immediately, so that an exception
      // thrown while parsing below cannot leak the buffer.
      const string responseXml(response);
      xmlBlasterFree(response);
      return PublishReturnQos(global_, statusQosFactory_.readObject(responseXml));
   } catch_MACRO("::publish", false)
}

void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {

      // Copy C++ MessageUnit to C MsgUnit
      ::MsgUnitArr msgUnitArrC;
      vector<MessageUnit>::const_iterator iter;
      memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
      msgUnitArrC.len = msgUnitArr.size();
      msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
      size_t ii=0;
      vector<string> keyArr;  // We need to hold key/qos on the stack because toXml() returns a temporary string
      vector<string> qosArr;
      for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
         //log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
         const MessageUnit& msgUnitCpp = *iter;
         ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
         keyArr.push_back(msgUnitCpp.getKey().toXml());
         msgUnitC.key = keyArr[ii].c_str();
         qosArr.push_back(msgUnitCpp.getQos().toXml());
         msgUnitC.qos = qosArr[ii].c_str();
         msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
         msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
         ii++;
      }

      connection_->publishOneway(connection_, &msgUnitArrC, &socketException);

      ::free(msgUnitArrC.msgUnitArr);

      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
   } catch_MACRO("::publishOneway", false)
}

/**
 * Publishes a batch of messages through the C socket connection and collects
 * the returned QoS of each message.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param msgUnitArr The messages to send; key and QoS are passed as XML
 *                   strings, the content is passed by pointer without copying.
 * @return One return QoS per entry of the returned C array; an empty vector
 *         if the C library returned a null array.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      const size_t count = msgUnitArr.size();

      // We need to hold key/qos here because toXml() returns a temporary string.
      // The capacity is reserved up front: growing these vectors inside the loop
      // would relocate the strings and could invalidate c_str() pointers which
      // are already stored in the C structs.
      vector<string> keyArr;
      vector<string> qosArr;
      keyArr.reserve(count);
      qosArr.reserve(count);

      // Zero-initialized C array, owned by a vector so it is released on every
      // exit path (including exceptions). At least one element is allocated so
      // the C library always receives a non-null array pointer.
      vector< ::MsgUnit> unitsC(count > 0 ? count : 1);

      // Copy C++ MessageUnit to C MsgUnit
      ::MsgUnitArr msgUnitArrC;
      memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
      msgUnitArrC.len = count;
      msgUnitArrC.msgUnitArr = &unitsC[0];

      size_t ii=0;
      vector<MessageUnit>::const_iterator iter;
      for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter, ++ii) {
         const MessageUnit& msgUnitCpp = *iter;
         ::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
         keyArr.push_back(msgUnitCpp.getKey().toXml());
         msgUnitC.key = keyArr[ii].c_str();
         qosArr.push_back(msgUnitCpp.getQos().toXml());
         msgUnitC.qos = qosArr[ii].c_str();
         msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
         msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
      }

      QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException);

      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      vector<PublishReturnQos> ret;
      if (retC != (QosArr *)0) { // Guard against a null array without an error code
         try {
            for (size_t jj=0; jj<retC->len; jj++) {
               ret.push_back(PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])));
            }
         }
         catch (...) {
            // Do not leak the C array if parsing a QoS fails
            freeQosArr(retC);
            throw;
         }
         freeQosArr(retC);
      }
      return ret;
   } catch_MACRO("::publishArr", false)
   return vector<PublishReturnQos>();
}

/**
 * Sends an erase request through the C socket connection.
 *
 * A Lock on mutex_ is taken before the connection is used.
 *
 * @param key Key of the request, handed to the C library as XML string.
 * @param qos QoS of the request, handed to the C library as XML string.
 * @return One return QoS per entry of the returned C array; an empty vector
 *         if the C library returned a null array.
 * @throws XmlBlasterException (COMMUNICATION_NOCONNECTION) if connection_ is 0.
 *         Exceptions raised inside the try block are handled by catch_MACRO,
 *         which converts a thrown ::ExceptionStruct to util::XmlBlasterException.
 */
vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      vector<EraseReturnQos> ret;
      if (retC != (QosArr *)0) { // Guard against a null array without an error code
         try {
            for (size_t ii=0; ii<retC->len; ii++) {
               ret.push_back(EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
            }
         }
         catch (...) {
            // Do not leak the C array if parsing a QoS fails
            freeQosArr(retC);
            throw;
         }
         freeQosArr(retC);
      }
      return ret;
   } catch_MACRO("::erase", false)
   return vector<EraseReturnQos>();
}

/**
 * Stores a new progress listener and hands back the one stored before.
 *
 * If a connection with a callback struct exists, this driver is set as the
 * user pointer of its readFromSocket settings, and the read notification
 * function is set to callbackProgressListener when a listener is stored,
 * or cleared to 0 otherwise.
 *
 * @param listener The listener to store.
 * @return The value of progressListener_ before this call.
 */
I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) {
   I_ProgressListener *previous = this->progressListener_;
   this->progressListener_ = listener;

   // Nothing to wire up without a connection and its callback struct
   if (connection_ == 0 || connection_->callbackP == 0) {
      return previous;
   }

   connection_->callbackP->readFromSocket.numReadUserP = this;
   if (this->progressListener_ == 0) {
      connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex
   }
   else {
      connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
   }
   return previous;
}

/**
 * Builds the usage text of the SOCKET plugin.
 *
 * @return A fixed header line followed by the text which the C function
 *         xmlBlasterAccessUnparsedUsage() writes into the local buffer.
 */
string SocketDriver::usage()
{
   char cUsage[XMLBLASTER_MAX_USAGE_LEN];
   ::xmlBlasterAccessUnparsedUsage(cUsage);

   string text("\nThe SOCKET plugin configuration:");
   text += cUsage;
   return text;
}

// Exception conversion ....
/**
 * Converts a C ::ExceptionStruct to a C++ XmlBlasterException.
 *
 * @param ex The C exception; errorCode and message are read as C strings.
 * @return An XmlBlasterException carrying the error code of ex (or
 *         "internal.unknown" if that code is empty), the message of ex,
 *         ME, the language "en" and the version/build timestamp of global_.
 */
org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const
{
   // An empty C error code is replaced by a generic one
   const bool hasErrorCode = (ex.errorCode[0] != '\0');
   const string errorCode = hasErrorCode ? string(ex.errorCode) : string("internal.unknown");
   const string message(ex.message);
   const string versionInfo = global_.getVersion() + " " + global_.getBuildTimestamp();

   // TODO: isServerSide!!!
   return org::xmlBlaster::util::XmlBlasterException(
            errorCode,
            string(""),
            ME,
            "en",
            message,
            versionInfo);
}


/**
 * Converts a C++ XmlBlasterException to a C ::ExceptionStruct.
 *
 * @param ex The C++ exception whose error code string and message are copied.
 * @return A struct set up by initializeXmlBlasterException() with errorCode
 *         and message filled via strncpy0(), using the length constants
 *         XMLBLASTEREXCEPTION_ERRORCODE_LEN and XMLBLASTEREXCEPTION_MESSAGE_LEN.
 */
::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex)
{
   const string errorCode = ex.getErrorCodeStr();
   const string message = ex.getMessage();

   ::ExceptionStruct cException;
   ::initializeXmlBlasterException(&cException);
   strncpy0(cException.errorCode, errorCode.c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
   strncpy0(cException.message, message.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
   //cException.remote = ??
   return cException;
}

}}}}} // namespaces

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -