⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbeventoperationimpl.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
  assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);  // TODO make sure we don't hit roof  m_handlers.push_back(h);  DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h){  DBUG_ENTER("NdbGlobalEventBuffer::real_remove");  DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));  for (Uint32 i=0 ; i < m_handlers.size(); i++)  {    DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i]));    if (m_handlers[i] == h)    {      m_handlers.erase(i);      if (m_handlers.size() == 0)      {	DBUG_PRINT("info",("last to go"));	delete[] m_buf;	m_buf = NULL;      }      DBUG_VOID_RETURN;    }  }  ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle");  DBUG_PRINT("error",("non-existing handle"));  abort();  DBUG_VOID_RETURN;}intNdbGlobalEventBuffer::real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp, int& hasSubscriber){  DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent");  int i;  int bufferId= -1;  Uint32 eventId= eventOp->m_eventId;  DBUG_PRINT("enter",("eventId: %u", eventId));  //  add_drop_lock(); // only one thread can do add or drop at a time  // Find place where eventId already set  for (i=0; i<m_no; i++) {    if (m_buf[i].gId == eventId) {      bufferId= i;      break;    }  }  if (bufferId < 0) {    // find space for new bufferId    for (i=0; i<m_no; i++) {      if (m_buf[i].gId == 0) {	bufferId= i; // we found an empty spot	goto found_bufferId;      }    }    if (bufferId < 0 &&	m_no < m_max) {      // room for more so get that      bufferId= m_no;      m_buf[m_no].gId= 0;      m_no++;    } else {       //      add_drop_unlock();      DBUG_PRINT("error",("Can't accept more subscribers:"			  " bufferId=%d, m_no=%d, m_max=%d",			  bufferId, m_no, m_max));      DBUG_RETURN(-1);    }  }found_bufferId:  BufItem &b= m_buf[ID(bufferId)];  if (b.gId == 0) { // first subscriber needs some initialization    bufferId= NO_ID(0, bufferId);    b.gId= eventId;    b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;    if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) {      ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");      abort();    }    b.subs= 0;    b.f= 0;    b.sz= 0;    b.max_sz= aHandle->m_bufferL;    b.data=       (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));    for (int i = 0; i < b.max_sz; i++) {      b.data[i].sdata= NULL;      b.data[i].ptr[0].p= NULL;      b.data[i].ptr[1].p= NULL;      b.data[i].ptr[2].p= NULL;    }  } else {    DBUG_PRINT("info",	       ("TRYING handle one subscriber per event b.subs=%u",b.subs));    int ni = -1;    for(int i=0; i < b.subs;i++) {      if (b.ps[i].theHandle == NULL) {	ni = i;	break;      }    }    if (ni < 0) {      if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) {	ni = b.subs;      } else {	DBUG_PRINT("error",		   ("Can't accept more subscribers: b.subs=%d",b.subs));	//	add_drop_unlock();	DBUG_RETURN(-1);      }    }    bufferId = NO_ID(ni, bufferId);  }  // initialize BufItem::Ps  {    int n = NO(bufferId);    NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];    e.theHandle = aHandle;    e.b=0;    e.bufferempty = 1;    e.overrun=0; // set to -1 to handle first insert  }  if (b.subs > 0)    hasSubscriber = 1;  else    hasSubscriber = 0;  DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d hasSubscriber=%d",		     bufferId, eventId, hasSubscriber));  /* we now have a lock on the prepare so that no one can mess with this   * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent   */  DBUG_RETURN(bufferId);}voidNdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId){  DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent");  BufItem &b = m_buf[ID(bufferId)];  int n = NO(bufferId);  DBUG_PRINT("enter", ("bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d",		       bufferId, ID(bufferId), NO(bufferId)));  b.ps[n].theHandle = NULL;  // remove subscribers from the end,  // we have to keep gaps since the position  // has been handed out in bufferId  for (int i = b.subs-1; i >= 0; i--)    if (b.ps[i].theHandle == NULL)      b.subs--;    else      break;  if (b.subs == 0) {    DBUG_PRINT("info",("no more subscribers left on eventId %d", b.gId));    b.gId= 0;  // We don't have any subscribers, reuse BufItem    if (b.data) {      NdbMem_Free((void *)b.data);      b.data = NULL;    }    if (b.p_buf_mutex) {      NdbMutex_Destroy(b.p_buf_mutex);      b.p_buf_mutex = NULL;    }  }  //  add_drop_unlock();  DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, 					     void *ndbEventOperation){  DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent");  BufItem &b = m_buf[ID(bufferId)];  int n = NO(bufferId);  b.subs++;  b.ps[n].theHandle->addBufferId(bufferId);  //  add_drop_unlock();  DBUG_PRINT("info",("added bufferId %d", bufferId));  DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId){  //  add_drop_unlock(); // only one thread can do add or drop at a time}int NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,						     int& hasSubscriber){  DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent");  //  add_drop_lock(); // only one thread can do add or drop at a time  BufItem &b = m_buf[ID(bufferId)];  int n = 0;  for(int i=0; i < b.subs;i++) {    if (b.ps[i].theHandle != NULL)      n++;  }  if (n > 1)    hasSubscriber = 1;  else if (n == 1)    hasSubscriber = 0;  else  {    DBUG_RETURN(-1);  }  DBUG_RETURN(0);}voidNdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId){  DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent");  //  add_drop_lock(); // only one thread can do add-drop at a time  BufItem &b = m_buf[ID(bufferId)];  int n = NO(bufferId);  b.ps[n].overrun=0;  b.ps[n].bufferempty=1;  b.ps[n].b=0;  b.ps[n].theHandle->dropBufferId(bufferId);  real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();#ifdef EVENT_DEBUG  ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);#endif  DBUG_VOID_RETURN;}voidNdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci){  if (gci > m_latestGCI)    m_latestGCI = gci;  else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-)    m_latestGCI = gci;}Uint32NdbGlobalEventBuffer::real_getLatestGCI(){  return m_latestGCI;}intNdbGlobalEventBuffer::real_insertDataL(int bufferId, 				       const SubTableData * const sdata, 				       LinearSectionPtr ptr[3]){  DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL");  BufItem &b = m_buf[ID(bufferId)];#ifdef EVENT_DEBUG  int n = NO(bufferId);#endif  if ( b.eventType & (1 << (Uint32)sdata->operation) )  {    if (b.subs) {#ifdef EVENT_DEBUG      ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);#endif      // move front forward      if (copy_data_alloc(sdata, ptr,			  b.data[b.f].sdata, b.data[b.f].ptr))      {	DBUG_RETURN(-1);      }      for (int i=0; i < b.subs; i++) {	NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i];	if (e.theHandle) { // active subscriber	  if (b.f == e.b) { // next-to-read == written	    if (e.bufferempty == 0) {	      e.overrun++; // another item has been overwritten	      e.b++; // move next-to-read next since old item was overwritten	      if (e.b == b.max_sz) e.b= 0; // start from beginning	    }	  }	  e.bufferempty = 0;	  // signal subscriber that there's more to get	  NdbCondition_Signal(e.theHandle->p_cond);	}      }      b.f++; // move next-to-write      if (b.f == b.max_sz) b.f = 0; // start from beginning#ifdef EVENT_DEBUG      ndbout_c("Front= %d Back = %d overun = %d", b.f,	       b.ps[n].b, b.ps[n].overrun);#endif    } else {#ifdef EVENT_DEBUG      ndbout_c("Data arrived before ready eventId", b.gId);#endif    }  }  else  {#ifdef EVENT_DEBUG    ndbout_c("skipped");#endif  }  DBUG_RETURN(0);}int NdbGlobalEventBuffer::hasData(int bufferId) {  DBUG_ENTER("NdbGlobalEventBuffer::hasData");  BufItem &b = m_buf[ID(bufferId)];  int n = NO(bufferId);  NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];  if(e.bufferempty)  {    DBUG_RETURN(0);  }  if (b.f <= e.b)  {    DBUG_RETURN(b.max_sz-e.b + b.f);  }  else  {    DBUG_RETURN(b.f-e.b);  }}int NdbGlobalEventBuffer::real_getDataL(const int bufferId,					SubTableData * &sdata,					LinearSectionPtr ptr[3],					int *pOverrun){  DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL");  BufItem &b = m_buf[ID(bufferId)];  int n = NO(bufferId);  NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];  if (pOverrun) {    *pOverrun = e.overrun;    e.overrun = 0; // if pOverrun is returned to user reset e.overrun  }  if (e.bufferempty)  {    DBUG_RETURN(0); // nothing to get  }  DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d",		     ID(bufferId), NO(bufferId), e.b));  if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,		      sdata, ptr))  {    DBUG_RETURN(-1);  }  e.b++; if (e.b == b.max_sz) e.b= 0; // move next-to-read forward  if (b.f == e.b) // back has cought up with front    e.bufferempty = 1;#ifdef EVENT_DEBUG  ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);#endif  DBUG_RETURN(hasData(bufferId)+1);}int NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,				      LinearSectionPtr f_ptr[3],				      SubTableData * &t_sdata,				      LinearSectionPtr t_ptr[3]){  DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc");  unsigned sz4= (sizeof(SubTableData)+3)>>2;  Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 +					 f_ptr[0].sz +					 f_ptr[1].sz +					 f_ptr[2].sz) * sizeof(Uint32));  if (t_sdata)    NdbMem_Free((char*)t_sdata);  t_sdata= (SubTableData *)ptr;  memcpy(t_sdata,f_sdata,sizeof(SubTableData));  ptr+= sz4;  for (int i = 0; i < 3; i++) {    LinearSectionPtr & f_p = f_ptr[i];    LinearSectionPtr & t_p = t_ptr[i];    if (f_p.sz > 0) {      t_p.p= (Uint32 *)ptr;      memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);      ptr+= f_p.sz;      t_p.sz= f_p.sz;    } else {      t_p.p= NULL;      t_p.sz= 0;    }  }  DBUG_RETURN(0);}intNdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h,				int aMillisecondNumber){  DBUG_ENTER("NdbGlobalEventBuffer::real_wait");  // check if there are anything in any of the buffers  int i;  int n = 0;  for (i = 0; i < h->m_nids; i++)    n += hasData(h->m_bufferIds[i]);  if (n)   {    DBUG_RETURN(n);  }  int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex,				   aMillisecondNumber);  if (r > 0)  {    DBUG_RETURN(-1);  }  n = 0;  for (i = 0; i < h->m_nids; i++)    n += hasData(h->m_bufferIds[i]);  DBUG_RETURN(n);}template class Vector<NdbGlobalEventBufferHandle*>;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -