⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aqqueue.cpp

📁 oracle引用库
💻 CPP
字号:
#define __OCICPP_INTERNAL_USE_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include "AQMessage.h"
#include "AQQueue.h"
#include "OraError.h"
/*! \class OCICPP::AQQueue  \brief Advanced Queue representation  This class represents an advanced queue from the oracle server. This feature  is available from Oracle 8 on within the Enterprise Edition. The current  implementation does not provide structured payload, so the extensions  available with the Object-option are not supported, currently.*//*!  Create an uninitialized advanced queue. */OCICPP::AQQueue::AQQueue(){	envhp = 0;	svchp = 0;	errhp = 0;	payload_tdo = 0;	Navigation = OCI_DEQ_NEXT_MSG;	DequeueMode = OCI_DEQ_REMOVE;	Wait = OCI_DEQ_WAIT_FOREVER;	Initialized = 0;}void	OCICPP::AQQueue::init(std::string& queue_name, OCIEnv *env, OCISvcCtx *svc, OCIError *err){	this->queue_name.assign(queue_name);	envhp = env;	svchp = svc;	errhp = err;	// get the payload_tdo	CHECKERR(errhp, OCITypeByName(		envhp, errhp, svchp,		(text *) "SYS", strlen("SYS"),		(text *) "RAW", strlen("RAW"),                (text *) 0, 0,		OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &payload_tdo));	Initialized = 1;}void	OCICPP::AQQueue::setNavigation(int Navigation){	this->Navigation = Navigation;}void	OCICPP::AQQueue::setDequeueMode(int DequeueMode){	this->DequeueMode = DequeueMode;}void	OCICPP::AQQueue::setWait(int Wait){	this->Wait = Wait;}/*!  Enqueue a message \a msg in the queue. */string OCICPP::AQQueue::Enqueue(OCICPP::AQMessage &msg){	OCIRaw			*payload = (OCIRaw *) 0;	void			*payload_ptr = NULL;	size_t			payload_sz;	OCIRaw			*msg_id = (OCIRaw *) 0;	string			str_msg_id;	OCIAQMsgProperties	*msg_prop = (OCIAQMsgProperties *) 0;	char			corr_id[129];	char			ex_queue[129];	sb4			msgdelay;	sb4			msgexp;	sb4			msgpriority;	if(!Initialized)		throw(OraError("AQQueue not initialized.  Cannot Enqueue.", OCICPPERROR));	try	{		// set up the message properties		OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);		// set a correlation identifier?		
if(msg.getCorrelationIdentifier().size() > 0)		{			sprintf(corr_id, "%.128s", msg.getCorrelationIdentifier().c_str());			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));		}		// set a delay?		if(msg.getDelay() > 0)		{			msgdelay = msg.getDelay();			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgdelay, sizeof(msgdelay), OCI_ATTR_DELAY, errhp));		}		// set an exception queue?		if(msg.getExceptionQueue().size() > 0)		{			sprintf(ex_queue, "%.128s", msg.getExceptionQueue().c_str());			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &ex_queue, strlen(ex_queue), OCI_ATTR_EXCEPTION_QUEUE, errhp));		}		// set an expiration?		if(msg.getExpiration() > 0)		{			msgexp = msg.getExpiration();			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgexp, sizeof(msgexp), OCI_ATTR_EXPIRATION, errhp));		}		// set an expiration?		if(msg.getPriority() > 0)		{			msgpriority = msg.getPriority();			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgpriority, sizeof(msgpriority), OCI_ATTR_EXPIRATION, errhp));		}		// prepare the actual payload...		msg.getPayload(&payload_ptr, &payload_sz);		CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) payload_ptr, payload_sz, &payload));		// enqueue the message...		CHECKERR(errhp,	OCIAQEnq(			svchp,	                	// service context			errhp,				// error handle			(text *) queue_name.c_str(),	// duh: queue name			(OCIAQEnqOptions *) NULL,  	// currently not supporting Enqueue options, maybe someday...			msg_prop,			// message properties			payload_tdo,			// Type Descriptor Object			(dvoid **) &payload,           	// The actual payload			(dvoid **) NULL,		// payload_ind			&msg_id,			// receivies the message id			OCI_DEFAULT)			// flags		);		// Hopefully, the message id will be a string, and not binary data.		
// If it turns out to be binary, this will have to change...		str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));		// free the message properties descriptor...		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);		// free the payload pointer		free(payload_ptr);	}	catch(OraError err)	{		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);		if(payload_ptr)			free(payload_ptr);		throw;	}	return(str_msg_id);}/*!  Dequeues the next message matching the correlation identifier \a req_corr_id. */void OCICPP::AQQueue::DequeueByCorrelationIdentifier(OCICPP::AQMessage &msg, std::string req_corr_id){	Dequeue(msg, "", req_corr_id);}/*!  Dequeues the next message matching the message id \a req_msg_id. */void OCICPP::AQQueue::DequeueByMessageID(OCICPP::AQMessage &msg, std::string req_msg_id){	Dequeue(msg, req_msg_id, "");}/*!  Dequeues the next message matching the message id \a req_msg_id. and the  correlation identifier \a req_corr_id. */void OCICPP::AQQueue::Dequeue(OCICPP::AQMessage &msg, std::string req_msg_id, std::string req_corr_id){	OCIAQDeqOptions		*deq_opt = (OCIAQDeqOptions *) 0;	OCIAQMsgProperties	*msg_prop = (OCIAQMsgProperties *) 0;	OCIRaw			*raw_req_msg_id = (OCIRaw *) 0;	OCIRaw			*payload = (OCIRaw *) 0;	void			*payload_ptr = NULL;	size_t			payload_sz;	string			str_msg_id;	OCIRaw			*msg_id = (OCIRaw *) 0;	char			corr_id[129];	text			*corr_id_out = NULL;	ub4			corr_id_out_sz = 0;	string			str_corr_id_out;	sb4			priority_out = 0;	if(!Initialized)		throw(OraError("AQQueue not initialized.  Cannot Dequeue.", OCICPPERROR));	try	{		// allocate the dequeue options...		OCIDescriptorAlloc(envhp, (dvoid **)&deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **) 0);		// allocate the message properties...		
OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);  		// correlation identifier		if(req_corr_id != "")		{			sprintf(corr_id, "%.128s", req_corr_id.c_str());			CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));		}		// is the user requesting a particular message id?		if(req_msg_id != "")		{			CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) req_msg_id.c_str(), req_msg_id.size(), &raw_req_msg_id));			CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &raw_req_msg_id, OCIRawSize(envhp, raw_req_msg_id), OCI_ATTR_DEQ_MSGID, errhp));		}		// dequeue mode		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &DequeueMode, sizeof(DequeueMode), OCI_ATTR_DEQ_MODE, errhp));		// navigation		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Navigation, sizeof(Navigation), OCI_ATTR_NAVIGATION, errhp));		// wait		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Wait, sizeof(Wait), OCI_ATTR_WAIT, errhp));		// dequeue the message...		CHECKERR(errhp,	OCIAQDeq(			svchp, errhp,			(ub1 *) queue_name.c_str(),			deq_opt,			msg_prop,			payload_tdo, 			(dvoid **) &payload,			(dvoid **) NULL, 			&msg_id, OCI_DEFAULT)		);		// todo: what if we timed out without getting a message???		
// get the payload		if(payload != NULL)		{			payload_ptr = OCIRawPtr(envhp, payload);			payload_sz = OCIRawSize(envhp, payload);			msg.setPayload(payload_ptr, payload_sz);		}		// get the message id		if(msg_id != NULL)		{			str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));			msg.setMessageID(str_msg_id);		}		// get the message's correlation identifier		CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id_out, &corr_id_out_sz, OCI_ATTR_CORRELATION, errhp));		if(corr_id_out)		{			str_corr_id_out.assign((char *)corr_id_out, corr_id_out_sz);			msg.setCorrelationIdentifier(str_corr_id_out);		}		// get the message's priority		CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &priority_out, NULL, OCI_ATTR_PRIORITY, errhp));		msg.setPriority(priority_out);		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);		OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);	}	catch(OraError err)	{		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);		OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);		throw;	}	return;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -