📄 aqqueue.cpp
字号:
#include "stdafx.h"
#define __OCICPP_INTERNAL_USE_
#include <stdio.h>
#include "AQMessage.h"
#include "AQQueue.h"
#include "OraError.h"
/*! \class OCICPP::AQQueue
\brief Advanced Queue representation
This class represents an advanced queue from the oracle server. This feature
is available from Oracle 8 on within the Enterprise Edition. The current
implementation does not provide structured payload, so the extensions
available with the Object-option are not supported, currently.
*/
/*!
Create an uninitialized advanced queue.
*/
OCICPP::AQQueue::AQQueue()
{
envhp = 0;
svchp = 0;
errhp = 0;
payload_tdo = 0;
Navigation = OCI_DEQ_NEXT_MSG;
DequeueMode = OCI_DEQ_REMOVE;
Wait = OCI_DEQ_WAIT_FOREVER;
Initialized = 0;
}
void OCICPP::AQQueue::init(std::string& queue_name, OCIEnv *env, OCISvcCtx *svc, OCIError *err)
{
this->queue_name.assign(queue_name);
envhp = env;
svchp = svc;
errhp = err;
// get the payload_tdo
CHECKERR(errhp, OCITypeByName(
envhp, errhp, svchp,
(text *) "SYS", strlen("SYS"),
(text *) "RAW", strlen("RAW"),
(text *) 0, 0,
OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &payload_tdo));
Initialized = 1;
}
void OCICPP::AQQueue::setNavigation(int Navigation)
{
this->Navigation = Navigation;
}
void OCICPP::AQQueue::setDequeueMode(int DequeueMode)
{
this->DequeueMode = DequeueMode;
}
void OCICPP::AQQueue::setWait(int Wait)
{
this->Wait = Wait;
}
/*!
Enqueue a message \a msg in the queue.
*/
string OCICPP::AQQueue::Enqueue(OCICPP::AQMessage &msg)
{
OCIRaw *payload = (OCIRaw *) 0;
void *payload_ptr = NULL;
size_t payload_sz;
OCIRaw *msg_id = (OCIRaw *) 0;
string str_msg_id;
OCIAQMsgProperties *msg_prop = (OCIAQMsgProperties *) 0;
char corr_id[129];
char ex_queue[129];
sb4 msgdelay;
sb4 msgexp;
sb4 msgpriority;
if(!Initialized)
throw(OraError("AQQueue not initialized. Cannot Enqueue.", OCICPPERROR));
try
{
// set up the message properties
OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
// set a correlation identifier?
if(msg.getCorrelationIdentifier().size() > 0)
{
sprintf(corr_id, "%.128s", msg.getCorrelationIdentifier().c_str());
CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));
}
// set a delay?
if(msg.getDelay() > 0)
{
msgdelay = msg.getDelay();
CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgdelay, sizeof(msgdelay), OCI_ATTR_DELAY, errhp));
}
// set an exception queue?
if(msg.getExceptionQueue().size() > 0)
{
sprintf(ex_queue, "%.128s", msg.getExceptionQueue().c_str());
CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &ex_queue, strlen(ex_queue), OCI_ATTR_EXCEPTION_QUEUE, errhp));
}
// set an expiration?
if(msg.getExpiration() > 0)
{
msgexp = msg.getExpiration();
CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgexp, sizeof(msgexp), OCI_ATTR_EXPIRATION, errhp));
}
// set an expiration?
if(msg.getPriority() > 0)
{
msgpriority = msg.getPriority();
CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgpriority, sizeof(msgpriority), OCI_ATTR_EXPIRATION, errhp));
}
// prepare the actual payload...
msg.getPayload(&payload_ptr, &payload_sz);
CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) payload_ptr, payload_sz, &payload));
// enqueue the message...
CHECKERR(errhp, OCIAQEnq(
svchp, // service context
errhp, // error handle
(text *) queue_name.c_str(), // duh: queue name
(OCIAQEnqOptions *) NULL, // currently not supporting Enqueue options, maybe someday...
msg_prop, // message properties
payload_tdo, // Type Descriptor Object
(dvoid **) &payload, // The actual payload
(dvoid **) NULL, // payload_ind
&msg_id, // receivies the message id
OCI_DEFAULT) // flags
);
// Hopefully, the message id will be a string, and not binary data.
// If it turns out to be binary, this will have to change...
str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));
// free the message properties descriptor...
OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
// free the payload pointer
free(payload_ptr);
}
catch(OraError err)
{
OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
if(payload_ptr)
free(payload_ptr);
throw;
}
return(str_msg_id);
}
/*!
Dequeues the next message matching the correlation identifier \a req_corr_id.
*/
void OCICPP::AQQueue::DequeueByCorrelationIdentifier(OCICPP::AQMessage &msg, std::string req_corr_id)
{
Dequeue(msg, "", req_corr_id);
}
/*!
Dequeues the next message matching the message id \a req_msg_id.
*/
void OCICPP::AQQueue::DequeueByMessageID(OCICPP::AQMessage &msg, std::string req_msg_id)
{
Dequeue(msg, req_msg_id, "");
}
/*!
Dequeues the next message matching the message id \a req_msg_id. and the
correlation identifier \a req_corr_id.
*/
void OCICPP::AQQueue::Dequeue(OCICPP::AQMessage &msg, std::string req_msg_id, std::string req_corr_id)
{
OCIAQDeqOptions *deq_opt = (OCIAQDeqOptions *) 0;
OCIAQMsgProperties *msg_prop = (OCIAQMsgProperties *) 0;
OCIRaw *raw_req_msg_id = (OCIRaw *) 0;
OCIRaw *payload = (OCIRaw *) 0;
void *payload_ptr = NULL;
size_t payload_sz;
string str_msg_id;
OCIRaw *msg_id = (OCIRaw *) 0;
char corr_id[129];
text *corr_id_out = NULL;
ub4 corr_id_out_sz = 0;
string str_corr_id_out;
sb4 priority_out = 0;
if(!Initialized)
throw(OraError("AQQueue not initialized. Cannot Dequeue.", OCICPPERROR));
try
{
// allocate the dequeue options...
OCIDescriptorAlloc(envhp, (dvoid **)&deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **) 0);
// allocate the message properties...
OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
// correlation identifier
if(req_corr_id != "")
{
sprintf(corr_id, "%.128s", req_corr_id.c_str());
CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));
}
// is the user requesting a particular message id?
if(req_msg_id != "")
{
CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) req_msg_id.c_str(), req_msg_id.size(), &raw_req_msg_id));
CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &raw_req_msg_id, OCIRawSize(envhp, raw_req_msg_id), OCI_ATTR_DEQ_MSGID, errhp));
}
// dequeue mode
CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &DequeueMode, sizeof(DequeueMode), OCI_ATTR_DEQ_MODE, errhp));
// navigation
CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Navigation, sizeof(Navigation), OCI_ATTR_NAVIGATION, errhp));
// wait
CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Wait, sizeof(Wait), OCI_ATTR_WAIT, errhp));
// dequeue the message...
CHECKERR(errhp, OCIAQDeq(
svchp, errhp,
(ub1 *) queue_name.c_str(),
deq_opt,
msg_prop,
payload_tdo,
(dvoid **) &payload,
(dvoid **) NULL,
&msg_id, OCI_DEFAULT)
);
// todo: what if we timed out without getting a message???
// get the payload
if(payload != NULL)
{
payload_ptr = OCIRawPtr(envhp, payload);
payload_sz = OCIRawSize(envhp, payload);
msg.setPayload(payload_ptr, payload_sz);
}
// get the message id
if(msg_id != NULL)
{
str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));
msg.setMessageID(str_msg_id);
}
// get the message's correlation identifier
CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id_out, &corr_id_out_sz, OCI_ATTR_CORRELATION, errhp));
if(corr_id_out)
{
str_corr_id_out.assign((char *)corr_id_out, corr_id_out_sz);
msg.setCorrelationIdentifier(str_corr_id_out);
}
// get the message's priority
CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &priority_out, NULL, OCI_ATTR_PRIORITY, errhp));
msg.setPriority(priority_out);
OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);
}
catch(OraError err)
{
OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);
throw;
}
return;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -