📄 ndbeventoperationimpl.cpp
字号:
if (tWorkingRecAttr == NULL) break; //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); if (tAttrId == tRecAttrId) { hasSomeData++; //printf("set!\n"); assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); } aAttrPtr++; aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { tRecAttrId = tWorkingRecAttr->attrId(); //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } tWorkingRecAttr = theFirstDataAttrs[1]; aDataPtr = ptr[2].p; Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz; while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { tRecAttrId = tWorkingRecAttr->attrId(); tAttrId = AttributeHeader(*aDataPtr).getAttributeId(); tDataSz = AttributeHeader(*aDataPtr).getDataSize(); aDataPtr++; while (tAttrId > tRecAttrId) { tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; tRecAttrId = tWorkingRecAttr->attrId(); } if (tWorkingRecAttr == NULL) break; if (tAttrId == tRecAttrId) { assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()); hasSomeData++; assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); } aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } if (hasSomeData) { DBUG_RETURN(r); } } DBUG_RETURN(0);}NdbDictionary::Event::TableEvent NdbEventOperationImpl::getEventType(){ switch (sdata->operation) { case TriggerEvent::TE_INSERT: return NdbDictionary::Event::TE_INSERT; case TriggerEvent::TE_DELETE: return NdbDictionary::Event::TE_DELETE; case TriggerEvent::TE_UPDATE: return NdbDictionary::Event::TE_UPDATE; default: return NdbDictionary::Event::TE_ALL; }}voidNdbEventOperationImpl::print(){ int i; ndbout << "EventId " << m_eventId << "\n"; for (i = 0; i < 2; i++) { NdbRecAttr *p = theFirstPkAttrs[i]; ndbout << " %u " << i; while (p) { ndbout << " : " << p->attrId() << " = " << *p; p = p->next(); } ndbout << "\n"; } for (i = 0; i < 2; i++) { NdbRecAttr *p = theFirstDataAttrs[i]; ndbout << " %u " << i; while (p) { ndbout << " : " << p->attrId() << " = " << *p; p = p->next(); } ndbout << "\n"; }}voidNdbEventOperationImpl::printAll(){ Uint32 *aAttrPtr = ptr[0].p; Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz; Uint32 *aDataPtr = ptr[1].p; //tRecAttr->setup(tAttrInfo, aValue)) { Uint32 tAttrId; Uint32 tDataSz; for (; aAttrPtr < aAttrEndPtr; ) { tAttrId = AttributeHeader(*aAttrPtr).getAttributeId(); tDataSz = AttributeHeader(*aAttrPtr).getDataSize(); aAttrPtr++; aDataPtr += tDataSz; }}int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber){ return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber);}/* * Global variable ndbGlobalEventBuffer * Class NdbGlobalEventBufferHandle * Class NdbGlobalEventBuffer * */#define ADD_DROP_LOCK_GUARDR(TYPE, FN) \{ \ ndbGlobalEventBuffer->add_drop_lock(); \ ndbGlobalEventBuffer->lock(); \ TYPE r = ndbGlobalEventBuffer->FN; \ ndbGlobalEventBuffer->unlock(); \ if (r < 0) { \ ndbGlobalEventBuffer->add_drop_unlock(); \ } \ return r;\}#define GUARDR(TYPE, FN) \{ \ ndbGlobalEventBuffer->lock(); \ TYPE r = ndbGlobalEventBuffer->FN; \ ndbGlobalEventBuffer->unlock(); \ return r;\}#define GUARD(FN) \{ \ ndbGlobalEventBuffer->lock(); \ ndbGlobalEventBuffer->FN; \ ndbGlobalEventBuffer->unlock(); \}#define ADD_DROP_UNLOCK_GUARD(FN) \{ \ GUARD(FN); \ ndbGlobalEventBuffer->add_drop_unlock(); \}#define GUARDBLOCK(BLOCK) \{ \ ndbGlobalEventBuffer->lock(); \ BLOCK \ ndbGlobalEventBuffer->unlock(); \}/* * Global variable ndbGlobalEventBuffer * */extern NdbMutex * ndb_global_event_buffer_mutex;static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL;/* * Class NdbGlobalEventBufferHandle * Each Ndb object has a Handle. This Handle is used to access the * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer */NdbGlobalEventBufferHandle *NdbGlobalEventBuffer_init(int n) { return new NdbGlobalEventBufferHandle(n); // return NdbGlobalEventBufferHandle::init(n);}voidNdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h) { delete h;}NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle(int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0){ if ((p_cond = NdbCondition_Create()) == NULL) { ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed"); exit(-1); } NdbMutex_Lock(ndb_global_event_buffer_mutex); if (ndbGlobalEventBuffer == NULL) { if (ndbGlobalEventBuffer == NULL) { ndbGlobalEventBuffer = new NdbGlobalEventBuffer(); if (!ndbGlobalEventBuffer) { NdbMutex_Unlock(ndb_global_event_buffer_mutex); ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer"); exit(-1); } } } NdbMutex_Unlock(ndb_global_event_buffer_mutex); GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));}NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle(){ NdbCondition_Destroy(p_cond); ndbGlobalEventBuffer->lock(); ndbGlobalEventBuffer->real_remove(this); ndbGlobalEventBuffer->unlock(); NdbMutex_Lock(ndb_global_event_buffer_mutex); if (ndbGlobalEventBuffer->m_handlers.size() == 0) { delete ndbGlobalEventBuffer; ndbGlobalEventBuffer = NULL; } NdbMutex_Unlock(ndb_global_event_buffer_mutex);}voidNdbGlobalEventBufferHandle::addBufferId(int bufferId){ DBUG_ENTER("NdbGlobalEventBufferHandle::addBufferId"); DBUG_PRINT("enter",("bufferId=%d",bufferId)); if (m_nids >= NDB_MAX_ACTIVE_EVENTS) { ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting"); exit(-1); } m_bufferIds[m_nids] = bufferId; m_nids++; DBUG_VOID_RETURN;}voidNdbGlobalEventBufferHandle::dropBufferId(int bufferId){ DBUG_ENTER("NdbGlobalEventBufferHandle::dropBufferId"); DBUG_PRINT("enter",("bufferId=%d",bufferId)); for (int i = 0; i < m_nids; i++) if (m_bufferIds[i] == bufferId) { m_nids--; for (; i < m_nids; i++) m_bufferIds[i] = m_bufferIds[i+1]; DBUG_VOID_RETURN; } ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist", bufferId); exit(-1);}/*NdbGlobalEventBufferHandle *NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS){ return new NdbGlobalEventBufferHandle();}voidNdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle){ delete handle;}*/int NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(NdbEventOperationImpl *eventOp, int& hasSubscriber){ ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp, hasSubscriber));}voidNdbGlobalEventBufferHandle::addSubscribeEvent(int bufferId, NdbEventOperationImpl *ndbEventOperationImpl){ ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId, ndbEventOperationImpl));}voidNdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId){ ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId));}int NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId, int& hasSubscriber){ ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId, hasSubscriber));}voidNdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId){ ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId));}void NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId){ ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId));}int NdbGlobalEventBufferHandle::insertDataL(int bufferId, const SubTableData * const sdata, LinearSectionPtr ptr[3]){ GUARDR(int,real_insertDataL(bufferId,sdata,ptr));} voidNdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci){ GUARD(real_latestGCI(bufferId,gci));} Uint32NdbGlobalEventBufferHandle::getLatestGCI(){ GUARDR(Uint32, real_getLatestGCI());} inline voidNdbGlobalEventBufferHandle::group_lock(){ ndbGlobalEventBuffer->group_lock();}inline voidNdbGlobalEventBufferHandle::group_unlock(){ ndbGlobalEventBuffer->group_unlock();}intNdbGlobalEventBufferHandle::wait(int aMillisecondNumber){ GUARDR(int, real_wait(this, aMillisecondNumber));}int NdbGlobalEventBufferHandle::getDataL(const int bufferId, SubTableData * &sdata, LinearSectionPtr ptr[3], int *pOverrun){ GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun));}/* * Class NdbGlobalEventBuffer * * */voidNdbGlobalEventBuffer::lock(){ if (!m_group_lock_flag) NdbMutex_Lock(ndb_global_event_buffer_mutex);}voidNdbGlobalEventBuffer::unlock(){ if (!m_group_lock_flag) NdbMutex_Unlock(ndb_global_event_buffer_mutex);}voidNdbGlobalEventBuffer::add_drop_lock(){ NdbMutex_Lock(p_add_drop_mutex);}voidNdbGlobalEventBuffer::add_drop_unlock(){ NdbMutex_Unlock(p_add_drop_mutex);}inline voidNdbGlobalEventBuffer::group_lock(){ lock(); m_group_lock_flag = 1;}inline voidNdbGlobalEventBuffer::group_unlock(){ m_group_lock_flag = 0; unlock();}voidNdbGlobalEventBuffer::lockB(int bufferId){ NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);}voidNdbGlobalEventBuffer::unlockB(int bufferId){ NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);}// Private methodsNdbGlobalEventBuffer::NdbGlobalEventBuffer() : m_handlers(), m_group_lock_flag(0), m_latestGCI(0), m_no(0) // must start at ZERO!{ if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) { ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); exit(-1); }}NdbGlobalEventBuffer::~NdbGlobalEventBuffer(){ NdbMutex_Destroy(p_add_drop_mutex); // NdbMem_Deallocate(m_eventBufferIdToEventId);}voidNdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, int MAX_NUMBER_ACTIVE_EVENTS){ DBUG_ENTER("NdbGlobalEventBuffer::real_init"); DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h)); if (m_handlers.size() == 0) { // First init DBUG_PRINT("info",("first to come")); m_max = MAX_NUMBER_ACTIVE_EVENTS; m_buf = new BufItem[m_max]; for (int i=0; i<m_max; i++) { m_buf[i].gId= 0; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -