📄 socketdriver.cpp
字号:
/**
 * Sends a subscribe request through the C socket connection.
 *
 * The call is serialized with mutex_. The key and qos are handed to the C
 * layer as their toXml() strings, and the returned C string is parsed by
 * statusQosFactory_.
 *
 * @param key Subscription key.
 * @param qos Subscription QoS.
 * @return The SubscribeReturnQos built from the response string.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
SubscribeReturnQos SocketDriver::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      char *response = connection_->subscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      // Take a C++ copy and release the C buffer right away, so that an
      // exception raised while parsing cannot leak it. A NULL response is
      // treated as an empty string instead of being passed to std::string.
      const string responseXml(response != 0 ? response : "");
      if (response != 0) {
         xmlBlasterFree(response);
      }
      return SubscribeReturnQos(global_, statusQosFactory_.readObject(responseXml));
   } catch_MACRO("::subscribe", false)
}
/**
 * Synchronously fetches messages through the C socket connection.
 *
 * The call is serialized with mutex_. Each entry of the returned C array is
 * converted to a MessageUnit (key and qos are parsed by msgKeyFactory_ and
 * msgQosFactory_), then the C array is released with freeMsgUnitArr().
 *
 * @param getKey Key of the request, sent as getKey.toXml().
 * @param getQos QoS of the request, sent as getQos.toXml().
 * @return The converted messages; an empty vector when the C layer returns
 *         a NULL array without reporting an error.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
vector<MessageUnit> SocketDriver::get(const GetKey& getKey, const GetQos& getQos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      // Named strings keep the XML alive for the duration of the C call
      const string key = getKey.toXml();
      const string qos = getQos.toXml();
      MsgUnitArr *msgUnitArr = connection_->get(connection_, key.c_str(), qos.c_str(), &socketException); // The returned C struct array
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      if (msgUnitArr != (MsgUnitArr *)0) {
         vector<MessageUnit> ret;
         try {
            ret.reserve(msgUnitArr->len);
            for (size_t i=0; i<msgUnitArr->len; i++) {
               const ::MsgUnit& unitC = msgUnitArr->msgUnitArr[i];
               MsgKeyData msgKeyData = msgKeyFactory_.readObject(string(unitC.key));
               MsgQosData msgQosData = msgQosFactory_.readObject(string(unitC.qos));
               MessageUnit messageUnit(msgKeyData,
                                       unitC.contentLen,
                                       reinterpret_cast<const unsigned char*>(unitC.content),
                                       msgQosData);
               ret.push_back(messageUnit);
            }
         }
         catch (...) {
            // Do not leak the C array when parsing or copying fails;
            // the exception continues to catch_MACRO unchanged.
            freeMsgUnitArr(msgUnitArr);
            throw;
         }
         freeMsgUnitArr(msgUnitArr);
         return ret;
      }
   } catch_MACRO("::get", false)
   return vector<MessageUnit>();
}
/**
 * Sends an unSubscribe request through the C socket connection.
 *
 * The call is serialized with mutex_. Every string of the returned QosArr
 * is parsed by statusQosFactory_ into an UnSubscribeReturnQos, then the
 * QosArr is released with freeQosArr().
 *
 * @param key Key of the request, sent as key.toXml().
 * @param qos QoS of the request, sent as qos.toXml().
 * @return One UnSubscribeReturnQos per returned QoS string; empty when the
 *         C layer returns a NULL array without reporting an error.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
vector<UnSubscribeReturnQos>
SocketDriver::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      QosArr* retC = connection_->unSubscribe(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      vector<UnSubscribeReturnQos> ret;
      if (retC == 0) {
         return ret; // Nothing was returned and no error was flagged
      }
      try {
         ret.reserve(retC->len);
         for (size_t ii=0; ii<retC->len; ii++) {
            ret.push_back(UnSubscribeReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
         }
      }
      catch (...) {
         // Do not leak the C array when parsing fails;
         // the exception continues to catch_MACRO unchanged.
         freeQosArr(retC);
         throw;
      }
      freeQosArr(retC);
      return ret;
   } catch_MACRO("::unSubscribe", false)
   return vector<UnSubscribeReturnQos>();
}
/**
 * Publishes a single message through the C socket connection.
 *
 * The call is serialized with mutex_. A C ::MsgUnit is filled with pointers
 * into the C++ message (no content is copied), handed to the C layer, and
 * the returned C string is parsed by statusQosFactory_.
 *
 * @param msgUnit The message to publish.
 * @return The PublishReturnQos built from the response string.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
PublishReturnQos SocketDriver::publish(const MessageUnit& msgUnit)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      if (log_.call()) log_.call(ME, "publish");
      // key/qos must outlive the C call because msgUnitC only borrows their buffers
      const string key = msgUnit.getKey().toXml();
      const string qos = msgUnit.getQos().toXml();
      ::MsgUnit msgUnitC;
      memset(&msgUnitC, 0, sizeof(::MsgUnit)); // Zero any field that is not assigned below
      msgUnitC.key = key.c_str();
      msgUnitC.content = reinterpret_cast<const char *>(msgUnit.getContent());
      msgUnitC.contentLen = msgUnit.getContentLen();
      msgUnitC.qos = qos.c_str();
      char* response = connection_->publish(connection_, &msgUnitC, &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      //freeMsgUnitData(&msgUnitC); -> not needed as it contains pointers only
      if (log_.trace()) log_.trace(ME, "successfully published");
      // Take a C++ copy and release the C buffer right away, so that an
      // exception raised while parsing cannot leak it. A NULL response is
      // treated as an empty string instead of being passed to std::string.
      const string responseXml(response != 0 ? response : "");
      if (response != 0) {
         xmlBlasterFree(response);
      }
      return PublishReturnQos(global_, statusQosFactory_.readObject(responseXml));
   } catch_MACRO("::publish", false)
}
void SocketDriver::publishOneway(const vector<MessageUnit> &msgUnitArr)
{
::ExceptionStruct socketException;
Lock lock(mutex_);
if (connection_ == 0) {
throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
}
try {
// Copy C++ MessageUnit to C MsgUnit
::MsgUnitArr msgUnitArrC;
vector<MessageUnit>::const_iterator iter;
memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
msgUnitArrC.len = msgUnitArr.size();
msgUnitArrC.msgUnitArr = (::MsgUnit *)calloc(msgUnitArrC.len, sizeof(::MsgUnit));
size_t ii=0;
vector<string> keyArr; // We need to hold key/qos on the stack because toXml() returns a temporary string
vector<string> qosArr;
for (iter = msgUnitArr.begin(); iter != msgUnitArr.end(); ++iter) {
//log_.trace(ME, "ii=" + lexical_cast<string>(ii) + ", len=" + lexical_cast<string>(msgUnitArrC.len));
const MessageUnit& msgUnitCpp = *iter;
::MsgUnit& msgUnitC = msgUnitArrC.msgUnitArr[ii];
keyArr.push_back(msgUnitCpp.getKey().toXml());
msgUnitC.key = keyArr[ii].c_str();
qosArr.push_back(msgUnitCpp.getQos().toXml());
msgUnitC.qos = qosArr[ii].c_str();
msgUnitC.contentLen = (size_t)msgUnitCpp.getContentLen();
msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
ii++;
}
connection_->publishOneway(connection_, &msgUnitArrC, &socketException);
::free(msgUnitArrC.msgUnitArr);
if (*socketException.errorCode != 0) {
throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
}
} catch_MACRO("::publishOneway", false)
}
/**
 * Publishes a batch of messages through the C socket connection and
 * returns one PublishReturnQos per returned QoS string.
 *
 * The call is serialized with mutex_. The C++ messages are mirrored into a
 * C ::MsgUnitArr whose entries only borrow pointers (key/qos XML strings
 * held in local vectors, content held by the caller's MessageUnit). The
 * returned QosArr is parsed by statusQosFactory_ and released with
 * freeQosArr().
 *
 * @param msgUnitArr The messages to publish.
 * @return The parsed return QoS objects; empty when the C layer returns a
 *         NULL array without reporting an error.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
vector<PublishReturnQos> SocketDriver::publishArr(const vector<MessageUnit> &msgUnitArr)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      // Copy C++ MessageUnit to C MsgUnit
      const size_t count = msgUnitArr.size();

      // We need to hold key/qos on the stack because toXml() returns a temporary string.
      // The capacity is reserved up front: the c_str() pointers stored below
      // must not be invalidated by a reallocation during push_back().
      vector<string> keyArr;
      vector<string> qosArr;
      keyArr.reserve(count);
      qosArr.reserve(count);

      // Zero-initialized C array owned by a vector, so it is released on every
      // exit path. At least one element is kept so &unitsC[0] is always valid.
      vector< ::MsgUnit > unitsC(count > 0 ? count : 1);

      for (size_t ii = 0; ii < count; ++ii) {
         const MessageUnit& msgUnitCpp = msgUnitArr[ii];
         ::MsgUnit& msgUnitC = unitsC[ii];
         keyArr.push_back(msgUnitCpp.getKey().toXml());
         msgUnitC.key = keyArr.back().c_str();
         qosArr.push_back(msgUnitCpp.getQos().toXml());
         msgUnitC.qos = qosArr.back().c_str();
         msgUnitC.contentLen = static_cast<size_t>(msgUnitCpp.getContentLen());
         msgUnitC.content = reinterpret_cast<const char *>(msgUnitCpp.getContent());
      }

      ::MsgUnitArr msgUnitArrC;
      memset(&msgUnitArrC, 0, sizeof(::MsgUnitArr));
      msgUnitArrC.len = count;
      msgUnitArrC.msgUnitArr = &unitsC[0];

      QosArr* retC = connection_->publishArr(connection_, &msgUnitArrC, &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }

      vector<PublishReturnQos> ret;
      if (retC == 0) {
         return ret; // Nothing was returned and no error was flagged
      }
      try {
         ret.reserve(retC->len);
         for (size_t jj=0; jj<retC->len; jj++) {
            ret.push_back(PublishReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[jj])));
         }
      }
      catch (...) {
         // Do not leak the C array when parsing fails;
         // the exception continues to catch_MACRO unchanged.
         freeQosArr(retC);
         throw;
      }
      freeQosArr(retC);
      return ret;
   } catch_MACRO("::publishArr", false)
   return vector<PublishReturnQos>();
}
/**
 * Sends an erase request through the C socket connection.
 *
 * The call is serialized with mutex_. Every string of the returned QosArr
 * is parsed by statusQosFactory_ into an EraseReturnQos, then the QosArr is
 * released with freeQosArr().
 *
 * @param key Key of the request, sent as key.toXml().
 * @param qos QoS of the request, sent as qos.toXml().
 * @return One EraseReturnQos per returned QoS string; empty when the C
 *         layer returns a NULL array without reporting an error.
 * @throws XmlBlasterException with COMMUNICATION_NOCONNECTION when
 *         connection_ is 0. Anything thrown inside the try block is handled
 *         by catch_MACRO (defined outside this block).
 */
vector<EraseReturnQos> SocketDriver::erase(const EraseKey& key, const EraseQos& qos)
{
   ::ExceptionStruct socketException;
   Lock lock(mutex_);
   if (connection_ == 0) {
      throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "Sorry, you are not connected to the server");
   }
   try {
      QosArr* retC = connection_->erase(connection_, key.toXml().c_str(), qos.toXml().c_str(), &socketException);
      if (*socketException.errorCode != 0) {
         throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
      }
      vector<EraseReturnQos> ret;
      if (retC == 0) {
         return ret; // Nothing was returned and no error was flagged
      }
      try {
         ret.reserve(retC->len);
         for (size_t ii=0; ii<retC->len; ii++) {
            ret.push_back(EraseReturnQos(global_, statusQosFactory_.readObject(retC->qosArr[ii])));
         }
      }
      catch (...) {
         // Do not leak the C array when parsing fails;
         // the exception continues to catch_MACRO unchanged.
         freeQosArr(retC);
         throw;
      }
      freeQosArr(retC);
      return ret;
   } catch_MACRO("::erase", false)
   return vector<EraseReturnQos>();
}
/**
 * Replaces the stored progress listener and wires the read hook of the
 * callback connection accordingly.
 *
 * When a connection with a callback part exists, this driver is stored as
 * the hook's user pointer; the hook function is callbackProgressListener
 * for a non-null listener and 0 otherwise.
 *
 * @param listener The new listener, may be 0 to unregister.
 * @return The listener that was stored before this call.
 */
I_ProgressListener* SocketDriver::registerProgressListener(I_ProgressListener *listener) {
   I_ProgressListener* const previous = this->progressListener_;
   this->progressListener_ = listener;

   // Without a callback connection there is no read hook to update
   if (connection_ == 0 || connection_->callbackP == 0) {
      return previous;
   }

   connection_->callbackP->readFromSocket.numReadUserP = this;
   if (listener == 0) {
      connection_->callbackP->readFromSocket.numReadFuncP = 0; // Dangerous: not thread safe, TODO: Add a mutex
   }
   else {
      connection_->callbackP->readFromSocket.numReadFuncP = callbackProgressListener;
   }
   return previous;
}
/**
 * Builds the usage text of the SOCKET plugin.
 *
 * @return A heading line followed by the text that
 *         ::xmlBlasterAccessUnparsedUsage() writes into the buffer.
 */
string SocketDriver::usage()
{
   char buffer[XMLBLASTER_MAX_USAGE_LEN];
   ::xmlBlasterAccessUnparsedUsage(buffer);

   string text("\nThe SOCKET plugin configuration:");
   text += buffer;
   return text;
}
// Exception conversion ....
/**
 * Converts a C ::ExceptionStruct into a C++ XmlBlasterException.
 *
 * @param ex The C exception; an empty errorCode is mapped to
 *           "internal.unknown".
 * @return The XmlBlasterException carrying the error code, the C message,
 *         the language "en" and the version/build timestamp of global_.
 */
org::xmlBlaster::util::XmlBlasterException SocketDriver::convertFromSocketException(const ::ExceptionStruct& ex) const
{
   // Fall back to a generic code when the C layer did not supply one
   string errorCode("internal.unknown");
   if (*ex.errorCode != '\0') {
      errorCode = ex.errorCode;
   }

   const string versionInfo = global_.getVersion() + " " + global_.getBuildTimestamp();

   // TODO: isServerSide!!!
   return org::xmlBlaster::util::XmlBlasterException(
             errorCode,
             string(""),
             ME,
             "en",
             string(ex.message),
             versionInfo);
}
/**
 * Converts a C++ XmlBlasterException into a C ::ExceptionStruct.
 *
 * The struct is initialized with ::initializeXmlBlasterException(); the
 * error code string and the message are then copied in with strncpy0(),
 * bounded by XMLBLASTEREXCEPTION_ERRORCODE_LEN and
 * XMLBLASTEREXCEPTION_MESSAGE_LEN.
 *
 * @param ex The C++ exception to convert.
 * @return The filled C exception struct (returned by value).
 */
::ExceptionStruct SocketDriver::convertToSocketException(org::xmlBlaster::util::XmlBlasterException& ex)
{
   ::ExceptionStruct result;
   ::initializeXmlBlasterException(&result);

   const string code = ex.getErrorCodeStr();
   strncpy0(result.errorCode, code.c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);

   const string text = ex.getMessage();
   strncpy0(result.message, text.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);

   //result.remote = ??
   return result;
}
}}}}} // namespaces
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -