📄 socketdriver.cpp
字号:
/*------------------------------------------------------------------------------
Name: SocketDriver.cpp
Project: xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment: The client driver for the socket protocol
------------------------------------------------------------------------------*/
#include <client/protocol/socket/SocketDriver.h>
#include <util/ErrorCode.h>
#include <util/XmlBlasterException.h>
#include <util/Global.h>
#include <util/lexical_cast.h>
#include <XmlBlasterAccessUnparsed.h> // The C SOCKET client library
#include <util/qos/ConnectQosFactory.h>
#include <util/Properties.h>
#include <string>
#include <stdarg.h> // va_start
#include <stdio.h> // vsnprintf for g++ 2.9x only
// Forward declaration of the logging hook for the C client library.
// The SocketDriver constructor registers it with connection_->log = myLogger.
static void myLogger(void *logUserP,
XMLBLASTER_LOG_LEVEL currLevel,
XMLBLASTER_LOG_LEVEL level,
const char *location, const char *fmt, ...);
//static ::XmlBlasterNumReadFunc callbackProgressListener; // what's wrong with this?
// Forward declaration of the read progress hook; the definition below forwards
// the byte counts to SocketDriver::progressListener_ if one is set.
static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes);
/**
* Customized logging output is handled by this method.
* <p>
* We register this function with
* </p>
* <pre>
* xa->log = myLogger;
* </pre>
* @param currLevel The actual log level of the client
* @param level The level of this log entry
* @param location A string describing the code place
* @param fmt The formatting string
* @param ... Other variables to log, corresponds to 'fmt'
* @see xmlBlaster/src/c/msgUtil.c: xmlBlasterDefaultLogging() is the default
* implementation
*/
static void myLogger(void *logUserP,
XMLBLASTER_LOG_LEVEL currLevel,
XMLBLASTER_LOG_LEVEL level,
const char *location, const char *fmt, ...)
{
/* Guess we need no more than 200 bytes. */
int n, size = 200;
char *p = 0;
va_list ap;
org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
(org::xmlBlaster::client::protocol::socket::SocketDriver *)logUserP;
org::xmlBlaster::util::I_Log& log = sd->getLog();
if (level > currLevel) { /* XMLBLASTER_LOG_ERROR, XMLBLASTER_LOG_WARN, XMLBLASTER_LOG_INFO, XMLBLASTER_LOG_TRACE */
return;
}
if ((p = (char *)malloc (size)) == NULL)
return;
for (;;) {
/* Try to print in the allocated space. */
va_start(ap, fmt);
n = VSNPRINTF(p, size, fmt, ap); /* UNIX: vsnprintf(), WINDOWS: _vsnprintf() */
va_end(ap);
/* If that worked, print the string to console. */
if (n > -1 && n < size) {
if (level == XMLBLASTER_LOG_INFO)
log.info(location, p);
else if (level == XMLBLASTER_LOG_WARN)
log.warn(location, p);
else if (level == XMLBLASTER_LOG_ERROR)
log.error(location, p);
else
log.trace(location, p);
free(p);
return;
}
/* Else try again with more space. */
if (n > -1) /* glibc 2.1 */
size = n+1; /* precisely what is needed */
else /* glibc 2.0 */
size *= 2; /* twice the old size */
if ((p = (char *)realloc (p, size)) == NULL) {
return;
}
}
}
/**
* Access the read socket progress.
* You need to register this function pointer if you want to see the progress of huge messages
* on slow connections.
*/
static void callbackProgressListener(void *userP, const size_t currBytesRead, const size_t nbytes) {
org::xmlBlaster::client::protocol::socket::SocketDriver *sd =
(org::xmlBlaster::client::protocol::socket::SocketDriver *)userP;
//org::xmlBlaster::util::I_Log& log = sd->getLog();
//if (log.trace()) log.trace("SocketDriver", "Update data progress currBytesRead=" +
// org::xmlBlaster::util::lexical_cast<std::string>(currBytesRead) +
// " nbytes=" + org::xmlBlaster::util::lexical_cast<std::string>(nbytes));
if (sd != 0 && sd->progressListener_ != 0) {
sd->progressListener_->progress("", currBytesRead, nbytes);
}
}
namespace org {
namespace xmlBlaster {
namespace client {
namespace protocol {
namespace socket {
using namespace std;
using namespace org::xmlBlaster::util;
using namespace org::xmlBlaster::util::qos;
using namespace org::xmlBlaster::util::key;
using namespace org::xmlBlaster::util::thread;
using namespace org::xmlBlaster::client::protocol;
using namespace org::xmlBlaster::client::key;
using namespace org::xmlBlaster::client::qos;
// Forward declaration of myUpdate(), its definition is not in this part of the file.
// The SocketDriver constructor stores 'this' in connection_->userObject to
// transport the driver instance to myUpdate().
static XMLBLASTER_C_bool myUpdate(::MsgUnitArr *msgUnitArr, void *userData,
::ExceptionStruct *exception);
void SocketDriver::freeResources(bool deleteConnection)
{
if (log_.call()) log_.call(ME, "freeResources("+lexical_cast<std::string>(deleteConnection)+") connection_=" + ((connection_==0)?"0":lexical_cast<std::string>(connection_)));
if (deleteConnection && connection_ != 0) {
freeXmlBlasterAccessUnparsed(connection_);
connection_ = 0;
}
if (deleteConnection && argsStructP_ != 0) {
global_.freeArgs(*argsStructP_);
delete argsStructP_;
argsStructP_ = 0;
}
}
/*
Note on exception handling:
If we throw an exception, our master ConnectionsHandler.cpp will
catch it and do a shutdown() on us. This will clean up the resources.

catch_MACRO(methodName, deleteConnection) expands to a chain of catch
handlers which is appended directly to a try block:
   try { ... } catch_MACRO("::methodName", true)
Every handler first calls freeResources(deleteConnection) and then throws:
 - an XmlBlasterException pointer is thrown again as it is
 - an XmlBlasterException reference gets its location set to ME + methodName
   and is thrown again with 'throw ex;'
 - an ::ExceptionStruct (caught by pointer or by reference) is converted with
   convertFromSocketException(), a caught pointer is deleted before throwing
 - exception, string, char* and int are wrapped into an
   XmlBlasterException(INTERNAL_UNKNOWN, ...) which carries the type name
   and the message text
 - everything else becomes an XmlBlasterException(INTERNAL_UNKNOWN, ...)
   without a message text
The macro uses the members loginName_, ME and global_ of the enclosing object.
No comments are placed inside the macro because of the line continuations.
*/
#define catch_MACRO(methodName, deleteConnection) \
catch(const XmlBlasterException *ex) { \
freeResources(deleteConnection); \
throw ex; \
} \
catch(XmlBlasterException &ex) { \
freeResources(deleteConnection); \
ex.setLocation(ME + string(methodName)); \
throw ex; \
} \
catch(const ::ExceptionStruct *ex) { \
freeResources(deleteConnection); \
org::xmlBlaster::util::XmlBlasterException xx = convertFromSocketException(*ex); \
delete ex; \
throw xx; \
} \
catch(const ::ExceptionStruct &ex) { \
freeResources(deleteConnection); \
throw convertFromSocketException(ex); \
} \
catch(const exception &ex) { \
freeResources(deleteConnection); \
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
loginName_, ME + string(methodName), "en", \
global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
string("type='exception', msg='") \
+ ex.what() + "'"); \
} \
catch(const string &ex) { \
freeResources(deleteConnection); \
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
loginName_, ME + string(methodName), "en", \
global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
string("type='string', msg='") + ex + "'"); \
} \
catch(const char *ex) { \
freeResources(deleteConnection); \
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
loginName_, ME + string(methodName), "en", \
global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
string("type='char*', msg='") + ex + "'"); \
} \
catch(int ex) { \
freeResources(deleteConnection); \
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
loginName_, ME + string(methodName), "en", \
global_.getVersion() + " " + global_.getBuildTimestamp(), "", "", \
string("type='int', msg='") + lexical_cast<std::string>(ex) + "'"); \
} \
catch (...) { \
freeResources(deleteConnection); \
throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_UNKNOWN, \
loginName_, ME + string(methodName), "en", \
global_.getVersion() + " " + global_.getBuildTimestamp());\
}
/**
 * Copy constructor.
 * It is not supposed to be invoked, it only exists to be declared private.
 * The C connection handle is not copied: connection_ starts as NULL and a
 * fresh argument structure is filled from the Global of the other instance.
 * @param socketDriver The instance whose mutex, Global and log are taken over
 */
SocketDriver::SocketDriver(const SocketDriver& socketDriver)
   : mutex_(socketDriver.mutex_),
     connection_(NULL),
     ME("SocketDriver"),
     argsStructP_(0),
     global_(socketDriver.global_),
     log_(socketDriver.log_),
     statusQosFactory_(socketDriver.global_),
     msgKeyFactory_(socketDriver.global_),
     msgQosFactory_(socketDriver.global_),
     callbackClient_(0),
     progressListener_(0)
{
   // Allocate our own argc/argv structure and let the Global fill it
   //memset(argsStructP_, '\0', sizeof(ArgsStruct_T));
   argsStructP_ = new ArgsStruct_T;
   global_.fillArgs(*argsStructP_);

   if (log_.call()) {
      log_.call(ME, string("Copy constructor"));
   }
}
/**
 * Assignment operator.
 * Nothing is copied from the right hand side: the call is only traced
 * and this instance is returned unchanged.
 * @return A reference to this instance
 */
SocketDriver& SocketDriver::operator =(const SocketDriver& /*socketDriver*/)
{
   if (log_.call()) {
      log_.call(ME, "operator=()");
   }
   return *this;
}
SocketDriver::SocketDriver(Global& global, const string instanceName)
: mutex_(),
instanceName_(instanceName),
connection_(NULL),
ME(string("SocketDriver-") + instanceName),
argsStructP_(0),
global_(global),
log_(global.getLog("org.xmlBlaster.client.protocol.socket")),
statusQosFactory_(global),
msgKeyFactory_(global),
msgQosFactory_(global),
callbackClient_(0),
progressListener_(0)
{
if (log_.call()) log_.call("SocketDriver", string("getInstance for ") + instanceName);
argsStructP_ = new ArgsStruct_T;
if (!global_.getProperty().propertyExists("logLevel")) {
if (log_.trace() || log_.call())
global_.getProperty().setProperty("logLevel", "TRACE");
else if (log_.dump())
global_.getProperty().setProperty("logLevel", "DUMP");
}
global_.fillArgs(*argsStructP_);
try {
connection_ = getXmlBlasterAccessUnparsed((int)argsStructP_->argc, argsStructP_->argv);
if (connection_) {
connection_->userObject = this; // Transports us to the myUpdate() method
connection_->log = myLogger; // Register our own logging function
connection_->logUserP = this; // Pass ourself to myLogger()
if (log_.dump()) {
log_.dump(ME, "C properties:");
::dumpProperties(connection_->props);
}
}
else {
log_.error(ME, "Allocation of C SOCKET library failed");
}
} catch_MACRO("::Constructor", true)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -