⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 trix.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
  buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);}void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal){  SubSyncContinueReq  * subSyncContinueReq =     (SubSyncContinueReq *) signal->getDataPtr();    SubscriptionRecPtr subRecPtr;  SubscriptionRecord* subRec;  subRecPtr.i = subSyncContinueReq->subscriberData;  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {    printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);    return;  }  subRecPtr.p = subRec;  subRec->pendingSubSyncContinueConf = true;  checkParallelism(signal, subRec);}void Trix::execSUB_META_DATA(Signal* signal){  jamEntry();}void Trix::execSUB_TABLE_DATA(Signal* signal){  jamEntry();  SubTableData * subTableData = (SubTableData *)signal->getDataPtr();  SubscriptionRecPtr subRecPtr;  SubscriptionRecord* subRec;  subRecPtr.i = subTableData->subscriberData;  if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {    printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);    return;  }  subRecPtr.p = subRec;  SegmentedSectionPtr headerPtr, dataPtr;  if (!signal->getSection(headerPtr, 0)) {    printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n");  }  if (!signal->getSection(dataPtr, 1)) {    printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n");  }  executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);}void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr){  Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];  SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();  SubscriptionRecord* subRec = subRecPtr.p;//  Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;  AttrOrderBuffer::DataBufferIterator iter;  Uint32 i = 0;  jam();  bool moreAttributes = subRec->attributeOrder.first(iter);  while (moreAttributes) {    attributeList[i++] = *iter.data;    moreAttributes = subRec->attributeOrder.next(iter);  }  // Merge index and key column segments  struct LinearSectionPtr orderPtr[3];  orderPtr[0].p = attributeList;  orderPtr[0].sz = subRec->attributeOrder.getSize();  subCreateReq->subscriberRef = reference();  subCreateReq->subscriberData = subRecPtr.i;  subCreateReq->subscriptionId = subRec->subscriptionId;  subCreateReq->subscriptionKey = subRec->subscriptionKey;  subCreateReq->tableId = subRec->sourceTableId;  subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;    sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, 	     signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);}void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr){  SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();  jam();  subSyncReq->subscriptionId = subRecPtr.i;  subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;  subSyncReq->part = SubscriptionData::MetaData;  sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 	     signal, SubSyncReq::SignalLength, JBB);}void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr){  jam();  subRecPtr.p->expectedConf = 1;  SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();  subSyncReq->subscriptionId = subRecPtr.i;  subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;  subSyncReq->part = SubscriptionData::TableData;  sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 	     signal, SubSyncReq::SignalLength, JBB);}void Trix::prepareInsertTransactions(Signal* signal, 				     SubscriptionRecPtr subRecPtr){  SubscriptionRecord* subRec = subRecPtr.p;  UtilPrepareReq * utilPrepareReq =     (UtilPrepareReq *)signal->getDataPtrSend();    jam();  utilPrepareReq->senderRef = reference();  utilPrepareReq->senderData = subRecPtr.i;  const Uint32 pageSizeInWords = 128;  Uint32 propPage[pageSizeInWords];  LinearWriter w(&propPage[0],128);  w.first();  w.add(UtilPrepareReq::NoOfOperations, 1);  w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);  w.add(UtilPrepareReq::TableId, subRec->targetTableId);  // Add index attributes in increasing order and one PK attribute  for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)    w.add(UtilPrepareReq::AttributeId, i);#if 0  // Debugging  SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());  printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");  reader.printAll(ndbout);#endif  struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];  sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;  sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();  sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,	     UtilPrepareReq::SignalLength, JBB, 	     sectionsPtr, UtilPrepareReq::NoOfSections);}void Trix::executeInsertTransaction(Signal* signal, 				    SubscriptionRecPtr subRecPtr,				    SegmentedSectionPtr headerPtr,				    SegmentedSectionPtr dataPtr){  jam();  SubscriptionRecord* subRec = subRecPtr.p;  UtilExecuteReq * utilExecuteReq =     (UtilExecuteReq *)signal->getDataPtrSend();  Uint32* headerBuffer = signal->theData + 25;  Uint32*  dataBuffer = headerBuffer + headerPtr.sz;  utilExecuteReq->senderRef = reference();  utilExecuteReq->senderData = subRecPtr.i;  utilExecuteReq->prepareId = subRec->prepareId;#if 0  printf("Header size %u\n", headerPtr.sz);  for(int i = 0; i < headerPtr.sz; i++)    printf("H'%.8x ", headerBuffer[i]);  printf("\n");    printf("Data size %u\n", dataPtr.sz);  for(int i = 0; i < dataPtr.sz; i++)    printf("H'%.8x ", dataBuffer[i]);  printf("\n");#endif  // Save scan result in linear buffers  copy(headerBuffer, headerPtr);  copy(dataBuffer, dataPtr);  // Calculate packed key size  Uint32 noOfKeyData = 0;  for(Uint32 i = 0; i < headerPtr.sz; i++) {    AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;    // Filter out NULL attributes    if (keyAttrHead->isNULL())      return;    if (i < subRec->noOfIndexColumns)      // Renumber index attributes in consequtive order      keyAttrHead->setAttributeId(i);    else      // Calculate total size of PK attribute      noOfKeyData += keyAttrHead->getDataSize();  }  // Increase expected CONF count  subRec->expectedConf++;  // Pack key attributes  AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,			subRec->noOfIndexColumns,			noOfKeyData);  struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];  sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;  sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =     subRec->noOfIndexColumns + 1;  sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;  sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;  sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,	     UtilExecuteReq::SignalLength, JBB,	     sectionsPtr, UtilExecuteReq::NoOfSections);}void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr){  SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();  req->senderRef       = reference();  req->senderData      = subRecPtr.i;  req->subscriptionId  = subRecPtr.p->subscriptionId;  req->subscriptionKey = subRecPtr.p->subscriptionKey;  sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,	     SubRemoveReq::SignalLength, JBB);}void Trix::buildFailed(Signal* signal, 		       SubscriptionRecPtr subRecPtr, 		       BuildIndxRef::ErrorCode errorCode){  SubscriptionRecord* subRec = subRecPtr.p;  subRec->errorCode = errorCode;  // Continue accumulating since we currently cannot stop SUMA  subRec->expectedConf--;  checkParallelism(signal, subRec);  if (subRec->expectedConf == 0)    buildComplete(signal, subRecPtr);}voidTrix::execSUB_REMOVE_REF(Signal* signal){  jamEntry();  //@todo  ndbrequire(false);}voidTrix::execSUB_REMOVE_CONF(Signal* signal){  jamEntry();  SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();  SubscriptionRecPtr subRecPtr;  c_theSubscriptions.getPtr(subRecPtr, conf->senderData);  if(subRecPtr.p->prepareId != RNIL){    jam();    UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();    req->prepareId = subRecPtr.p->prepareId;    req->senderData = subRecPtr.i;    sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, 	       UtilReleaseReq::SignalLength , JBB);      return;  }    {    UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();    conf->senderData = subRecPtr.i;    execUTIL_RELEASE_CONF(signal);  }}voidTrix::execUTIL_RELEASE_REF(Signal* signal){  jamEntry();  ndbrequire(false);}voidTrix::execUTIL_RELEASE_CONF(Signal* signal){    UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();    SubscriptionRecPtr subRecPtr;  c_theSubscriptions.getPtr(subRecPtr, conf->senderData);    if(subRecPtr.p->errorCode == BuildIndxRef::NoError){    // Build is complete, reply to original sender    BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend();    buildIndxConf->setUserRef(subRecPtr.p->userReference);    buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr);    buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX);    buildIndxConf->setIndexType(subRecPtr.p->indexType);    buildIndxConf->setTableId(subRecPtr.p->sourceTableId);    buildIndxConf->setIndexId(subRecPtr.p->targetTableId);        sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, 	       BuildIndxConf::SignalLength , JBB);    } else {    // Build failed, reply to original sender    BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();    buildIndxRef->setUserRef(subRecPtr.p->userReference);    buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr);    buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX);    buildIndxRef->setIndexType(subRecPtr.p->indexType);    buildIndxRef->setTableId(subRecPtr.p->sourceTableId);    buildIndxRef->setIndexId(subRecPtr.p->targetTableId);    buildIndxRef->setErrorCode(subRecPtr.p->errorCode);        sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, 	       BuildIndxRef::SignalLength , JBB);    }  // Release subscription record  subRecPtr.p->attributeOrder.release();  c_theSubscriptions.release(subRecPtr.i);}void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec){  if ((subRec->pendingSubSyncContinueConf) &&      (subRec->expectedConf < subRec->parallelism)) {    SubSyncContinueConf  * subSyncContinueConf =       (SubSyncContinueConf *) signal->getDataPtrSend();    subSyncContinueConf->subscriptionId = subRec->subscriptionId;    subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;    sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 	       SubSyncContinueConf::SignalLength , JBB);      subRec->pendingSubSyncContinueConf = false;  }}BLOCK_FUNCTIONS(Trix)template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -