📄 suma.cpp
字号:
jam(); sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL); return; } if(!c_syncPool.seize(syncPtr)) { jam(); sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL); return; } jam(); subPtr.p->m_subscriberRef = subRef; subPtr.p->m_subscriberData = subData; subPtr.p->m_subscriptionId = subId; subPtr.p->m_subscriptionKey = subKey; subPtr.p->m_subscriptionType = type; /** * ok to memset? Support on all compilers * @todo find out if memset is supported by all compilers */ memset(subPtr.p->m_tables,0,MAX_TABLES); subPtr.p->m_maxTables = 0; subPtr.p->m_currentTable = 0; subPtr.p->m_syncPtrI = syncPtr.i; subPtr.p->m_markRemove = false; subPtr.p->m_nSubscribers = 0; c_subscriptions.add(subPtr); syncPtr.p->m_subscriptionPtrI = subPtr.i; syncPtr.p->m_doSendSyncData = true; syncPtr.p->ptrI = syncPtr.i; syncPtr.p->m_locked = false; syncPtr.p->m_error = false; } if (restartFlag || type == SubCreateReq::TableEvent) { syncPtr.p->m_doSendSyncData = false; ndbrequire(type != SubCreateReq::SingleTableScan); jam(); if (subPtr.p->m_tables[req.tableId] != 0) { ndbrequire(false); //TODO remove jam(); sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED); return; } if (addTableFlag) { ndbrequire(type != SubCreateReq::TableEvent); jam(); } subPtr.p->m_maxTables++; addTableId(req.tableId, subPtr, syncPtr.p); } else { switch(type){ case SubCreateReq::SingleTableScan: { jam(); syncPtr.p->m_tableList.append(&req.tableId, 1); if(signal->getNoOfSections() > 0){ SegmentedSectionPtr ptr; signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST); LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList); append(attrBuf, ptr, getSectionSegmentPool()); } } break;#if 0 case SubCreateReq::SelectiveTableSnapshot: /** * Tables specified by the user that does not exist * in the database are just ignored. No error message * is given, nor does the db nodes crash * @todo: Memory is not release here (used tableBuf) */ { if(signal->getNoOfSections() == 0 ){ jam(); sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS); return; } jam(); SegmentedSectionPtr ptr; signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST); SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool()); Uint32 i=0; char table[MAX_TAB_NAME_SIZE]; r0.reset(); r0.first(); while(true){ if ((r0.getValueType() != SimpleProperties::StringValue) || (r0.getValueLen() <= 0)) { releaseSections(signal); ndbrequire(false); } r0.getString(table); strcpy(subPtr.p->m_tableNames[i],table); i++; if(!r0.next()) break; } releaseSections(signal); subPtr.p->m_maxTables = i; subPtr.p->m_currentTable = 0; releaseSections(signal); convertNameToId(subPtr, signal); return; } break;#endif case SubCreateReq::DatabaseSnapshot: { jam(); } break; default: ndbrequire(false); } } sendSubCreateConf(signal, sender, subPtr); return;}voidSumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr){ SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->subscriberData = subPtr.p->m_subscriberData; sendSignal(sender, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);}voidSumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){ jam(); SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend(); ref->subscriberRef = reference(); ref->subscriberData = req.subscriberData; ref->err = errCode; releaseSections(signal); sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, SubCreateRef::SignalLength, JBB); return;}Uint32SumaParticipant::getFirstGCI(Signal* signal) { if (c_lastCompleteGCI == RNIL) { ndbout_c("WARNING: c_lastCompleteGCI == RNIL"); return 0; } return c_lastCompleteGCI+3;}/********************************************************** * * Setting upp trigger for subscription * */void SumaParticipant::execSUB_SYNC_REQ(Signal* signal) { jamEntry(); CRASH_INSERTION(13004);#ifdef EVENT_PH3_DEBUG ndbout_c("SumaParticipant::execSUB_SYNC_REQ");#endif SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND); return; } /** * @todo Tomas, do you really need to do this? */ if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) { jam(); subPtr.p->m_subscriberData = req->subscriberData; } bool ok = false; SubscriptionData::Part part = (SubscriptionData::Part)req->part; Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); switch(part){ case SubscriptionData::MetaData: ok = true; jam(); if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) { TableList::DataBufferIterator it; syncPtr.p->m_tableList.first(it); if(it.isNull()) { /** * Get all tables from dict */ ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend(); req->senderRef = reference(); req->senderData = syncPtr.i; req->requestData = 0; /** * @todo: accomodate scan of index tables? */ req->setTableType(DictTabInfo::UserTable); sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, ListTablesReq::SignalLength, JBB); break; } } syncPtr.p->startMeta(signal); break; case SubscriptionData::TableData: { ok = true; jam(); syncPtr.p->startScan(signal); break; } } ndbrequire(ok);}voidSumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){ jam(); SubSyncRef * ref = (SubSyncRef *)signal->getDataPtrSend(); ref->err = errCode; sendSignal(signal->getSendersBlockRef(), GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); releaseSections(signal); return;}/********************************************************** * Dict interface */voidSumaParticipant::execLIST_TABLES_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13005); ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr(); SyncRecord* tmp = c_syncPool.getPtr(conf->senderData); tmp->runLIST_TABLES_CONF(signal);}voidSumaParticipant::execGET_TABINFOREF(Signal* signal){ jamEntry(); GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr(); SyncRecord* tmp = c_syncPool.getPtr(ref->senderData); tmp->runGET_TABINFOREF(signal);}voidSumaParticipant::execGET_TABINFO_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13006); if(!assembleFragments(signal)){ return; } GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr(); Uint32 tableId = conf->tableId; Uint32 senderData = conf->senderData; SyncRecord* tmp = c_syncPool.getPtr(senderData); ndbrequire(parseTable(signal, conf, tableId, tmp)); tmp->runGET_TABINFO_CONF(signal);}boolSumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId, SyncRecord* syncPtr_p){ SegmentedSectionPtr ptr; signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); SimplePropertiesSectionReader it(ptr, getSectionSegmentPool()); SimpleProperties::UnpackStatus s; DictTabInfo::Table tableDesc; tableDesc.init(); s = SimpleProperties::unpack(it, &tableDesc, DictTabInfo::TableMapping, DictTabInfo::TableMappingSize, true, true); ndbrequire(s == SimpleProperties::Break); TablePtr tabPtr; c_tables.find(tabPtr, tableId); if(!tabPtr.isNull() && tabPtr.p->m_schemaVersion != tableDesc.TableVersion){ jam(); tabPtr.p->release(* this); // oops wrong schema version in stored tabledesc // we need to find all subscriptions with old table desc // and all subscribers to this // hopefully none c_tables.release(tabPtr); tabPtr.setNull(); DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr; c_subscriptions.first(i_subPtr); SubscriptionPtr subPtr; for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){ jam(); c_subscriptions.getPtr(subPtr, i_subPtr.curr.i); SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI); if (tmp == syncPtr_p) { jam(); continue; } if (subPtr.p->m_tables[tableId]) { jam(); subPtr.p->m_tables[tableId] = 0; // remove this old table reference TableList::DataBufferIterator it; for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) { jam(); if (*it.data == tableId){ jam(); Uint32 *pdata = it.data; tmp->m_tableList.next(it); for(;!it.isNull();tmp->m_tableList.next(it)) { jam(); *pdata = *it.data; pdata = it.data; } *pdata = RNIL; // todo remove this last item... break; } } } } } if (tabPtr.isNull()) { jam(); /** * Uninitialized table record */ ndbrequire(c_tables.seize(tabPtr)); new (tabPtr.p) Table; tabPtr.p->m_schemaVersion = RNIL; tabPtr.p->m_tableId = tableId; tabPtr.p->m_hasTriggerDefined[0] = 0; tabPtr.p->m_hasTriggerDefined[1] = 0; tabPtr.p->m_hasTriggerDefined[2] = 0; tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID; tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID; tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;#if 0 ndbout_c("Get tab info conf %d", tableId);#endif c_tables.add(tabPtr); } if(tabPtr.p->m_attributes.getSize() != 0){ jam(); return true; } /** * Initialize table object */ Uint32 noAttribs = tableDesc.NoOfAttributes; Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable); tabPtr.p->m_schemaVersion = tableDesc.TableVersion; // The attribute buffer LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes); // Temporary buffer DataBuffer<15> theRest(c_dataBufferPool); if(!attrBuf.seize(noAttribs)){ ndbrequire(false); return false; } if(!theRest.seize(notFixed)){ ndbrequire(false); return false; } DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable DataBuffer<15>::DataBufferIterator restIt; // variable + nullable attrBuf.first(attrIt); theRest.first(restIt); for(Uint32 i = 0; i < noAttribs; i++) { DictTabInfo::Attribute attrDesc; attrDesc.init(); s = SimpleProperties::unpack(it, &attrDesc, DictTabInfo::AttributeMapping, DictTabInfo::AttributeMappingSize, true, true); ndbrequire(s == SimpleProperties::Break); if (!attrDesc.AttributeNullableFlag /* && !attrDesc.AttributeVariableFlag */) { jam(); * attrIt.data = attrDesc.AttributeId; attrBuf.next(attrIt); } else { jam(); * restIt.data = attrDesc.AttributeId; theRest.next(restIt); } // Move to next attribute it.next(); } /** * Put the rest in end of attrBuf */ theRest.first(restIt); for(; !restIt.isNull(); theRest.next(restIt)){ * attrIt.data = * restIt.data; attrBuf.next(attrIt); } theRest.release(); return true;}voidSumaParticipant::execDI_FCOUNTCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13007); const Uint32 senderData = signal->theData[3]; SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDI_FCOUNTCONF(signal);}void SumaParticipant::execDIGETPRIMCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13008); const Uint32 senderData = signal->theData[1]; SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDIGETPRIMCONF(signal);}voidSumaParticipant::execCREATE_TRIG_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF"); CRASH_INSERTION(13009); CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); const Uint32 senderData = conf->getConnectionPtr(); SyncRecord* tmp = c_syncPool.getPtr(senderData);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -