📄 suma.cpp
字号:
convertNameToId(subPtr, signal);}void SumaParticipant::execGET_TABLEID_REF(Signal * signal){ jamEntry(); GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr(); Uint32 senderData = ref->senderData; // Uint32 err = ref->err; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, senderData); Uint32 subData = subPtr.p->m_subscriberData; SubCreateRef * reff = (SubCreateRef*)ref; /** * @todo: map ref->err to GrepError. */ reff->err = GrepError::SELECTED_TABLE_NOT_FOUND; reff->subscriberData = subData; sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_CREATE_REF, signal, SubCreateRef::SignalLength, JBB);}#endif/************************************************************* * * Creation of subscription id's * ************************************************************/void Suma::execCREATE_SUBID_REQ(Signal* signal) { jamEntry(); CRASH_INSERTION(13001); CreateSubscriptionIdReq const * req = (CreateSubscriptionIdReq*)signal->getDataPtr(); SubscriberPtr subbPtr; if(!c_subscriberPool.seize(subbPtr)){ jam(); sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM); return; } subbPtr.p->m_subscriberRef = signal->getSendersBlockRef(); subbPtr.p->m_senderData = req->senderData; subbPtr.p->m_subscriberData = subbPtr.i; UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); utilReq->senderData = subbPtr.p->m_subscriberData; utilReq->sequenceId = SUMA_SEQUENCE; utilReq->requestType = UtilSequenceReq::NextVal; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB);}voidSuma::execUTIL_SEQUENCE_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); DBUG_VOID_RETURN; } Uint64 subId; memcpy(&subId,conf->sequenceValue,8); Uint32 subData = conf->senderData; SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,subData); CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf; subconf->subscriptionId = (Uint32)subId; subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF); subconf->subscriberData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal, CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN;}voidSuma::execUTIL_SEQUENCE_REF(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,subData); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN;}//execUTIL_SEQUENCE_REF()voidSumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){ jam(); CreateSubscriptionIdRef * ref = (CreateSubscriptionIdRef *)signal->getDataPtrSend(); ref->err = errCode; sendSignal(signal->getSendersBlockRef(), GSN_CREATE_SUBID_REF, signal, CreateSubscriptionIdRef::SignalLength, JBB); releaseSections(signal); return;}/********************************************************** * Suma participant interface * * Creation of subscriptions */voidSumaParticipant::execSUB_CREATE_REQ(Signal* signal) {#ifdef NODEFAIL_DEBUG ndbout_c("SumaParticipant::execSUB_CREATE_REQ");#endif jamEntry(); CRASH_INSERTION(13003); const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr(); const Uint32 subId = req.subscriptionId; const Uint32 subKey = req.subscriptionKey; const Uint32 subRef = req.subscriberRef; const Uint32 subData = req.subscriberData; const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags; const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags; const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0; const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0; const Uint32 sender = signal->getSendersBlockRef(); Subscription key; key.m_subscriptionId = subId; key.m_subscriptionKey = subKey; SubscriptionPtr subPtr; Ptr<SyncRecord> syncPtr; if (addTableFlag) { ndbrequire(restartFlag); //TODO remove this if(!c_subscriptions.find(subPtr, key)) { jam(); sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND); return; } jam(); c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); } else { // Check that id/key is unique if(c_subscriptions.find(subPtr, key)) { jam(); sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE); return; } if(!c_subscriptions.seize(subPtr)) { jam(); sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL); return; } if(!c_syncPool.seize(syncPtr)) { jam(); sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL); return; } jam(); subPtr.p->m_subscriberRef = subRef; subPtr.p->m_subscriberData = subData; subPtr.p->m_subscriptionId = subId; subPtr.p->m_subscriptionKey = subKey; subPtr.p->m_subscriptionType = type; /** * ok to memset? Support on all compilers * @todo find out if memset is supported by all compilers */ memset(subPtr.p->m_tables,0,MAX_TABLES); subPtr.p->m_maxTables = 0; subPtr.p->m_currentTable = 0; subPtr.p->m_syncPtrI = syncPtr.i; subPtr.p->m_markRemove = false; subPtr.p->m_nSubscribers = 0; c_subscriptions.add(subPtr); syncPtr.p->m_subscriptionPtrI = subPtr.i; syncPtr.p->m_doSendSyncData = true; syncPtr.p->ptrI = syncPtr.i; syncPtr.p->m_locked = false; syncPtr.p->m_error = false; } if (restartFlag || type == SubCreateReq::TableEvent) { syncPtr.p->m_doSendSyncData = false; ndbrequire(type != SubCreateReq::SingleTableScan); jam(); if (subPtr.p->m_tables[req.tableId] != 0) { ndbrequire(false); //TODO remove jam(); sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED); return; } if (addTableFlag) { ndbrequire(type != SubCreateReq::TableEvent); jam(); } subPtr.p->m_maxTables++; addTableId(req.tableId, subPtr, syncPtr.p); } else { switch(type){ case SubCreateReq::SingleTableScan: { jam(); syncPtr.p->m_tableList.append(&req.tableId, 1); if(signal->getNoOfSections() > 0){ SegmentedSectionPtr ptr; signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST); LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList); append(attrBuf, ptr, getSectionSegmentPool()); } } break;#if 0 case SubCreateReq::SelectiveTableSnapshot: /** * Tables specified by the user that does not exist * in the database are just ignored. No error message * is given, nor does the db nodes crash * @todo: Memory is not release here (used tableBuf) */ { if(signal->getNoOfSections() == 0 ){ jam(); sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS); return; } jam(); SegmentedSectionPtr ptr; signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST); SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool()); Uint32 i=0; char table[MAX_TAB_NAME_SIZE]; r0.reset(); r0.first(); while(true){ if ((r0.getValueType() != SimpleProperties::StringValue) || (r0.getValueLen() <= 0)) { releaseSections(signal); ndbrequire(false); } r0.getString(table); strcpy(subPtr.p->m_tableNames[i],table); i++; if(!r0.next()) break; } releaseSections(signal); subPtr.p->m_maxTables = i; subPtr.p->m_currentTable = 0; releaseSections(signal); convertNameToId(subPtr, signal); return; } break;#endif case SubCreateReq::DatabaseSnapshot: { jam(); } break; default: ndbrequire(false); } } sendSubCreateConf(signal, sender, subPtr); return;}voidSumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr){ SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->subscriberData = subPtr.p->m_subscriberData; sendSignal(sender, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);}voidSumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){ jam(); SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend(); ref->subscriberRef = reference(); ref->subscriberData = req.subscriberData; ref->err = errCode; releaseSections(signal); sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, SubCreateRef::SignalLength, JBB); return;}/********************************************************** * * Setting upp trigger for subscription * */void SumaParticipant::execSUB_SYNC_REQ(Signal* signal) { jamEntry(); CRASH_INSERTION(13004);#ifdef EVENT_PH3_DEBUG ndbout_c("SumaParticipant::execSUB_SYNC_REQ");#endif SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND); return; } /** * @todo Tomas, do you really need to do this? */ if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) { jam(); subPtr.p->m_subscriberData = req->subscriberData; } bool ok = false; SubscriptionData::Part part = (SubscriptionData::Part)req->part; Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); switch(part){ case SubscriptionData::MetaData: ok = true; jam(); syncPtr.p->startMeta(signal); break; case SubscriptionData::TableData: { ok = true; jam(); syncPtr.p->startScan(signal); break; } } ndbrequire(ok);}voidSumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){ jam(); SubSyncRef * ref = (SubSyncRef *)signal->getDataPtrSend(); ref->err = errCode; sendSignal(signal->getSendersBlockRef(), GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); releaseSections(signal); return;}/********************************************************** * Dict interface */voidSumaParticipant::execGET_TABINFOREF(Signal* signal){ jamEntry(); GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr(); SyncRecord* tmp = c_syncPool.getPtr(ref->senderData); tmp->runGET_TABINFOREF(signal);}voidSumaParticipant::execGET_TABINFO_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13006); if(!assembleFragments(signal)){ return; } GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr(); Uint32 tableId = conf->tableId; Uint32 senderData = conf->senderData; SyncRecord* tmp = c_syncPool.getPtr(senderData); ndbrequire(parseTable(signal, conf, tableId, tmp)); tmp->runGET_TABINFO_CONF(signal);}boolSumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId, SyncRecord* syncPtr_p){ SegmentedSectionPtr ptr; signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -