📄 socketdriver.cpp
字号:
}
/**
* Called on polling, must be synchronized from outside,
* throws an exception on failure
*/
void SocketDriver::reconnectOnIpLevel(void)
{
log_.trace(ME, "Trying to reconnect to server");
freeResources(true); // Cleanup if old connection exists
// Give a chance to new configuration settings
if (argsStructP_ != 0) {
global_.freeArgs(*argsStructP_);
delete argsStructP_;
argsStructP_ = 0;
}
argsStructP_ = new ArgsStruct_T;
global_.fillArgs(*argsStructP_);
::ExceptionStruct socketException;
try {
connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
connection_->userObject = this; // Transports us to the myUpdate() method
connection_->log = myLogger; // Register our own logging function
connection_->logUserP = this; // Pass SocketDriver to myLogger()
} catch_MACRO("::Constructor", true)
try {
if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
if (log_.trace()) log_.trace(ME, string("Reconnection to xmlBlaster failed, please start the server or check your network: ") + socketException.message);
throw socketException;
}
registerProgressListener(this->progressListener_); // Re-register
if (log_.trace()) log_.trace(ME, "After createCallbackServer");
} catch_MACRO("::initialize", true)
}
SocketDriver::~SocketDriver()
{
if (log_.call()) log_.call(ME, "~SocketDriver()");
try {
freeResources(true);
}
catch (...) {
log_.error(ME, "Unexpected catch in ~SocketDriver()");
}
}
XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
::ExceptionStruct *exception)
{
XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
SocketDriver* socketDriver = static_cast<SocketDriver*>(xa->userObject);
Global& global = socketDriver->getGlobal();
I_Log& log = socketDriver->getLog();
const string &ME = socketDriver->me();
try {
for (size_t i=0; i<msgUnitArr->len; i++) {
//char *xml = messageUnitToXml(&msgUnitArr->msgUnitArr[i]);
//printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n",xml);
//xmlBlasterFree(xml);
if (log.trace()) log.trace(ME, "Received callback message");
::MsgUnit& msgUnit = msgUnitArr->msgUnitArr[i];
I_Callback* cb = socketDriver->getCallbackClient();
if (cb != 0) {
UpdateKey updateKey(global, socketDriver->getMsgKeyFactory().readObject(string(msgUnit.key)));
UpdateQos updateQos(global, socketDriver->getMsgQosFactory().readObject(string(msgUnit.qos)));
std::string retQos = cb->update(msgUnitArr->secretSessionId,
updateKey, (const unsigned char*)msgUnit.content,
msgUnit.contentLen, updateQos);
msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(retQos.c_str());
}
else { /* Return QoS: Everything is OK */
log.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + msgUnit.key);
msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc(Constants::RET_OK); // "<qos><state id='OK'/></qos>");
}
}
//throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "TEST THROWING EXCEPTION");
}
catch (XmlBlasterException &e) {
string tmp = "Exception caught in C++ update(), " +
lexical_cast<std::string>(msgUnitArr->len) +
" messages are handled as not delivered: " +
e.getMessage();
log.error(ME, tmp);
for (size_t i=0; i<msgUnitArr->len; i++) {
char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
log.error(ME, xml);
xmlBlasterFree(xml);
}
strncpy0(exception->errorCode, e.getErrorCodeStr().c_str(), XMLBLASTEREXCEPTION_ERRORCODE_LEN);
strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
return (XMLBLASTER_C_bool)0;
}
catch(...) {
string tmp = "Unidentified exception caught in C++ update(), " + lexical_cast<std::string>(msgUnitArr->len) + " messages are handled as not delivered";
log.error(ME, tmp);
for (size_t i=0; i<msgUnitArr->len; i++) {
char* xml = messageUnitToXmlLimited(&msgUnitArr->msgUnitArr[i], 100);
log.error(ME, xml);
xmlBlasterFree(xml);
}
strncpy0(exception->errorCode, "user.update.error", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
strncpy0(exception->message, tmp.c_str(), XMLBLASTEREXCEPTION_MESSAGE_LEN);
return (XMLBLASTER_C_bool)0;
}
return (XMLBLASTER_C_bool)1;
}
I_Callback* SocketDriver::getCallbackClient()
{
return callbackClient_;
}
/** Enforced by I_CallbackServer */
void SocketDriver::initialize(const string& name, I_Callback &client)
{
::ExceptionStruct socketException;
ME = string("SocketDriver-") + instanceName_ + "-" + name;
if (log_.call()) log_.call(ME, "initialize() callback server");
callbackClient_ = &client;
Lock lock(mutex_);
if (connection_ == 0) {
if (log_.trace()) log_.trace(ME, "ERROR: connection_ is null");
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, name, ME + ".initialize", "en",
global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
}
try {
if (log_.trace()) log_.trace(ME, "Before createCallbackServer");
if (connection_->initialize(connection_, myUpdate, &socketException) == false) {
log_.warn(ME, "Connection to xmlBlaster failed,"
" please start the server or check your configuration\n");
freeResources(true);
}
if (log_.trace()) log_.trace(ME, "After createCallbackServer");
} catch_MACRO("::initialize", true)
}
string SocketDriver::getCbProtocol()
{
return Constants::SOCKET; // "SOCKET";
}
string SocketDriver::getCbAddress()
{
Lock lock(mutex_);
if (connection_ == 0 || connection_->callbackP == 0) {
return string("socket://:");
}
try {
return string("socket://") + string(connection_->callbackP->hostCB) + ":" +
lexical_cast<std::string>(connection_->callbackP->portCB);
} catch_MACRO("::getCbAddress", false)
}
bool SocketDriver::shutdownCb()
{
Lock lock(mutex_);
if (connection_ == 0 || connection_->callbackP == 0) return false;
connection_->callbackP->shutdown(connection_->callbackP);
return true;
}
ConnectReturnQosRef SocketDriver::connect(const ConnectQosRef& qos) //throw (XmlBlasterException) // Visual C++ emits a warning with this throw clause
{
if (log_.call()) log_.call(ME, string("connect() ") + string((connection_==0)?"connection_==0":"connection_!=0") +
", secretSessionId_="+secretSessionId_);
//+" isConnected=" + ((connection_==0)?XMLBLASTER_FALSE:lexical_cast<string>(connection_->isConnected(connection_))));
::ExceptionStruct socketException;
Lock lock(mutex_);
try {
loginName_ = qos->getUserId();
if (connection_ == 0) {
reconnectOnIpLevel(); // Connects on IP level only, throws an exception on failure
if (secretSessionId_ != "") {
qos->getSessionQos().setSecretSessionId(secretSessionId_);
}
if (connection_ != 0 && connection_->callbackP != 0) {
ConnectQos *qq = const_cast<ConnectQos*>(&(*qos));
if (qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->getType() == Constants::SOCKET) {
// Force callback address, it could have changed on reconnect (checked to cb not be a delegate)
string addr = string("socket://") + string(connection_->callbackP->hostCB) + ":" +
lexical_cast<std::string>(connection_->callbackP->portCB);
qq->getSessionCbQueueProperty().getCurrentCallbackAddress()->setAddress(addr);
log_.trace(ME, "Setting callback address to " + addr);
}
}
}
char *retQos = connection_->connect(connection_, qos->toXml().c_str(),
myUpdate, &socketException);
if (*socketException.errorCode != 0) {
throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
}
ConnectQosFactory factory(global_);
ConnectReturnQosRef connectReturnQos = factory.readObject(retQos);
xmlBlasterFree(retQos);
secretSessionId_ = connectReturnQos->getSecretSessionId();
return connectReturnQos;
} catch_MACRO("::connect", false)
}
bool SocketDriver::disconnect(const DisconnectQos& qos)
{
if (log_.call()) log_.call(ME, "disconnect()");
if (connection_ == 0) return false;
::ExceptionStruct socketException;
Lock lock(mutex_);
try {
bool ret = connection_->disconnect(connection_, qos.toXml().c_str(), &socketException);
if (*socketException.errorCode != 0) {
throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
}
return ret;
} catch_MACRO("::disconnect", false)
return true;
}
string SocketDriver::getProtocol()
{
return Constants::SOCKET; // "SOCKET";
}
/** Called when going to POLLING mode */
bool SocketDriver::shutdown()
{
if (log_.call()) log_.call(ME, "shutdown()");
Lock lock(mutex_);
if (connection_ == 0) return false;
freeResources(true);
return true;
}
string SocketDriver::getLoginName()
{
return loginName_;
}
bool SocketDriver::isLoggedIn()
{
Lock lock(mutex_);
return connection_ != 0 && connection_->isConnected(connection_);
}
string SocketDriver::ping(const string& qos)
{
::ExceptionStruct socketException;
Lock lock(mutex_);
if (connection_ == 0) {
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, "", ME + ".ping", "en",
global_.getVersion() + " " + global_.getBuildTimestamp() + " The connection_ handle is NULL");
}
try {
char *retQosP = connection_->ping(connection_, qos.c_str(), &socketException);
if (retQosP == 0 || *socketException.errorCode != 0) {
throw socketException; // Is converted to util::XmlBlasterException in catch_MACRO
}
string retQos(retQosP);
xmlBlasterFree(retQosP);
return retQos;
} catch_MACRO("::ping", false)
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -