⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbeventoperationimpl.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <kernel_types.h>

#include "NdbDictionaryImpl.hpp"
#include "API.hpp"
#include <NdbOut.hpp>
#include "NdbApiSignal.hpp"
#include "TransporterFacade.hpp"
#include <signaldata/CreateEvnt.hpp>
#include <signaldata/SumaImpl.hpp>
#include <SimpleProperties.hpp>
#include <Bitmask.hpp>
#include <AttributeHeader.hpp>
#include <AttributeList.hpp>
#include <NdbError.hpp>
#include <BaseString.hpp>
#include <UtilBuffer.hpp>
#include <NdbDictionary.hpp>
#include <Ndb.hpp>
#include "NdbImpl.hpp"
#include "DictCache.hpp"
#include <portlib/NdbMem.h>
#include <NdbRecAttr.hpp>
#include <NdbEventOperation.hpp>

#include "NdbEventOperationImpl.hpp"

/*
 * Class NdbEventOperationImpl
 *
 *
 */

//#define EVENT_DEBUG


/*
 * Construct an event operation for the event named eventName.
 *
 * The object starts in state EO_ERROR and only moves to EO_CREATED once
 * the dictionary, the event and the global event buffer handle have all
 * been obtained.  On a lookup failure m_error.code is set from the failing
 * component and the constructor returns early, leaving the state EO_ERROR.
 *
 * theNdb must not be NULL; the process is aborted otherwise.
 * bufferLength is used as the buffer length only if the global event
 * buffer handle does not already carry a positive length.
 */
NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
                                             Ndb *theNdb,
                                             const char* eventName,
                                             const int bufferLength)
  : NdbEventOperation(*this), m_ndb(theNdb),
    m_state(EO_ERROR), m_bufferL(bufferLength)
{
  m_eventId = 0;
  // Make sure the early-return error paths below leave these in a
  // well-defined state; execute() checks them before dereferencing.
  m_eventImpl = NULL;
  m_bufferHandle = NULL;
  theFirstPkAttrs[0] = NULL;
  theCurrentPkAttrs[0] = NULL;
  theFirstPkAttrs[1] = NULL;
  theCurrentPkAttrs[1] = NULL;
  theFirstDataAttrs[0] = NULL;
  theCurrentDataAttrs[0] = NULL;
  theFirstDataAttrs[1] = NULL;
  theCurrentDataAttrs[1] = NULL;

  sdata = NULL;
  ptr[0].p = NULL;
  ptr[1].p = NULL;
  ptr[2].p = NULL;

  // we should lookup id in Dictionary, TODO
  // also make sure we only have one listener on each event

  if (!m_ndb) abort();

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; }

  const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
  if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; }

  m_eventImpl = &myEvnt->m_impl;

  m_eventId = m_eventImpl->m_eventId;

  m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
  // A length already recorded on the shared handle wins over the
  // requested one; otherwise the requested length is recorded on it.
  if (m_bufferHandle->m_bufferL > 0)
    m_bufferL =m_bufferHandle->m_bufferL;
  else
    m_bufferHandle->m_bufferL = m_bufferL;

  m_state = EO_CREATED;
}

/*
 * Free the signal data buffer, hand every NdbRecAttr in the primary key
 * and data attribute lists back to the Ndb object, and stop the
 * operation if it is still executing.
 */
NdbEventOperationImpl::~NdbEventOperationImpl()
{
  int i;
  if (sdata) NdbMem_Free((char*)sdata);
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstPkAttrs[i];
    while (p) {
      // fetch the successor before p is released
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
  }
  for (i=0 ; i<2; i++) {
    NdbRecAttr *p = theFirstDataAttrs[i];
    while (p) {
      NdbRecAttr *p_next = p->next();
      m_ndb->releaseRecAttr(p);
      p = p_next;
    }
  }
  if (m_state == EO_EXECUTING) {
    stop();
    // m_bufferHandle->dropSubscribeEvent(m_bufferId);
    ; // We should send stop signal here
  }
}

/*
 * Return the current state of the operation.
 */
NdbEventOperation::State
NdbEventOperationImpl::getState()
{
  return m_state;
}

/*
 * Register interest in the column named colName.
 *
 * Only allowed while the operation is in state EO_CREATED.  Returns NULL
 * (after printing a message) if the state is wrong or the column is not
 * found in the event's table; otherwise forwards to the NdbColumnImpl
 * overload with the same aValue and n.
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");
  if (m_state != EO_CREATED) {
    ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
    DBUG_RETURN(NULL);
  }

  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);

  if (tAttrInfo == NULL) {
    ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
    DBUG_RETURN(NULL);
  }

  DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
}

/*
 * Allocate an NdbRecAttr for the column tAttrInfo and link it into the
 * attribute list selected by n, keeping that list sorted by ascending
 * attribute id.
 *
 * Primary key columns go into theFirstPkAttrs[n], all other columns into
 * theFirstDataAttrs[n].  n must be 0 or 1 (the lists are two-element
 * arrays); any other value returns NULL.
 *
 * Returns the new NdbRecAttr.  As in the rest of this function, failure
 * to obtain or set up the NdbRecAttr, or requesting the same attribute
 * twice in one list, terminates the process with exit(-1).
 */
NdbRecAttr*
NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getValue");

  // n indexes two-element arrays; reject anything else instead of
  // reading and writing outside them.
  if (n < 0 || n > 1) {
    DBUG_RETURN(NULL);
  }

  // Insert Attribute Id into ATTRINFO part.
  NdbRecAttr **theFirstAttr;
  NdbRecAttr **theCurrentAttr;

  if (tAttrInfo->getPrimaryKey())
  {
    theFirstAttr = &theFirstPkAttrs[n];
    theCurrentAttr = &theCurrentPkAttrs[n];
  }
  else
  {
    theFirstAttr = &theFirstDataAttrs[n];
    theCurrentAttr = &theCurrentDataAttrs[n];
  }

  /************************************************************************
   *    Get a Receive Attribute object and link it into the operation object.
   ************************************************************************/
  NdbRecAttr *tAttr = m_ndb->getRecAttr();
  if (tAttr == NULL) {
    exit(-1);
    //setErrorCodeAbort(4000);
    DBUG_RETURN(NULL);
  }

  /**********************************************************************
   * Now set the attribute identity and the pointer to the data in
   * the RecAttr object
   * Also set attribute size, array size and attribute type
   ********************************************************************/
  if (tAttr->setup(tAttrInfo, aValue)) {
    //setErrorCodeAbort(4000);
    m_ndb->releaseRecAttr(tAttr);
    exit(-1);
    DBUG_RETURN(NULL);
  }
  //theErrorLine++;

  tAttr->setUNDEFINED();

  // We want to keep the list sorted to make data insertion easier later
  const Uint32 tAttrId = tAttrInfo->m_attrId;

  if (*theFirstAttr == NULL) {
    // empty list
    *theFirstAttr = tAttr;
    *theCurrentAttr = tAttr;
    tAttr->next(NULL);
  } else if (tAttrId > (*theCurrentAttr)->attrId()) {
    // right order: larger than the current tail, append
    (*theCurrentAttr)->next(tAttr);
    tAttr->next(NULL);
    *theCurrentAttr = tAttr;
  } else {
    // tAttrId <= id of the tail, so the walk below always stops on an
    // existing element: the first one whose id is not smaller.
    NdbRecAttr *p_prev = NULL;
    NdbRecAttr *p = *theFirstAttr;
    while (tAttrId > p->attrId()) {
      p_prev = p;
      p = p->next();
    }
    if (tAttrId == p->attrId()) { // Using same attribute twice
      tAttr->release(); // do I need to do this?
      m_ndb->releaseRecAttr(tAttr);
      exit(-1);
      DBUG_RETURN(NULL);
    }
    // this is it, in front of p (the tail pointer is unaffected)
    tAttr->next(p);
    if (p_prev == NULL)
      *theFirstAttr = tAttr;
    else
      p_prev->next(tAttr);
  }

  DBUG_RETURN(tAttr);
}

/*
 * Start the subscription for this event operation.
 *
 * Reserves a slot in the global event buffer and, if no other subscriber
 * exists for it, asks the dictionary to execute the subscribe event.
 *
 * Returns 0 on success (state becomes EO_EXECUTING).  Returns -1 with
 * m_error.code set if the dictionary is unavailable or the buffer slot
 * cannot be prepared (4709), and -1 if the constructor did not complete.
 * If the dictionary call fails its result is returned, the slot is
 * unprepared, m_error.code is taken from the dictionary and the state
 * becomes EO_ERROR.
 */
int
NdbEventOperationImpl::execute()
{
  DBUG_ENTER("NdbEventOperationImpl::execute");

  // The constructor bails out before these are set when the dictionary
  // or the event could not be found; m_error.code was set there.
  if (m_bufferHandle == NULL || m_eventImpl == NULL) {
    DBUG_RETURN(-1);
  }

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  // If no attribute was requested (theFirstPkAttrs[0] and
  // theFirstDataAttrs[0] both NULL): defaults to get all

  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);

  int hasSubscriber;
  int r= m_bufferHandle->prepareAddSubscribeEvent(this,
                                                  hasSubscriber /*return value*/);
  if (r < 0)
  {
    m_error.code= 4709;
    DBUG_RETURN(-1);
  }

  // r is non-negative here, so it is a valid buffer id.
  m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;

  // now we check if there's already a subscriber
  if (hasSubscriber == 0) { // only excute if there's no other subscribers
    r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
  } else {
    r = 0;
  }

  if (r) {
    //Error
    m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
    // report the dictionary error, as stop() does
    m_error.code= myDictImpl.m_error.code;
    m_state = EO_ERROR;
  } else {
    m_bufferHandle->addSubscribeEvent(m_bufferId, this);
    m_state = EO_EXECUTING;
  }

  DBUG_RETURN(r);
}

/*
 * Stop an executing event operation.
 *
 * Returns -1 if the operation is not in state EO_EXECUTING, if the
 * dictionary is unavailable, or if the buffer refuses to prepare the
 * drop (m_error.code 4712).  If this is the last subscriber the
 * dictionary is asked to stop the subscribe event.  On success the
 * buffer subscription is dropped, the state goes back to EO_CREATED and
 * 0 is returned; on failure the drop is unprepared, m_error.code is
 * taken from the dictionary, the state becomes EO_ERROR and the
 * dictionary result is returned.
 */
int
NdbEventOperationImpl::stop()
{
  DBUG_ENTER("NdbEventOperationImpl::stop");
  if (m_state != EO_EXECUTING)
  {
    DBUG_RETURN(-1);
  }

  //  ndbout_c("NdbEventOperation::stopping()");

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code= m_ndb->getNdbError().code;
    DBUG_RETURN(-1);
  }

  NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);

  int hasSubscriber;
  int ret =
    m_bufferHandle->prepareDropSubscribeEvent(m_bufferId,
                                              hasSubscriber /* return value */);

  if (ret < 0) {
    m_error.code= 4712;
    DBUG_RETURN(-1);
  }
  //  m_eventImpl->m_bufferId = m_bufferId;

  int r = -1;

  if (hasSubscriber == 0) { // only excute if there's no other subscribers
    r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
#ifdef EVENT_DEBUG
    ndbout_c("NdbEventOperation::stopping() done");
#endif
  } else
    r = 0;

  if (r) {
    //Error
    m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
    m_error.code= myDictImpl.m_error.code;
    m_state = EO_ERROR;
  } else {
#ifdef EVENT_DEBUG
    ndbout_c("NdbEventOperation::dropping()");
#endif
    m_bufferHandle->dropSubscribeEvent(m_bufferId);
    m_state = EO_CREATED;
  }

  DBUG_RETURN(r);
}

/*
 * Forward to isGCIConsistent() on the current signal data.
 * NOTE(review): sdata is NULL after construction and is only handed to
 * getDataL() in next(); presumably this must not be called before
 * next() has delivered data - confirm.
 */
bool
NdbEventOperationImpl::isConsistent()
{
  return sdata->isGCIConsistent();
}

/*
 * Return the gci field of the current signal data.
 * NOTE(review): same sdata precondition as isConsistent() - confirm.
 */
Uint32
NdbEventOperationImpl::getGCI()
{
  return sdata->gci;
}

/*
 * Return the latest GCI as reported by the global event buffer handle.
 */
Uint32
NdbEventOperationImpl::getLatestGCI()
{
  return NdbGlobalEventBufferHandle::getLatestGCI();
}

/*
 * Fetch event data from the global event buffer and copy it into the
 * registered NdbRecAttr lists.
 *
 * If pOverrun is non-NULL the overrun counts reported by getDataL() are
 * accumulated into *pOverrun.  A getDataL() result <= 0 is returned
 * directly, and its positive result is returned when no NdbRecAttr has
 * been registered.
 *
 * NOTE(review): the remainder of this function lies beyond the visible
 * part of the file, so the code below is left exactly as found.
 */
int
NdbEventOperationImpl::next(int *pOverrun)
{
  DBUG_ENTER("NdbEventOperationImpl::next");
  int nr = 10000; // a high value
  int tmpOverrun = 0;
  int *ptmpOverrun;
  if (pOverrun) {
    ptmpOverrun = &tmpOverrun;
  } else
    ptmpOverrun = NULL;

  while (nr > 0) {
    int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata,
                                               ptr, pOverrun);
    if (pOverrun) {
      tmpOverrun += *pOverrun;
      *pOverrun = tmpOverrun;
    }

    if (r <= 0)
    {
      DBUG_RETURN(r); // no data
    }

    // NOTE(review): this overwrites r with nr when r < nr; possibly
    // "nr = r" was intended - confirm against the rest of the loop.
    if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever

#ifdef EVENT_DEBUG
    ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
#endif

    // now move the data into the RecAttrs
    if ((theFirstPkAttrs[0] == NULL) &&
        (theFirstPkAttrs[1] == NULL) &&
        (theFirstDataAttrs[0] == NULL) &&
        (theFirstDataAttrs[1] == NULL))
    {
      DBUG_RETURN(r);
    }
    // no copying since no RecAttr's


    Uint32 *aAttrPtr = ptr[0].p;
    Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
    Uint32 *aDataPtr = ptr[1].p;

#ifdef EVENT_DEBUG
    int i;
    printf("after values sz=%u\n", ptr[1].sz);
    for(i=0; i < (int)ptr[1].sz; i++)
      printf ("H'%.8X ",ptr[1].p[i]);
    printf("\n");
    printf("before values sz=%u\n", ptr[2].sz);
    for(i=0; i < (int)ptr[2].sz; i++)
      printf ("H'%.8X ",ptr[2].p[i]);
    printf("\n");
#endif

    // copy data into the RecAttr's
    // we assume that the respective attribute lists are sorted

    // first the pk's
    {
      NdbRecAttr *tAttr= theFirstPkAttrs[0];
      while(tAttr)
      {
        assert(aAttrPtr < aAttrEndPtr);
        unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
        assert(tAttr->attrId() ==
               AttributeHeader(*aAttrPtr).getAttributeId());
        // NOTE(review): receive_data() is called inside assert(); if
        // assert is compiled out (NDEBUG) the call disappears with it.
        assert(tAttr->receive_data(aDataPtr, tDataSz));
        // next
        aAttrPtr++;
        aDataPtr+= tDataSz;
        tAttr= tAttr->next();
      }
    }

    NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];

    Uint32 tRecAttrId;
    Uint32 tAttrId;
    Uint32 tDataSz;
    int hasSomeData=0;
    while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
      tRecAttrId = tWorkingRecAttr->attrId();
      tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
      tDataSz = AttributeHeader(*aAttrPtr).getDataSize();

      while (tAttrId > tRecAttrId) {
        //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
        tWorkingRecAttr->setUNDEFINED();
        tWorkingRecAttr = tWorkingRecAttr->next();
        if (tWorkingRecAttr == NULL)
          break;
        tRecAttrId = tWorkingRecAttr->attrId();
      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -