⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aqqueue.cpp

📁 在动态库中实现异步导出大数据量的oracle数据
💻 CPP
字号:
#include "stdafx.h"

#define __OCICPP_INTERNAL_USE_

#include <stdio.h>
#include "AQMessage.h"
#include "AQQueue.h"
#include "OraError.h"

/*! \class OCICPP::AQQueue

  \brief Advanced Queue representation

  This class represents an advanced queue on the Oracle server. This feature
  is available from Oracle 8 onward in the Enterprise Edition. The current
  implementation does not provide structured payloads, so the extensions
  available with the Object option are currently not supported.
*/

/*!
  Create an uninitialized advanced queue.

  Every OCI handle and the payload type descriptor start out as null. The
  dequeue settings default to OCI_DEQ_REMOVE (mode), OCI_DEQ_NEXT_MSG
  (navigation) and OCI_DEQ_WAIT_FOREVER (wait). Enqueue() and Dequeue()
  refuse to run until init() has marked the queue as initialized.
 */
OCICPP::AQQueue::AQQueue()
{
	// Not usable yet: init() flips this flag once the handles are in place.
	Initialized = 0;

	// Default dequeue behaviour; adjustable through the set*() methods.
	DequeueMode = OCI_DEQ_REMOVE;
	Navigation = OCI_DEQ_NEXT_MSG;
	Wait = OCI_DEQ_WAIT_FOREVER;

	// No OCI resources are attached until init() supplies them.
	payload_tdo = 0;
	errhp = 0;
	svchp = 0;
	envhp = 0;
}

void	OCICPP::AQQueue::init(std::string& queue_name, OCIEnv *env, OCISvcCtx *svc, OCIError *err)
{
	this->queue_name.assign(queue_name);
	envhp = env;
	svchp = svc;
	errhp = err;

	// get the payload_tdo
	CHECKERR(errhp, OCITypeByName(
		envhp, errhp, svchp,
		(text *) "SYS", strlen("SYS"),
		(text *) "RAW", strlen("RAW"),
                (text *) 0, 0,
		OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &payload_tdo));

	Initialized = 1;
}

/*!
  Set the navigation option that later Dequeue() calls hand to OCI as
  OCI_ATTR_NAVIGATION. The constructor's default is OCI_DEQ_NEXT_MSG.
 */
void	OCICPP::AQQueue::setNavigation(int Navigation)
{
	// The parameter shadows the member, hence the explicit this->.
	this->Navigation = Navigation;
}
/*!
  Set the dequeue mode that later Dequeue() calls hand to OCI as
  OCI_ATTR_DEQ_MODE. The constructor's default is OCI_DEQ_REMOVE.
 */
void	OCICPP::AQQueue::setDequeueMode(int DequeueMode)
{
	// The parameter shadows the member, hence the explicit this->.
	this->DequeueMode = DequeueMode;
}
/*!
  Set the wait option that later Dequeue() calls hand to OCI as
  OCI_ATTR_WAIT. The constructor's default is OCI_DEQ_WAIT_FOREVER.
 */
void	OCICPP::AQQueue::setWait(int Wait)
{
	// The parameter shadows the member, hence the explicit this->.
	this->Wait = Wait;
}

/*!
  Enqueue a message \a msg in the queue.

  The correlation identifier and exception queue of \a msg are applied when
  they are non-empty; delay, expiration and priority are applied when they
  are greater than zero. Both strings are truncated to 128 characters. The
  payload obtained from \a msg is sent as a RAW.

  \param msg the message to enqueue
  \return the message id reported by OCIAQEnq, copied byte for byte into a
          string (empty if OCI returned no id)
  \throw OraError if the queue has not been initialized or the message
         properties descriptor cannot be allocated; failures of the
         individual OCI calls are reported through CHECKERR
 */
string OCICPP::AQQueue::Enqueue(OCICPP::AQMessage &msg)
{
	OCIRaw			*payload = (OCIRaw *) 0;
	void			*payload_ptr = NULL;
	size_t			payload_sz = 0;
	OCIRaw			*msg_id = (OCIRaw *) 0;
	string			str_msg_id;
	OCIAQMsgProperties	*msg_prop = (OCIAQMsgProperties *) 0;
	char			corr_id[129];
	char			ex_queue[129];
	sb4			msgdelay;
	sb4			msgexp;
	sb4			msgpriority;

	if(!Initialized)
		throw(OraError("AQQueue not initialized.  Cannot Enqueue.", OCICPPERROR));

	try
	{
		// set up the message properties
		if(OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0) != OCI_SUCCESS)
			throw(OraError("Cannot allocate AQ message properties descriptor.", OCICPPERROR));

		// set a correlation identifier?
		if(msg.getCorrelationIdentifier().size() > 0)
		{
			// "%.128s" caps the copy at 128 characters, so it always fits the 129-byte buffer
			sprintf(corr_id, "%.128s", msg.getCorrelationIdentifier().c_str());
			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));
		}

		// set a delay?
		if(msg.getDelay() > 0)
		{
			msgdelay = msg.getDelay();
			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgdelay, sizeof(msgdelay), OCI_ATTR_DELAY, errhp));
		}

		// set an exception queue?
		if(msg.getExceptionQueue().size() > 0)
		{
			sprintf(ex_queue, "%.128s", msg.getExceptionQueue().c_str());
			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) ex_queue, strlen(ex_queue), OCI_ATTR_EXCEPTION_QUEUE, errhp));
		}

		// set an expiration?
		if(msg.getExpiration() > 0)
		{
			msgexp = msg.getExpiration();
			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgexp, sizeof(msgexp), OCI_ATTR_EXPIRATION, errhp));
		}

		// set a priority?
		if(msg.getPriority() > 0)
		{
			// Must be OCI_ATTR_PRIORITY (the attribute Dequeue() reads back);
			// writing it as OCI_ATTR_EXPIRATION would clobber the expiration instead.
			msgpriority = msg.getPriority();
			CHECKERR(errhp, OCIAttrSet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &msgpriority, sizeof(msgpriority), OCI_ATTR_PRIORITY, errhp));
		}

		// prepare the actual payload...
		msg.getPayload(&payload_ptr, &payload_sz);
		CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) payload_ptr, payload_sz, &payload));

		// enqueue the message...
		CHECKERR(errhp,	OCIAQEnq(
			svchp,	                	// service context
			errhp,				// error handle
			(text *) queue_name.c_str(),	// queue name
			(OCIAQEnqOptions *) NULL,  	// currently not supporting Enqueue options, maybe someday...
			msg_prop,			// message properties
			payload_tdo,			// Type Descriptor Object
			(dvoid **) &payload,           	// The actual payload
			(dvoid **) NULL,		// payload_ind
			&msg_id,			// receives the message id
			OCI_DEFAULT)			// flags
		);

		// The id is copied as raw bytes; it is not guaranteed to be printable text.
		if(msg_id != NULL)
			str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));

		// Release the resources last: nothing below this point can throw, so
		// the handler underneath never sees an already-freed pointer.
		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);

		// free the payload pointer
		free(payload_ptr);
	}

	// Catch everything, not only OraError, so that the descriptor and the
	// payload buffer are released on any failure (e.g. std::bad_alloc).
	catch(...)
	{
		if(msg_prop)
			OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
		if(payload_ptr)
			free(payload_ptr);
		throw;
	}

	return(str_msg_id);
}

/*!
  Dequeues the next message matching the correlation identifier \a req_corr_id.

  Forwards to Dequeue() with an empty message id, so only the correlation
  identifier is used to select the message; the result is stored in \a msg.
 */
void OCICPP::AQQueue::DequeueByCorrelationIdentifier(OCICPP::AQMessage &msg, std::string req_corr_id)
{
	// Dequeue() skips the message-id filter when the id is empty.
	const std::string	no_msg_id;

	Dequeue(msg, no_msg_id, req_corr_id);
}

/*!
  Dequeues the next message matching the message id \a req_msg_id.

  Forwards to Dequeue() with an empty correlation identifier, so only the
  message id is used to select the message; the result is stored in \a msg.
 */
void OCICPP::AQQueue::DequeueByMessageID(OCICPP::AQMessage &msg, std::string req_msg_id)
{
	// Dequeue() skips the correlation filter when the identifier is empty.
	const std::string	no_corr_id;

	Dequeue(msg, req_msg_id, no_corr_id);
}

/*!
  Dequeues the next message matching the message id \a req_msg_id and the
  correlation identifier \a req_corr_id.

  An empty \a req_msg_id or \a req_corr_id disables the respective filter.
  The current DequeueMode, Navigation and Wait settings are passed to OCI
  with every call. On return, the payload, message id, correlation
  identifier (each only if OCI delivered one) and the priority of the
  dequeued message have been stored in \a msg.

  \param msg          receives the dequeued message
  \param req_msg_id   message id to select, or "" for any
  \param req_corr_id  correlation identifier to select (truncated to 128
                      characters), or "" for any
  \throw OraError if the queue has not been initialized or a descriptor
         cannot be allocated; failures of the individual OCI calls are
         reported through CHECKERR
 */
void OCICPP::AQQueue::Dequeue(OCICPP::AQMessage &msg, std::string req_msg_id, std::string req_corr_id)
{
	OCIAQDeqOptions		*deq_opt = (OCIAQDeqOptions *) 0;
	OCIAQMsgProperties	*msg_prop = (OCIAQMsgProperties *) 0;
	OCIRaw			*raw_req_msg_id = (OCIRaw *) 0;
	OCIRaw			*payload = (OCIRaw *) 0;
	void			*payload_ptr = NULL;
	size_t			payload_sz = 0;
	string			str_msg_id;
	OCIRaw			*msg_id = (OCIRaw *) 0;
	char			corr_id[129];
	text			*corr_id_out = NULL;
	ub4			corr_id_out_sz = 0;
	string			str_corr_id_out;
	sb4			priority_out = 0;

	if(!Initialized)
		throw(OraError("AQQueue not initialized.  Cannot Dequeue.", OCICPPERROR));

	try
	{
		// allocate the dequeue options...
		if(OCIDescriptorAlloc(envhp, (dvoid **)&deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **) 0) != OCI_SUCCESS)
			throw(OraError("Cannot allocate AQ dequeue options descriptor.", OCICPPERROR));

		// allocate the message properties...
		if(OCIDescriptorAlloc(envhp, (dvoid **)&msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0) != OCI_SUCCESS)
			throw(OraError("Cannot allocate AQ message properties descriptor.", OCICPPERROR));

		// correlation identifier
		if(!req_corr_id.empty())
		{
			// "%.128s" caps the copy at 128 characters, so it always fits the 129-byte buffer
			sprintf(corr_id, "%.128s", req_corr_id.c_str());
			CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) corr_id, strlen(corr_id), OCI_ATTR_CORRELATION, errhp));
		}

		// is the user requesting a particular message id?
		if(!req_msg_id.empty())
		{
			CHECKERR(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *) req_msg_id.c_str(), req_msg_id.size(), &raw_req_msg_id));
			CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &raw_req_msg_id, OCIRawSize(envhp, raw_req_msg_id), OCI_ATTR_DEQ_MSGID, errhp));
		}

		// dequeue mode
		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &DequeueMode, sizeof(DequeueMode), OCI_ATTR_DEQ_MODE, errhp));

		// navigation
		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Navigation, sizeof(Navigation), OCI_ATTR_NAVIGATION, errhp));

		// wait
		CHECKERR(errhp, OCIAttrSet(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *) &Wait, sizeof(Wait), OCI_ATTR_WAIT, errhp));

		// dequeue the message...
		CHECKERR(errhp,	OCIAQDeq(
			svchp, errhp,
			(ub1 *) queue_name.c_str(),
			deq_opt,
			msg_prop,
			payload_tdo,
			(dvoid **) &payload,
			(dvoid **) NULL,
			&msg_id, OCI_DEFAULT)
		);

		// todo: what if we timed out without getting a message???

		// get the payload
		if(payload != NULL)
		{
			payload_ptr = OCIRawPtr(envhp, payload);
			payload_sz = OCIRawSize(envhp, payload);
			msg.setPayload(payload_ptr, payload_sz);
		}

		// get the message id (copied as raw bytes)
		if(msg_id != NULL)
		{
			str_msg_id.assign((char *) OCIRawPtr(envhp, msg_id), OCIRawSize(envhp, msg_id));
			msg.setMessageID(str_msg_id);
		}

		// get the message's correlation identifier
		CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &corr_id_out, &corr_id_out_sz, OCI_ATTR_CORRELATION, errhp));
		if(corr_id_out)
		{
			str_corr_id_out.assign((char *)corr_id_out, corr_id_out_sz);
			msg.setCorrelationIdentifier(str_corr_id_out);
		}

		// get the message's priority
		CHECKERR(errhp, OCIAttrGet(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *) &priority_out, NULL, OCI_ATTR_PRIORITY, errhp));
		msg.setPriority(priority_out);

		// Release the descriptors last: nothing below this point can throw,
		// so the handler underneath never frees them a second time.
		OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
		OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);
	}

	// Catch everything, not only OraError, so that the descriptors are
	// released on any failure (e.g. an exception out of the msg setters).
	catch(...)
	{
		if(msg_prop)
			OCIDescriptorFree(msg_prop, OCI_DTYPE_AQMSG_PROPERTIES);
		if(deq_opt)
			OCIDescriptorFree(deq_opt, OCI_DTYPE_AQDEQ_OPTIONS);
		throw;
	}

	return;
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -