📄 aqqueue.cpp
字号:
#define __OCICPP_INTERNAL_USE_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include "AQMessage.h"
#include "AQQueue.h"
#include "OraError.h"
/*! \class OCICPP::AQQueue \brief Advanced Queue representation This class represents an advanced queue from the oracle server. This feature is available from Oracle 8 on within the Enterprise Edition. The current implementation does not provide structured payload, so the extensions available with the Object-option are not supported, currently.*//*! Create an uninitialized advanced queue. */OCICPP::AQQueue::AQQueue(){ envhp = 0; svchp = 0; errhp = 0; payload_tdo = 0; Navigation = OCI_DEQ_NEXT_MSG; DequeueMode = OCI_DEQ_REMOVE; Wait = OCI_DEQ_WAIT_FOREVER; Initialized = 0;}void OCICPP::AQQueue::init(std::string& queue_name, OCIEnv *env, OCISvcCtx *svc, OCIError *err){ this->queue_name.assign(queue_name); envhp = env; svchp = svc; errhp = err; // get the payload_tdo CHECKERR(errhp, OCITypeByName( envhp, errhp, svchp, (text *) "SYS", strlen("SYS"), (text *) "RAW", strlen("RAW"), (text *) 0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &payload_tdo)); Initialized = 1;}void OCICPP::AQQueue::setNavigation(int Navigation){ this->Navigation = Navigation;}void OCICPP::AQQueue::setDequeueMode(int DequeueMode){ this->DequeueMode = DequeueMode;}void OCICPP::AQQueue::setWait(int Wait){ this->Wait = Wait;}/*! Enqueue a message \a msg in the queue. */string OCICPP::AQQueue::Enqueue(OCICPP::AQMessage &msg){ OCIRaw *payload = (OCIRaw *) 0; void *payload_ptr = NULL; size_t payload_sz; OCIRaw *msg_id = (OCIRaw *) 0; string str_msg_id; OCIAQMsgProperties *msg_prop = (OCIAQMsgProperties *) 0; char corr_id[129]; char ex_queue[129]; sb4 msgdelay; sb4 msgexp; sb4 msgpriority; if(!Initialized) throw(OraError("AQQueue not initialized. Cannot Enqueue.", OCICPPERROR)); try { // set up the message properties OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); // set a correlation identifier? 
if(msg.getCorrelationIdentifier().size() > 0) { sprintf(corr_id, "%.128s", msg.getCorrelationIdentifier().c_str()); CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp)); } // set a delay? if(msg.getDelay() > 0) { msgdelay = msg.getDelay(); CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgdelay, sizeof(msgdelay), OCI_ATTR_DELAY, errhp)); } // set an exception queue? if(msg.getExceptionQueue().size() > 0) { sprintf(ex_queue, "%.128s", msg.getExceptionQueue().c_str()); CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &ex_queue, strlen(ex_queue), OCI_ATTR_EXCEPTION_QUEUE, errhp)); } // set an expiration? if(msg.getExpiration() > 0) { msgexp = msg.getExpiration(); CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgexp, sizeof(msgexp), OCI_ATTR_EXPIRATION, errhp)); } // set an expiration? if(msg.getPriority() > 0) { msgpriority = msg.getPriority(); CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgpriority, sizeof(msgpriority), OCI_ATTR_EXPIRATION, errhp)); } // prepare the actual payload... msg.getPayload(&payload_ptr, &payload_sz); CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) payload_ptr, payload_sz, &payload)); // enqueue the message... CHECKERR(errhp, OCIAQEnq( svchp, // service context errhp, // error handle (text *) queue_name.c_str(), // duh: queue name (OCIAQEnqOptions *) NULL, // currently not supporting Enqueue options, maybe someday... msg_prop, // message properties payload_tdo, // Type Descriptor Object (dvoid **) &payload, // The actual payload (dvoid **) NULL, // payload_ind &msg_id, // receivies the message id OCI_DEFAULT) // flags ); // Hopefully, the message id will be a string, and not binary data. // If it turns out to be binary, this will have to change... 
str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id)); // free the message properties descriptor... OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES); // free the payload pointer free(payload_ptr); } catch(OraError err) { OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES); if(payload_ptr) free(payload_ptr); throw; } return(str_msg_id);}/*! Dequeues the next message matching the correlation identifier \a req_corr_id. */void OCICPP::AQQueue::DequeueByCorrelationIdentifier(OCICPP::AQMessage &msg, std::string req_corr_id){ Dequeue(msg, "", req_corr_id);}/*! Dequeues the next message matching the message id \a req_msg_id. */void OCICPP::AQQueue::DequeueByMessageID(OCICPP::AQMessage &msg, std::string req_msg_id){ Dequeue(msg, req_msg_id, "");}/*! Dequeues the next message matching the message id \a req_msg_id. and the correlation identifier \a req_corr_id. */void OCICPP::AQQueue::Dequeue(OCICPP::AQMessage &msg, std::string req_msg_id, std::string req_corr_id){ OCIAQDeqOptions *deq_opt = (OCIAQDeqOptions *) 0; OCIAQMsgProperties *msg_prop = (OCIAQMsgProperties *) 0; OCIRaw *raw_req_msg_id = (OCIRaw *) 0; OCIRaw *payload = (OCIRaw *) 0; void *payload_ptr = NULL; size_t payload_sz; string str_msg_id; OCIRaw *msg_id = (OCIRaw *) 0; char corr_id[129]; text *corr_id_out = NULL; ub4 corr_id_out_sz = 0; string str_corr_id_out; sb4 priority_out = 0; if(!Initialized) throw(OraError("AQQueue not initialized. Cannot Dequeue.", OCICPPERROR)); try { // allocate the dequeue options... OCIDescriptorAlloc(envhp, (dvoid **)&deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **) 0); // allocate the message properties... 
OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); // correlation identifier if(req_corr_id != "") { sprintf(corr_id, "%.128s", req_corr_id.c_str()); CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp)); } // is the user requesting a particular message id? if(req_msg_id != "") { CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) req_msg_id.c_str(), req_msg_id.size(), &raw_req_msg_id)); CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &raw_req_msg_id, OCIRawSize(envhp, raw_req_msg_id), OCI_ATTR_DEQ_MSGID, errhp)); } // dequeue mode CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &DequeueMode, sizeof(DequeueMode), OCI_ATTR_DEQ_MODE, errhp)); // navigation CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Navigation, sizeof(Navigation), OCI_ATTR_NAVIGATION, errhp)); // wait CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Wait, sizeof(Wait), OCI_ATTR_WAIT, errhp)); // dequeue the message... CHECKERR(errhp, OCIAQDeq( svchp, errhp, (ub1 *) queue_name.c_str(), deq_opt, msg_prop, payload_tdo, (dvoid **) &payload, (dvoid **) NULL, &msg_id, OCI_DEFAULT) ); // todo: what if we timed out without getting a message??? 
// get the payload if(payload != NULL) { payload_ptr = OCIRawPtr(envhp, payload); payload_sz = OCIRawSize(envhp, payload); msg.setPayload(payload_ptr, payload_sz); } // get the message id if(msg_id != NULL) { str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id)); msg.setMessageID(str_msg_id); } // get the message's correlation identifier CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id_out, &corr_id_out_sz, OCI_ATTR_CORRELATION, errhp)); if(corr_id_out) { str_corr_id_out.assign((char *)corr_id_out, corr_id_out_sz); msg.setCorrelationIdentifier(str_corr_id_out); } // get the message's priority CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &priority_out, NULL, OCI_ATTR_PRIORITY, errhp)); msg.setPriority(priority_out); OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES); OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS); } catch(OraError err) { OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES); OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS); throw; } return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -