⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
      jam();      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);      return;    }    if(!c_syncPool.seize(syncPtr)) {      jam();      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);      return;    }    jam();    subPtr.p->m_subscriberRef    = subRef;    subPtr.p->m_subscriberData   = subData;    subPtr.p->m_subscriptionId   = subId;    subPtr.p->m_subscriptionKey  = subKey;    subPtr.p->m_subscriptionType = type;      /**     * ok to memset? Support on all compilers     * @todo find out if memset is supported by all compilers     */    memset(subPtr.p->m_tables,0,MAX_TABLES);    subPtr.p->m_maxTables    = 0;    subPtr.p->m_currentTable = 0;    subPtr.p->m_syncPtrI   = syncPtr.i;    subPtr.p->m_markRemove = false;    subPtr.p->m_nSubscribers = 0;    c_subscriptions.add(subPtr);    syncPtr.p->m_subscriptionPtrI = subPtr.i;    syncPtr.p->m_doSendSyncData   = true;    syncPtr.p->ptrI               = syncPtr.i;    syncPtr.p->m_locked           = false;    syncPtr.p->m_error            = false;  }  if (restartFlag ||       type == SubCreateReq::TableEvent) {    syncPtr.p->m_doSendSyncData = false;    ndbrequire(type != SubCreateReq::SingleTableScan);    jam();    if (subPtr.p->m_tables[req.tableId] != 0) {      ndbrequire(false); //TODO remove      jam();      sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED);      return;    }    if (addTableFlag) {      ndbrequire(type != SubCreateReq::TableEvent);      jam();    }    subPtr.p->m_maxTables++;    addTableId(req.tableId, subPtr, syncPtr.p);  } else {    switch(type){    case SubCreateReq::SingleTableScan:      {	jam();	syncPtr.p->m_tableList.append(&req.tableId, 1);	if(signal->getNoOfSections() > 0){	  SegmentedSectionPtr ptr;	  signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);	  LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);	  append(attrBuf, ptr, getSectionSegmentPool());	}      }    break;#if 0    case SubCreateReq::SelectiveTableSnapshot:      /**       * Tables specified by the user that does not exist       * in the database are just ignored. No error message       * is given, nor does the db nodes crash       * @todo: Memory is not release here (used tableBuf)       */      {	if(signal->getNoOfSections() == 0 ){	  jam();	  sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS);	  return;	}	jam();      	SegmentedSectionPtr ptr;	signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);	SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());	Uint32 i=0;	char table[MAX_TAB_NAME_SIZE];	r0.reset();	r0.first();	while(true){	  if ((r0.getValueType() != SimpleProperties::StringValue) ||	      (r0.getValueLen() <= 0)) {	    releaseSections(signal);	    ndbrequire(false);	  }	  r0.getString(table);	  strcpy(subPtr.p->m_tableNames[i],table);	  i++;	  if(!r0.next())	    break;	}	releaseSections(signal);	subPtr.p->m_maxTables    = i;	subPtr.p->m_currentTable = 0;	releaseSections(signal);	convertNameToId(subPtr, signal);	return;      }    break;#endif    case SubCreateReq::DatabaseSnapshot:      {	jam();      }    break;    default:      ndbrequire(false);    }  }  sendSubCreateConf(signal, sender, subPtr);  return;}voidSumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,				   SubscriptionPtr subPtr){  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();        conf->subscriptionId       = subPtr.p->m_subscriptionId;  conf->subscriptionKey      = subPtr.p->m_subscriptionKey;  conf->subscriberData       = subPtr.p->m_subscriberData;  sendSignal(sender, GSN_SUB_CREATE_CONF, signal,	     SubCreateConf::SignalLength, JBB);}voidSumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){  jam();  SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();  ref->subscriberRef  = reference();  ref->subscriberData = req.subscriberData;  ref->err = errCode;  releaseSections(signal);  sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 	     SubCreateRef::SignalLength, JBB);  return;}Uint32SumaParticipant::getFirstGCI(Signal* signal) {  if (c_lastCompleteGCI == RNIL) {    ndbout_c("WARNING: c_lastCompleteGCI == RNIL");    return 0;  }  return c_lastCompleteGCI+3;}/********************************************************** * * Setting upp trigger for subscription * */void SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {  jamEntry();  CRASH_INSERTION(13004);#ifdef EVENT_PH3_DEBUG  ndbout_c("SumaParticipant::execSUB_SYNC_REQ");#endif  SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();  SubscriptionPtr subPtr;  Subscription key;   key.m_subscriptionId = req->subscriptionId;  key.m_subscriptionKey = req->subscriptionKey;    if(!c_subscriptions.find(subPtr, key)){    jam();    sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);    return;  }  /**   * @todo Tomas, do you really need to do this?   */  if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {    jam();    subPtr.p->m_subscriberData = req->subscriberData;  }  bool ok = false;  SubscriptionData::Part part = (SubscriptionData::Part)req->part;    Ptr<SyncRecord> syncPtr;  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  switch(part){  case SubscriptionData::MetaData:    ok = true;    jam();    if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {      TableList::DataBufferIterator it;      syncPtr.p->m_tableList.first(it);      if(it.isNull()) {	/**	 * Get all tables from dict	 */	ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();	req->senderRef   = reference();	req->senderData  = syncPtr.i;	req->requestData = 0;	/**	 * @todo: accomodate scan of index tables?	 */	req->setTableType(DictTabInfo::UserTable);	sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 		   ListTablesReq::SignalLength, JBB);	break;      }    }    syncPtr.p->startMeta(signal);    break;  case SubscriptionData::TableData: {    ok = true;    jam();    syncPtr.p->startScan(signal);    break;  }  }  ndbrequire(ok);}voidSumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){  jam();  SubSyncRef  * ref =     (SubSyncRef *)signal->getDataPtrSend();  ref->err = errCode;  sendSignal(signal->getSendersBlockRef(), 	     GSN_SUB_SYNC_REF, 	     signal, 	     SubSyncRef::SignalLength,	     JBB);	       releaseSections(signal);    return;}/********************************************************** * Dict interface */voidSumaParticipant::execLIST_TABLES_CONF(Signal* signal){  jamEntry();  CRASH_INSERTION(13005);  ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();  SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);  tmp->runLIST_TABLES_CONF(signal);}voidSumaParticipant::execGET_TABINFOREF(Signal* signal){  jamEntry();  GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();  SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);  tmp->runGET_TABINFOREF(signal);}voidSumaParticipant::execGET_TABINFO_CONF(Signal* signal){  jamEntry();  CRASH_INSERTION(13006);  if(!assembleFragments(signal)){    return;  }    GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();    Uint32 tableId = conf->tableId;  Uint32 senderData = conf->senderData;  SyncRecord* tmp = c_syncPool.getPtr(senderData);  ndbrequire(parseTable(signal, conf, tableId, tmp));  tmp->runGET_TABINFO_CONF(signal);}boolSumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,			    SyncRecord* syncPtr_p){  SegmentedSectionPtr ptr;  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);    SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());    SimpleProperties::UnpackStatus s;  DictTabInfo::Table tableDesc; tableDesc.init();  s = SimpleProperties::unpack(it, &tableDesc, 			       DictTabInfo::TableMapping, 			       DictTabInfo::TableMappingSize, 			       true, true);    ndbrequire(s == SimpleProperties::Break);  TablePtr tabPtr;  c_tables.find(tabPtr, tableId);    if(!tabPtr.isNull() &&     tabPtr.p->m_schemaVersion != tableDesc.TableVersion){    jam();    tabPtr.p->release(* this);    // oops wrong schema version in stored tabledesc    // we need to find all subscriptions with old table desc    // and all subscribers to this    // hopefully none    c_tables.release(tabPtr);    tabPtr.setNull();    DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;    c_subscriptions.first(i_subPtr);    SubscriptionPtr subPtr;    for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){      jam();      c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);      SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);      if (tmp == syncPtr_p) {	jam();	continue;      }      if (subPtr.p->m_tables[tableId]) {	jam();	subPtr.p->m_tables[tableId] = 0; // remove this old table reference	TableList::DataBufferIterator it;	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {	  jam();	  if (*it.data == tableId){	    jam();	    Uint32 *pdata = it.data;	    tmp->m_tableList.next(it);	    for(;!it.isNull();tmp->m_tableList.next(it)) {	      jam();	      *pdata = *it.data;	      pdata = it.data;	    }	    *pdata = RNIL; // todo remove this last item...	    break;	  }	}      }    }  }  if (tabPtr.isNull()) {    jam();    /**     * Uninitialized table record     */    ndbrequire(c_tables.seize(tabPtr));    new (tabPtr.p) Table;    tabPtr.p->m_schemaVersion = RNIL;    tabPtr.p->m_tableId = tableId;    tabPtr.p->m_hasTriggerDefined[0] = 0;    tabPtr.p->m_hasTriggerDefined[1] = 0;    tabPtr.p->m_hasTriggerDefined[2] = 0;    tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;    tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;    tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;#if 0    ndbout_c("Get tab info conf %d", tableId);#endif    c_tables.add(tabPtr);  }  if(tabPtr.p->m_attributes.getSize() != 0){    jam();    return true;  }  /**   * Initialize table object   */  Uint32 noAttribs = tableDesc.NoOfAttributes;  Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);  tabPtr.p->m_schemaVersion = tableDesc.TableVersion;    // The attribute buffer  LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);    // Temporary buffer  DataBuffer<15> theRest(c_dataBufferPool);  if(!attrBuf.seize(noAttribs)){    ndbrequire(false);    return false;  }    if(!theRest.seize(notFixed)){    ndbrequire(false);    return false;  }    DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable  DataBuffer<15>::DataBufferIterator restIt; // variable + nullable  attrBuf.first(attrIt);  theRest.first(restIt);    for(Uint32 i = 0; i < noAttribs; i++) {    DictTabInfo::Attribute attrDesc; attrDesc.init();    s = SimpleProperties::unpack(it, &attrDesc, 				 DictTabInfo::AttributeMapping, 				 DictTabInfo::AttributeMappingSize, 				 true, true);    ndbrequire(s == SimpleProperties::Break);    if (!attrDesc.AttributeNullableFlag 	/* && !attrDesc.AttributeVariableFlag */) {      jam();      * attrIt.data = attrDesc.AttributeId;      attrBuf.next(attrIt);    } else {      jam();      * restIt.data = attrDesc.AttributeId;      theRest.next(restIt);    }        // Move to next attribute    it.next();  }  /**   * Put the rest in end of attrBuf   */  theRest.first(restIt);  for(; !restIt.isNull(); theRest.next(restIt)){    * attrIt.data = * restIt.data;    attrBuf.next(attrIt);  }  theRest.release();    return true;}voidSumaParticipant::execDI_FCOUNTCONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13007);  const Uint32 senderData = signal->theData[3];  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDI_FCOUNTCONF(signal);}void SumaParticipant::execDIGETPRIMCONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13008);  const Uint32 senderData = signal->theData[1];  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDIGETPRIMCONF(signal);}voidSumaParticipant::execCREATE_TRIG_CONF(Signal* signal){  jamEntry();  DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");  CRASH_INSERTION(13009);  CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();  const Uint32 senderData = conf->getConnectionPtr();  SyncRecord* tmp = c_syncPool.getPtr(senderData);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -