📄 sqlite3queue.c.todo
字号:
if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
dbInfo = getDbInfo(queueP);
SNPRINTF(queryString, LEN512,
"SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
stateOk = compilePreparedQuery(queueP, "fillCache",
&dbInfo->pVm_fillCache, queryString, exception);
if (stateOk) { /* start the query, calls parseCacheInfo() */
int32_t currIndex = getResultRows(queueP, "fillCache",
dbInfo->pVm_fillCache, parseCacheInfo,
0, false, exception);
stateOk = currIndex > 0;
}
LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
return stateOk;
}
/**
 * Tells whether the queue currently holds no entries.
 * @param queueP The queue instance
 * @return true if getNumOfEntries() reports zero or a negative value
 *         (getNumOfEntries() returns -1 on failure), false otherwise
 */
static bool persistentQueueEmpty(I_Queue *queueP)
{
   const int32_t entryCount = getNumOfEntries(queueP);
   if (entryCount > 0) {
      return false;
   }
   return true;
}
/**
 * Returns the number of entries held by the queue.
 * The value is taken from DbInfo; if the stored numOfEntries is -1,
 * fillCache() is called first to load it.
 * @param queueP The queue instance
 * @return The entry count, or -1 if queueP is invalid, no DbInfo is attached
 *         to the queue or fillCache() failed
 */
static int32_t getNumOfEntries(I_Queue *queueP)
{
   DbInfo *dbInfo;
   bool stateOk = true;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   /* checkArgs() was called without the connection check, so the private
      DbInfo is not guaranteed to exist; shutdownInternal() guards against
      a NULL DbInfo as well */
   if (dbInfo == 0) return -1;
   if (dbInfo->numOfEntries == -1) {
      stateOk = fillCache(queueP, &exception);
   }
   return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
}
/**
 * Returns the configured upper limit for the number of queue entries.
 * @param queueP The queue instance
 * @return The value of prop.maxNumOfEntries, or -1 if queueP is invalid
 *         or no DbInfo is attached to the queue
 */
static int32_t getMaxNumOfEntries(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   /* The connection state is not checked above, so guard against a missing
      DbInfo the same way shutdownInternal() does */
   if (dbInfo == 0) return -1;
   return dbInfo->prop.maxNumOfEntries;
}
/**
 * Returns the number of bytes held by the queue.
 * The value is taken from DbInfo; if the stored numOfBytes is -1,
 * fillCache() is called first to load it.
 * @param queueP The queue instance
 * @return The byte count, or -1 if queueP is invalid, no DbInfo is attached
 *         to the queue or fillCache() failed
 */
static int64_t getNumOfBytes(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   bool stateOk = true;
   if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   /* checkArgs() was called without the connection check, so the private
      DbInfo is not guaranteed to exist; shutdownInternal() guards against
      a NULL DbInfo as well */
   if (dbInfo == 0) return -1;
   if (dbInfo->numOfBytes == -1) {
      stateOk = fillCache(queueP, &exception);
   }
   return (stateOk) ? dbInfo->numOfBytes : -1;
}
/**
 * Returns the configured upper limit for the number of bytes in the queue.
 * @param queueP The queue instance
 * @return The value of prop.maxNumOfBytes, or -1 if queueP is invalid
 *         or no DbInfo is attached to the queue
 */
static int64_t getMaxNumOfBytes(I_Queue *queueP)
{
   DbInfo *dbInfo;
   ExceptionStruct exception;
   if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
   dbInfo = getDbInfo(queueP);
   /* The connection state is not checked above, so guard against a missing
      DbInfo the same way shutdownInternal() does */
   if (dbInfo == 0) return -1;
   return dbInfo->prop.maxNumOfBytes;
}
/**
 * Shutdown without destroying any entry.
 * Releases the open DB resources via shutdownInternal() and then hands
 * the queue to freeQueue().
 * @param queuePP Address of the queue pointer, a NULL address or a NULL
 *                queue is rejected by checkArgs()
 * @param exception Transporting errors
 */
static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
{
   I_Queue *queueP = 0;

   if (queuePP != 0) {
      queueP = *queuePP;
   }
   if (!checkArgs(queueP, "shutdown", false, exception)) {
      return;
   }

   shutdownInternal(queuePP, exception);
   freeQueue(queuePP);
}
/**
 * Shutdown used internally without calling freeQueue().
 * Marks the queue as not initialized, finalizes the prepared statements
 * kept in DbInfo (put, peekWithSamePriority, fillCache) and closes the
 * database handle. Each released handle is reset to 0.
 * @param queuePP Address of the queue pointer, a NULL address or a NULL
 *                queue is rejected by checkArgs()
 * @param exception Transporting errors
 */
static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
{
   I_Queue *queueP = 0;
   DbInfo *dbInfo = 0;

   if (queuePP != 0) {
      queueP = *queuePP;
   }
   if (!checkArgs(queueP, "shutdown", false, exception)) {
      return;
   }

   dbInfo = getDbInfo(queueP);
   queueP->isInitialized = false;
   if (dbInfo == 0) {
      return; /* nothing to release */
   }

   if (dbInfo->pVm_put != 0) {
      sqlite3_finalize(dbInfo->pVm_put);
      dbInfo->pVm_put = 0;
   }

   if (dbInfo->pVm_peekWithSamePriority != 0) {
      sqlite3_finalize(dbInfo->pVm_peekWithSamePriority);
      dbInfo->pVm_peekWithSamePriority = 0;
   }

   if (dbInfo->pVm_fillCache != 0) {
      sqlite3_finalize(dbInfo->pVm_fillCache);
      dbInfo->pVm_fillCache = 0;
   }

   /* The statements are finalized before the connection is closed */
   if (dbInfo->db != 0) {
      sqlite3_close(dbInfo->db);
      dbInfo->db = 0;
   }

   LOG __FILE__, "shutdown() done");
}
/**
 * Releases a QueueEntryArr completely: first its contents via
 * freeQueueEntryArrInternal(), then the struct itself.
 * @param queueEntryArr The struct to free, passing NULL is OK
 */
Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
{
   if (queueEntryArr != 0) {
      freeQueueEntryArrInternal(queueEntryArr);
      free(queueEntryArr);
   }
}
/**
 * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself.
 * The blob of every entry is released with freeQueueEntryData(), then the
 * entry array is freed. Afterwards the struct is left empty (array pointer 0,
 * len 0) so that calling this function again on the same struct is harmless.
 * @param queueEntryArr The struct internals to free, passing NULL is OK
 */
Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
{
   size_t i;
   if (queueEntryArr == (QueueEntryArr *)0) return;
   for (i=0; i<queueEntryArr->len; i++) {
      freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
   }
   free(queueEntryArr->queueEntryArr);
   /* Reset the pointer: leaving it dangling would cause a double free
      if the same struct is passed in a second time */
   queueEntryArr->queueEntryArr = 0;
   queueEntryArr->len = 0;
}
/**
 * Releases the embedded blob of a queue entry and resets the blob
 * to an empty state (data 0, dataLen 0).
 * Does not free the queueEntry itself.
 * @param queueEntry The entry whose blob is released, passing NULL is OK
 */
static void freeQueueEntryData(QueueEntry *queueEntry)
{
   if (!queueEntry) {
      return;
   }
   /* free() ignores a null pointer, no separate check needed */
   free((char *)queueEntry->embeddedBlob.data);
   queueEntry->embeddedBlob.data = 0;
   queueEntry->embeddedBlob.dataLen = 0;
}
/**
 * Releases the embedded blob via freeQueueEntryData() and then the
 * queueEntry struct itself.
 * @param queueEntry Its memory is freed, it is not usable anymore after this call.
 *                   Passing NULL is OK
 */
Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
{
   if (queueEntry != 0) {
      freeQueueEntryData(queueEntry);
      free(queueEntry);
   }
}
/**
 * Dumps a queue entry into an XML formatted string.
 *
 * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
 *
 * @param queueEntry The entry to dump, passing NULL returns NULL
 * @param maxContentDumpLen for -1 get the complete content, else limit the
 *        content to the given number of bytes. A content which is cut
 *        gets " ..." appended, for 0 the content is left empty
 * @return A ASCII XML formatted entry or NULL if out of memory
 */
Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
{
   if (queueEntry == (QueueEntry *)0) return 0;
   {
      char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
      const size_t blobLen = (maxContentDumpLen >= 0) ? (size_t)maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
      /* 200 bytes headroom for the fixed markup, the id and the " ..." marker */
      const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
      char *xml = 0;

      /* The string copy is written to and printed below, so it must exist */
      if (contentStr == 0) {
         return 0;
      }

      xml = (char *)calloc(len, sizeof(char));
      if (xml == 0) {
         free(contentStr);
         return 0;
      }

      if (maxContentDumpLen == 0) {
         *contentStr = 0;
      }
      else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
               (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5)) {
         /* Cut only if at least 5 bytes remain behind the cut position,
            so that " ..." plus its terminating 0 fits into the existing string */
         strcpy(contentStr+maxContentDumpLen, " ...");
      }

      SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
                         "\n <content size='%lu'><![CDATA[%s]]></content>"
                         "\n </QueueEntry>",
                         int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
                         queueEntry->isPersistent?"true":"false",
                         queueEntry->embeddedType,
                         (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
      free(contentStr);
      return xml;
   }
}
/**
 * Releases an entry dump string.
 * @param entryDump The string to free, passing NULL is OK
 */
Dll_Export void freeEntryDump(char *entryDump)
{
   /* free() is a no-op for a null pointer */
   free(entryDump);
}
/**
 * Validates the arguments commonly passed to the queue functions.
 * On success the exception struct is reset with initializeExceptionStruct().
 * @param queueP The queue instance, must not be NULL
 * @param methodName Name of the calling function, used in log and error texts
 * @param checkIsConnected If true the call fails as well if the queue has no
 *        private object, no open db handle or is not marked as initialized
 * @param exception Transporting errors, must not be NULL
 * @return false if the parameters are not usable,
 *         in this case 'exception' (if given) is filled with detail information
 */
static bool checkArgs(I_Queue *queueP, const char *methodName,
                      bool checkIsConnected, ExceptionStruct *exception)
{
   if (queueP == 0) {
      if (exception != 0) {
         strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
                  __FILE__, __LINE__, methodName);
         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
      }
      else {
         /* Nowhere to report to, fall back to stdout */
         printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
                __FILE__, __LINE__, methodName);
      }
      return false;
   }

   if (exception == 0) {
      LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
      return false;
   }

   if (checkIsConnected) {
      const DbInfo *dbInfo = (DbInfo *)(queueP->privateObject);
      const bool connected = (dbInfo != 0) && (dbInfo->db != 0) && queueP->isInitialized;
      if (!connected) {
         strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
                  "[%.100s:%d] Not connected to database, %s() failed",
                  __FILE__, __LINE__, methodName);
         LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
         return false;
      }
   }

   initializeExceptionStruct(exception);
   LOG __FILE__, "%s() entering ...", methodName);
   return true;
}
/*=================== TESTCODE =======================*/
# ifdef QUEUE_MAIN
#include <stdio.h>
static void testRun(int argc, char **argv) {
ExceptionStruct exception;
QueueEntryArr *entries = 0;
QueueProperties queueProperties;
I_Queue *queueP = 0;
memset(&queueProperties, 0, sizeof(QueueProperties));
strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
queueProperties.maxNumOfEntries = 10000000L;
queueProperties.maxNumOfBytes = 1000000000LL;
queueProperties.logFp = xmlBlasterDefaultLogging;
queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
queueProperties.userObject = 0;
queueP = createQueue(&queueProperties, &exception);
/* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
if (argc || argv) {} /* to avoid compiler warning */
printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
{
int64_t idArr[] = { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
int16_t prioArr[] = { 5 , 1 , 5 };
char *data[] = { "Hello" , " World" , "!!!" };
size_t i;
for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
QueueEntry queueEntry;
memset(&queueEntry, 0, sizeof(QueueEntry));
queueEntry.priority = prioArr[i];
queueEntry.isPersistent = true;
queueEntry.uniqueId = idArr[i];
strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
queueEntry.embeddedBlob.data = data[i];
queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
queueP->put(queueP, &queueEntry, &exception);
if (*exception.errorCode != 0) {
LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
}
}
}
entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
if (*exception.errorCode != 0) {
LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
}
if (entries != 0) {
size_t i;
printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
for (i=0; i<entries->len; i++) {
QueueEntry *queueEntry = &entries->queueEntryArr[i];
char *dump = queueEntryToXml(queueEntry, 200);
printf("%s\n", dump);
free(dump);
}
}
printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
queueP->randomRemove(queueP, entries, &exception);
if (*exception.errorCode != 0) {
LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
}
freeQueueEntryArr(entries);
printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
queueP->clear(queueP, &exception);
printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
queueP->shutdown(&queueP, &exception);
}
/**
 * Entry point of the test program (compiled only if QUEUE_MAIN is defined).
 * Executes testRun() once.
 * @return Always 0
 */
int main(int argc, char **argv) {
   int run = 0;
   while (run < 1) {
      testRun(argc, argv);
      run++;
   }
   return 0;
}
#endif /*QUEUE_MAIN*/
/*=================== TESTCODE =======================*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -