📄 ndbeventoperationimpl.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include <ndb_global.h>
#include <kernel_types.h>

#include "NdbDictionaryImpl.hpp"
#include "API.hpp"
#include <NdbOut.hpp>
#include "NdbApiSignal.hpp"
#include "TransporterFacade.hpp"
#include <signaldata/CreateEvnt.hpp>
#include <signaldata/SumaImpl.hpp>
#include <SimpleProperties.hpp>
#include <Bitmask.hpp>
#include <AttributeHeader.hpp>
#include <AttributeList.hpp>
#include <NdbError.hpp>
#include <BaseString.hpp>
#include <UtilBuffer.hpp>
#include <NdbDictionary.hpp>
#include <Ndb.hpp>
#include "NdbImpl.hpp"
#include "DictCache.hpp"
#include <portlib/NdbMem.h>
#include <NdbRecAttr.hpp>
#include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp"

/*
 * Class NdbEventOperationImpl
 *
 *
 */

//#define EVENT_DEBUG

/**
 * Construct an event operation bound to the event named eventName.
 *
 * The object starts in state EO_ERROR and only reaches EO_CREATED once
 * the dictionary and the event were both found. On a failed lookup
 * m_error.code holds the code reported by the Ndb object or by the
 * dictionary, and the constructor returns early.
 *
 * @param N            not used by this constructor
 * @param theNdb       owning Ndb object; the process is aborted if NULL
 * @param eventName    name passed to Dictionary::getEvent()
 * @param bufferLength used as m_bufferL unless the global event buffer
 *                     handle already carries a positive m_bufferL, in
 *                     which case the handle's value is adopted; otherwise
 *                     the handle's m_bufferL is set to this value
 */
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
                                             Ndb *theNdb,
                                             const char* eventName,
                                             const int bufferLength)
  : NdbEventOperation(*this),
    m_ndb(theNdb),
    m_state(EO_ERROR),
    m_bufferL(bufferLength)
{
  m_eventId = 0;

  // The early returns below would otherwise leave these members
  // uninitialised; execute() checks them before dereferencing.
  m_eventImpl = NULL;
  m_bufferHandle = NULL;
  m_bufferId = 0;

  theFirstPkAttrs[0] = NULL;
  theCurrentPkAttrs[0] = NULL;
  theFirstPkAttrs[1] = NULL;
  theCurrentPkAttrs[1] = NULL;
  theFirstDataAttrs[0] = NULL;
  theCurrentDataAttrs[0] = NULL;
  theFirstDataAttrs[1] = NULL;
  theCurrentDataAttrs[1] = NULL;

  sdata = NULL;
  ptr[0].p = NULL;
  ptr[1].p = NULL;
  ptr[2].p = NULL;

  // we should lookup id in Dictionary, TODO
  // also make sure we only have one listener on each event

  if (!m_ndb)
    abort();

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    return;
  }

  const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
  if (!myEvnt) {
    m_error.code= myDict->getNdbError().code;
    return;
  }

  m_eventImpl = &myEvnt->m_impl;
  m_eventId = m_eventImpl->m_eventId;

  m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
  // NOTE(review): the handle is dereferenced without a NULL check;
  // confirm getGlobalEventBufferHandle() cannot return NULL.
  if (m_bufferHandle->m_bufferL > 0)
    m_bufferL = m_bufferHandle->m_bufferL;
  else
    m_bufferHandle->m_bufferL = m_bufferL;

  m_state = EO_CREATED;
}

/**
 * Free the signal data buffer, hand every NdbRecAttr in the four
 * attribute lists back to the Ndb object, and call stop() if the
 * operation is still in state EO_EXECUTING.
 */
NdbEventOperationImpl::~NdbEventOperationImpl()
{
  int i;

  if (sdata)
    NdbMem_Free((char*)sdata);

  // primary key attribute lists
  for (i = 0; i < 2; i++) {
    NdbRecAttr *p = theFirstPkAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
  }

  // non-key attribute lists
  for (i = 0; i < 2; i++) {
    NdbRecAttr *p = theFirstDataAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
  }

  if (m_state == EO_EXECUTING) {
    stop();
    // m_bufferHandle->dropSubscribeEvent(m_bufferId);
    // We should send stop signal here
  }
}

/**
 * @return the current state of the operation (m_state).
 */
NdbEventOperation::State
NdbEventOperationImpl::getState()
{
  return m_state;
}

/**
 * Register interest in the column named colName.
 *
 * Only allowed while the operation is in state EO_CREATED. The column is
 * looked up in the event's table and the call is forwarded to the
 * NdbColumnImpl overload.
 *
 * @param colName column name to look up in m_eventImpl->m_tableImpl
 * @param aValue  forwarded unchanged to the NdbColumnImpl overload
 * @param n       forwarded unchanged to the NdbColumnImpl overload
 * @return the NdbRecAttr from the overload, or NULL if the state is
 *         wrong or the column does not exist (a message is printed)
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");
  if (m_state != EO_CREATED) {
    ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
    DBUG_RETURN(NULL);
  }

  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);

  if (tAttrInfo == NULL) {
    ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
    DBUG_RETURN(NULL);
  }

  DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
}

/**
 * Allocate an NdbRecAttr for the given column and link it, sorted by
 * attribute id, into one of the attribute lists.
 *
 * Primary key columns go into theFirstPkAttrs[n], all other columns into
 * theFirstDataAttrs[n].
 *
 * @param tAttrInfo column to receive
 * @param aValue    passed to NdbRecAttr::setup() together with the column
 * @param n         list index, 0 or 1 (presumably 0 = after image and
 *                  1 = before image - TODO confirm against the header)
 * @return the new NdbRecAttr, or NULL if n is out of range. The process
 *         exits with -1 if no NdbRecAttr can be obtained, if setup()
 *         fails, or if a duplicate attribute is found by the in-list scan.
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");

  // The attribute lists are two-element arrays; reject any other index
  // instead of reading and writing outside of them.
  if (n < 0 || n > 1) {
    ndbout_c("NdbEventOperationImpl::getValue value index %d out of range", n);
    DBUG_RETURN(NULL);
  }

  // Insert Attribute Id into ATTRINFO part.
  NdbRecAttr **theFirstAttr;
  NdbRecAttr **theCurrentAttr;

  if (tAttrInfo->getPrimaryKey()) {
    theFirstAttr = &theFirstPkAttrs[n];
    theCurrentAttr = &theCurrentPkAttrs[n];
  } else {
    theFirstAttr = &theFirstDataAttrs[n];
    theCurrentAttr = &theCurrentDataAttrs[n];
  }

  /************************************************************************
   * Get a Receive Attribute object and link it into the operation object.
   ************************************************************************/
  NdbRecAttr *tAttr = m_ndb->getRecAttr();
  if (tAttr == NULL) {
    exit(-1);
    //setErrorCodeAbort(4000);
    DBUG_RETURN(NULL);
  }

  /**********************************************************************
   * Now set the attribute identity and the pointer to the data in
   * the RecAttr object
   * Also set attribute size, array size and attribute type
   ********************************************************************/
  if (tAttr->setup(tAttrInfo, aValue)) {
    //setErrorCodeAbort(4000);
    m_ndb->releaseRecAttr(tAttr);
    exit(-1);
    DBUG_RETURN(NULL);
  }
  //theErrorLine++;

  tAttr->setUNDEFINED();

  // We want to keep the list sorted to make data insertion easier later
  if (*theFirstAttr == NULL) {
    // empty list: the new attribute is both first and last
    *theFirstAttr = tAttr;
    *theCurrentAttr = tAttr;
    tAttr->next(NULL);
  } else {
    Uint32 tAttrId = tAttrInfo->m_attrId;
    if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
      // larger than the current tail: append
      (*theCurrentAttr)->next(tAttr);
      tAttr->next(NULL);
      *theCurrentAttr = tAttr;
    } else if ((*theFirstAttr)->next() == NULL ||    // only one in list
               (*theFirstAttr)->attrId() > tAttrId) {// or first
      // NOTE(review): an id equal to the single existing element, or to
      // the head of a longer list, is not detected as a duplicate here.
      tAttr->next(*theFirstAttr);
      *theFirstAttr = tAttr;
    } else { // at least 2 in list and not first and not last
      NdbRecAttr *p = *theFirstAttr;
      NdbRecAttr *p_next = p->next();
      // The append branch above did not apply, so tAttrId is not larger
      // than the tail's id and this scan stops at or before the tail.
      while (tAttrId > p_next->attrId()) {
        p = p_next;
        p_next = p->next();
      }
      if (tAttrId == p_next->attrId()) { // Using same attribute twice
        tAttr->release(); // do I need to do this?
        m_ndb->releaseRecAttr(tAttr);
        exit(-1);
        DBUG_RETURN(NULL);
      }
      // this is it, between p and p_next
      p->next(tAttr);
      tAttr->next(p_next);
    }
  }

  DBUG_RETURN(tAttr);
}

/**
 * Start receiving events.
 *
 * Registers this operation with the global event buffer and, when the
 * buffer reports no other subscriber, asks the dictionary to execute the
 * subscription.
 *
 * @return 0 on success (state becomes EO_EXECUTING). -1 if construction
 *         did not complete, if no dictionary is available, or if the
 *         buffer registration fails (m_error.code = 4709). Otherwise the
 *         non-zero result of executeSubscribeEvent(), with state
 *         EO_ERROR and m_error.code copied from the dictionary.
 */
int
NdbEventOperationImpl::execute()
{
  DBUG_ENTER("NdbEventOperationImpl::execute");

  // A failed constructor leaves these unset; m_error.code still holds
  // the code recorded there.
  if (m_eventImpl == NULL || m_bufferHandle == NULL) {
    DBUG_RETURN(-1);
  }

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  // When no attribute was requested the operation defaults to get all.

  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);

  int hasSubscriber = 0;
  int r= m_bufferHandle->prepareAddSubscribeEvent(this,
                                                  hasSubscriber /*return value*/);
  if (r < 0) {
    m_error.code= 4709;
    DBUG_RETURN(-1);
  }

  // r is non-negative here, so it is a valid buffer id.
  m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;

  // now we check if there's already a subscriber
  if (hasSubscriber == 0) { // only excute if there's no other subscribers
    r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
  } else {
    r = 0;
  }

  if (r) {
    //Error
    m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
    // report the failure the same way stop() does
    m_error.code= myDictImpl.m_error.code;
    m_state = EO_ERROR;
  } else {
    m_bufferHandle->addSubscribeEvent(m_bufferId, this);
    m_state = EO_EXECUTING;
  }

  DBUG_RETURN(r);
}

/**
 * Stop receiving events.
 *
 * Only valid in state EO_EXECUTING. Prepares the drop in the global event
 * buffer and, when the buffer reports no other subscriber, asks the
 * dictionary to stop the subscription.
 *
 * @return 0 on success (state returns to EO_CREATED). -1 if the state is
 *         not EO_EXECUTING, if no dictionary is available, or if the drop
 *         cannot be prepared (m_error.code = 4712). Otherwise the
 *         non-zero result of stopSubscribeEvent(), with state EO_ERROR
 *         and m_error.code copied from the dictionary.
 */
int
NdbEventOperationImpl::stop()
{
  DBUG_ENTER("NdbEventOperationImpl::stop");
  if (m_state != EO_EXECUTING) {
    DBUG_RETURN(-1);
  }

  // ndbout_c("NdbEventOperation::stopping()");

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);

  int hasSubscriber;
  int ret = m_bufferHandle->prepareDropSubscribeEvent(m_bufferId,
                                                      hasSubscriber /* return value */);
  if (ret < 0) {
    m_error.code= 4712;
    DBUG_RETURN(-1);
  }
  // m_eventImpl->m_bufferId = m_bufferId;

  int r = -1;
  if (hasSubscriber == 0) { // only excute if there's no other subscribers
    r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
#ifdef EVENT_DEBUG
    ndbout_c("NdbEventOperation::stopping() done");
#endif
  } else
    r = 0;

  if (r) {
    //Error
    m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
    m_error.code= myDictImpl.m_error.code;
    m_state = EO_ERROR;
  } else {
#ifdef EVENT_DEBUG
    ndbout_c("NdbEventOperation::dropping()");
#endif
    m_bufferHandle->dropSubscribeEvent(m_bufferId);
    m_state = EO_CREATED;
  }

  DBUG_RETURN(r);
}

/**
 * @return sdata->isGCIConsistent() for the current signal data.
 *
 * NOTE(review): sdata is NULL after construction; presumably this may
 * only be called after next() has delivered data - confirm.
 */
bool
NdbEventOperationImpl::isConsistent()
{
  return sdata->isGCIConsistent();
}

/**
 * @return the gci field of the current signal data.
 *
 * NOTE(review): sdata is NULL after construction; presumably this may
 * only be called after next() has delivered data - confirm.
 */
Uint32
NdbEventOperationImpl::getGCI()
{
  return sdata->gci;
}

/**
 * @return NdbGlobalEventBufferHandle::getLatestGCI().
 */
Uint32
NdbEventOperationImpl::getLatestGCI()
{
  return NdbGlobalEventBufferHandle::getLatestGCI();
}

/**
 * Fetch event data from the global event buffer and move it into the
 * registered NdbRecAttr objects.
 *
 * @param pOverrun if non-NULL, passed to getDataL() and then updated with
 *                 the overrun count accumulated across loop iterations
 * @return the getDataL() result when it is <= 0; see the code for the
 *         remaining paths
 *
 * NOTE(review): ptmpOverrun is assigned but not used in the code below.
 */
int
NdbEventOperationImpl::next(int *pOverrun)
{
  DBUG_ENTER("NdbEventOperationImpl::next");
  int nr = 10000; // a high value
  int tmpOverrun = 0;
  int *ptmpOverrun;
  if (pOverrun) {
    ptmpOverrun = &tmpOverrun;
  } else
    ptmpOverrun = NULL;

  while (nr > 0) {
    int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata,
                                               ptr, pOverrun);
    if (pOverrun) {
      tmpOverrun += *pOverrun;
      *pOverrun = tmpOverrun;
    }

    if (r <= 0) {
      DBUG_RETURN(r); // no data
    }

    // NOTE(review): this overwrites the result r with nr when r < nr;
    // it looks as if "nr = r" may have been intended - confirm.
    if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever

#ifdef EVENT_DEBUG
    ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
#endif

    // now move the data into the RecAttrs
    if ((theFirstPkAttrs[0] == NULL) &&
        (theFirstPkAttrs[1] == NULL) &&
        (theFirstDataAttrs[0] == NULL) &&
        (theFirstDataAttrs[1] == NULL)) {
      DBUG_RETURN(r);
    }
    // no copying since no RecAttr's

    Uint32 *aAttrPtr = ptr[0].p;
    Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
    Uint32 *aDataPtr = ptr[1].p;

#ifdef EVENT_DEBUG
    int i;
    printf("after values sz=%u\n", ptr[1].sz);
    for(i=0; i < (int)ptr[1].sz; i++)
      printf ("H'%.8X ",ptr[1].p[i]);
    printf("\n");
    printf("before values sz=%u\n", ptr[2].sz);
    for(i=0; i < (int)ptr[2].sz; i++)
      printf ("H'%.8X ",ptr[2].p[i]);
    printf("\n");
#endif

    // copy data into the RecAttr's
    // we assume that the respective attribute lists are sorted

    // first the pk's
    {
      NdbRecAttr *tAttr= theFirstPkAttrs[0];
      while(tAttr)
      {
        assert(aAttrPtr < aAttrEndPtr);
        unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
        assert(tAttr->attrId() ==
               AttributeHeader(*aAttrPtr).getAttributeId());
        // NOTE(review): receive_data() is called inside assert(), so the
        // call disappears when the file is compiled with NDEBUG.
        assert(tAttr->receive_data(aDataPtr, tDataSz));
        // next
        aAttrPtr++;
        aDataPtr+= tDataSz;
        tAttr= tAttr->next();
      }
    }

    NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];

    Uint32 tRecAttrId;
    Uint32 tAttrId;
    Uint32 tDataSz;
    int hasSomeData=0;
    while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
      tRecAttrId = tWorkingRecAttr->attrId();
      tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
      tDataSz = AttributeHeader(*aAttrPtr).getDataSize();

      // requested attributes with an id below the received one are
      // marked undefined and skipped
      while (tAttrId > tRecAttrId) {
        //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
        tWorkingRecAttr->setUNDEFINED();
        tWorkingRecAttr = tWorkingRecAttr->next();
        if (tWorkingRecAttr == NULL)
          break;
        tRecAttrId = tWorkingRecAttr->attrId();
      }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -