📄 ndbeventoperationimpl.cpp
字号:
assert(m_max == MAX_NUMBER_ACTIVE_EVENTS); // TODO make sure we don't hit roof m_handlers.push_back(h); DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h){ DBUG_ENTER("NdbGlobalEventBuffer::real_remove"); DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h)); for (Uint32 i=0 ; i < m_handlers.size(); i++) { DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i])); if (m_handlers[i] == h) { m_handlers.erase(i); if (m_handlers.size() == 0) { DBUG_PRINT("info",("last to go")); delete[] m_buf; m_buf = NULL; } DBUG_VOID_RETURN; } } ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle"); DBUG_PRINT("error",("non-existing handle")); abort(); DBUG_VOID_RETURN;}intNdbGlobalEventBuffer::real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp, int& hasSubscriber){ DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent"); int i; int bufferId= -1; Uint32 eventId= eventOp->m_eventId; DBUG_PRINT("enter",("eventId: %u", eventId)); // add_drop_lock(); // only one thread can do add or drop at a time // Find place where eventId already set for (i=0; i<m_no; i++) { if (m_buf[i].gId == eventId) { bufferId= i; break; } } if (bufferId < 0) { // find space for new bufferId for (i=0; i<m_no; i++) { if (m_buf[i].gId == 0) { bufferId= i; // we found an empty spot goto found_bufferId; } } if (bufferId < 0 && m_no < m_max) { // room for more so get that bufferId= m_no; m_buf[m_no].gId= 0; m_no++; } else { // add_drop_unlock(); DBUG_PRINT("error",("Can't accept more subscribers:" " bufferId=%d, m_no=%d, m_max=%d", bufferId, m_no, m_max)); DBUG_RETURN(-1); } }found_bufferId: BufItem &b= m_buf[ID(bufferId)]; if (b.gId == 0) { // first subscriber needs some initialization bufferId= NO_ID(0, bufferId); b.gId= eventId; b.eventType= (Uint32)eventOp->m_eventImpl->mi_type; if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) { ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); abort(); } b.subs= 0; b.f= 0; b.sz= 0; b.max_sz= aHandle->m_bufferL; b.data= (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data)); for (int i = 0; i < b.max_sz; i++) { b.data[i].sdata= NULL; b.data[i].ptr[0].p= NULL; b.data[i].ptr[1].p= NULL; b.data[i].ptr[2].p= NULL; } } else { DBUG_PRINT("info", ("TRYING handle one subscriber per event b.subs=%u",b.subs)); int ni = -1; for(int i=0; i < b.subs;i++) { if (b.ps[i].theHandle == NULL) { ni = i; break; } } if (ni < 0) { if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) { ni = b.subs; } else { DBUG_PRINT("error", ("Can't accept more subscribers: b.subs=%d",b.subs)); // add_drop_unlock(); DBUG_RETURN(-1); } } bufferId = NO_ID(ni, bufferId); } // initialize BufItem::Ps { int n = NO(bufferId); NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; e.theHandle = aHandle; e.b=0; e.bufferempty = 1; e.overrun=0; // set to -1 to handle first insert } if (b.subs > 0) hasSubscriber = 1; else hasSubscriber = 0; DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d hasSubscriber=%d", bufferId, eventId, hasSubscriber)); /* we now have a lock on the prepare so that no one can mess with this * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent */ DBUG_RETURN(bufferId);}voidNdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId){ DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); DBUG_PRINT("enter", ("bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d", bufferId, ID(bufferId), NO(bufferId))); b.ps[n].theHandle = NULL; // remove subscribers from the end, // we have to keep gaps since the position // has been handed out in bufferId for (int i = b.subs-1; i >= 0; i--) if (b.ps[i].theHandle == NULL) b.subs--; else break; if (b.subs == 0) { DBUG_PRINT("info",("no more subscribers left on eventId %d", b.gId)); b.gId= 0; // We don't have any subscribers, reuse BufItem if (b.data) { NdbMem_Free((void *)b.data); b.data = NULL; } if (b.p_buf_mutex) { NdbMutex_Destroy(b.p_buf_mutex); b.p_buf_mutex = NULL; } } // add_drop_unlock(); DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, void *ndbEventOperation){ DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); b.subs++; b.ps[n].theHandle->addBufferId(bufferId); // add_drop_unlock(); DBUG_PRINT("info",("added bufferId %d", bufferId)); DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId){ // add_drop_unlock(); // only one thread can do add or drop at a time}int NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId, int& hasSubscriber){ DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent"); // add_drop_lock(); // only one thread can do add or drop at a time BufItem &b = m_buf[ID(bufferId)]; int n = 0; for(int i=0; i < b.subs;i++) { if (b.ps[i].theHandle != NULL) n++; } if (n > 1) hasSubscriber = 1; else if (n == 1) hasSubscriber = 0; else { DBUG_RETURN(-1); } DBUG_RETURN(0);}voidNdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId){ DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent"); // add_drop_lock(); // only one thread can do add-drop at a time BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); b.ps[n].overrun=0; b.ps[n].bufferempty=1; b.ps[n].b=0; b.ps[n].theHandle->dropBufferId(bufferId); real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();#ifdef EVENT_DEBUG ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);#endif DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci){ if (gci > m_latestGCI) m_latestGCI = gci; else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-) m_latestGCI = gci;}Uint32NdbGlobalEventBuffer::real_getLatestGCI(){ return m_latestGCI;}intNdbGlobalEventBuffer::real_insertDataL(int bufferId, const SubTableData * const sdata, LinearSectionPtr ptr[3]){ DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL"); BufItem &b = m_buf[ID(bufferId)];#ifdef EVENT_DEBUG int n = NO(bufferId);#endif if ( b.eventType & (1 << (Uint32)sdata->operation) ) { if (b.subs) {#ifdef EVENT_DEBUG ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);#endif // move front forward if (copy_data_alloc(sdata, ptr, b.data[b.f].sdata, b.data[b.f].ptr)) { DBUG_RETURN(-1); } for (int i=0; i < b.subs; i++) { NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i]; if (e.theHandle) { // active subscriber if (b.f == e.b) { // next-to-read == written if (e.bufferempty == 0) { e.overrun++; // another item has been overwritten e.b++; // move next-to-read next since old item was overwritten if (e.b == b.max_sz) e.b= 0; // start from beginning } } e.bufferempty = 0; // signal subscriber that there's more to get NdbCondition_Signal(e.theHandle->p_cond); } } b.f++; // move next-to-write if (b.f == b.max_sz) b.f = 0; // start from beginning#ifdef EVENT_DEBUG ndbout_c("Front= %d Back = %d overun = %d", b.f, b.ps[n].b, b.ps[n].overrun);#endif } else {#ifdef EVENT_DEBUG ndbout_c("Data arrived before ready eventId", b.gId);#endif } } else {#ifdef EVENT_DEBUG ndbout_c("skipped");#endif } DBUG_RETURN(0);}int NdbGlobalEventBuffer::hasData(int bufferId) { DBUG_ENTER("NdbGlobalEventBuffer::hasData"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; if(e.bufferempty) { DBUG_RETURN(0); } if (b.f <= e.b) { DBUG_RETURN(b.max_sz-e.b + b.f); } else { DBUG_RETURN(b.f-e.b); }}int NdbGlobalEventBuffer::real_getDataL(const int bufferId, SubTableData * &sdata, LinearSectionPtr ptr[3], int *pOverrun){ DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; if (pOverrun) { *pOverrun = e.overrun; e.overrun = 0; // if pOverrun is returned to user reset e.overrun } if (e.bufferempty) { DBUG_RETURN(0); // nothing to get } DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d", ID(bufferId), NO(bufferId), e.b)); if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr, sdata, ptr)) { DBUG_RETURN(-1); } e.b++; if (e.b == b.max_sz) e.b= 0; // move next-to-read forward if (b.f == e.b) // back has cought up with front e.bufferempty = 1;#ifdef EVENT_DEBUG ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);#endif DBUG_RETURN(hasData(bufferId)+1);}int NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, LinearSectionPtr f_ptr[3], SubTableData * &t_sdata, LinearSectionPtr t_ptr[3]){ DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc"); unsigned sz4= (sizeof(SubTableData)+3)>>2; Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 + f_ptr[0].sz + f_ptr[1].sz + f_ptr[2].sz) * sizeof(Uint32)); if (t_sdata) NdbMem_Free((char*)t_sdata); t_sdata= (SubTableData *)ptr; memcpy(t_sdata,f_sdata,sizeof(SubTableData)); ptr+= sz4; for (int i = 0; i < 3; i++) { LinearSectionPtr & f_p = f_ptr[i]; LinearSectionPtr & t_p = t_ptr[i]; if (f_p.sz > 0) { t_p.p= (Uint32 *)ptr; memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz); ptr+= f_p.sz; t_p.sz= f_p.sz; } else { t_p.p= NULL; t_p.sz= 0; } } DBUG_RETURN(0);}intNdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h, int aMillisecondNumber){ DBUG_ENTER("NdbGlobalEventBuffer::real_wait"); // check if there are anything in any of the buffers int i; int n = 0; for (i = 0; i < h->m_nids; i++) n += hasData(h->m_bufferIds[i]); if (n) { DBUG_RETURN(n); } int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex, aMillisecondNumber); if (r > 0) { DBUG_RETURN(-1); } n = 0; for (i = 0; i < h->m_nids; i++) n += hasData(h->m_bufferIds[i]); DBUG_RETURN(n);}template class Vector<NdbGlobalEventBufferHandle*>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -