📄 rf_parityloggingdags.c
字号:
pda = asmap->physInfo; for (i=0; i < nWndNodes; i++) { rf_InitNode(&wndNodes[i], rf_wait, RF_TRUE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); RF_ASSERT(pda != NULL); wndNodes[i].params[0].p = pda; wndNodes[i].params[1].p = pda->bufPtr; wndNodes[i].params[2].v = parityStripeID; wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); pda = pda->next; } /* initialize the redundancy node */ rf_InitNode(xorNode, rf_wait, RF_TRUE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 1, 2*(nWndNodes+nRodNodes)+1, 1, dag_h, "Xr ", allocList); xorNode->flags |= RF_DAGNODE_FLAG_YIELD; for (i=0; i < nWndNodes; i++) { xorNode->params[2*i+0] = wndNodes[i].params[0]; /* pda */ xorNode->params[2*i+1] = wndNodes[i].params[1]; /* buf ptr */ } for (i=0; i < nRodNodes; i++) { xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0]; /* pda */ xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1]; /* buf ptr */ } xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */ /* look for an Rod node that reads a complete SU. If none, alloc a buffer to receive the parity info. * Note that we can't use a new data buffer because it will not have gotten written when the xor occurs. 
*/ for (i = 0; i < nRodNodes; i++) if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) break; if (i == nRodNodes) { RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList); } else { xorNode->results[0] = rodNodes[i].params[1].p; } /* initialize the Lpo node */ rf_InitNode(lpoNode, rf_wait, RF_FALSE, rf_ParityLogOverwriteFunc, rf_ParityLogOverwriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Lpo", allocList); lpoNode->params[0].p = asmap->parityInfo; lpoNode->params[1].p = xorNode->results[0]; RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must describe entire parity unit */ /* connect nodes to form graph */ /* connect dag header to block node */ RF_ASSERT(dag_h->numSuccedents == 1); RF_ASSERT(blockNode->numAntecedents == 0); dag_h->succedents[0] = blockNode; /* connect the block node to the Rod nodes */ RF_ASSERT(blockNode->numSuccedents == nRodNodes + 1); for (i = 0; i < nRodNodes; i++) { RF_ASSERT(rodNodes[i].numAntecedents == 1); blockNode->succedents[i] = &rodNodes[i]; rodNodes[i].antecedents[0] = blockNode; rodNodes[i].antType[0] = rf_control; } /* connect the block node to the sync node */ /* necessary if nRodNodes == 0 */ RF_ASSERT(syncNode->numAntecedents == nRodNodes + 1); blockNode->succedents[nRodNodes] = syncNode; syncNode->antecedents[0] = blockNode; syncNode->antType[0] = rf_control; /* connect the Rod nodes to the syncNode */ for (i = 0; i < nRodNodes; i++) { rodNodes[i].succedents[0] = syncNode; syncNode->antecedents[1 + i] = &rodNodes[i]; syncNode->antType[1 + i] = rf_control; } /* connect the sync node to the xor node */ RF_ASSERT(syncNode->numSuccedents == nWndNodes + 1); RF_ASSERT(xorNode->numAntecedents == 1); syncNode->succedents[0] = xorNode; xorNode->antecedents[0] = syncNode; xorNode->antType[0] = rf_trueData; /* carry forward from sync */ /* connect the sync node to the Wnd nodes */ for (i = 0; i < 
nWndNodes; i++) { RF_ASSERT(wndNodes->numAntecedents == 1); syncNode->succedents[1 + i] = &wndNodes[i]; wndNodes[i].antecedents[0] = syncNode; wndNodes[i].antType[0] = rf_control; } /* connect the xor node to the Lpo node */ RF_ASSERT(xorNode->numSuccedents == 1); RF_ASSERT(lpoNode->numAntecedents == 1); xorNode->succedents[0] = lpoNode; lpoNode->antecedents[0]= xorNode; lpoNode->antType[0] = rf_trueData; /* connect the Wnd nodes to the unblock node */ RF_ASSERT(unblockNode->numAntecedents == nWndNodes + 1); for (i = 0; i < nWndNodes; i++) { RF_ASSERT(wndNodes->numSuccedents == 1); wndNodes[i].succedents[0] = unblockNode; unblockNode->antecedents[i] = &wndNodes[i]; unblockNode->antType[i] = rf_control; } /* connect the Lpo node to the unblock node */ RF_ASSERT(lpoNode->numSuccedents == 1); lpoNode->succedents[0] = unblockNode; unblockNode->antecedents[nWndNodes] = lpoNode; unblockNode->antType[nWndNodes] = rf_control; /* connect unblock node to terminator */ RF_ASSERT(unblockNode->numSuccedents == 1); RF_ASSERT(termNode->numAntecedents == 1); RF_ASSERT(termNode->numSuccedents == 0); unblockNode->succedents[0] = termNode; termNode->antecedents[0] = unblockNode; termNode->antType[0] = rf_control;}/****************************************************************************** * * creates a DAG to perform a small-write operation (either raid 5 or pq), which is as follows: * * Header * | * Block * / | ... \ \ * / | \ \ * Rod Rod Rod Rop * | \ /| \ / | \/ | * | | | /\ | * Wnd Wnd Wnd X * | \ / | * | \ / | * \ \ / Lpo * \ \ / / * +-> Unblock <-+ * | * T * * * R = Read, W = Write, X = Xor, o = old, n = new, d = data, p = parity. * When the access spans a stripe unit boundary and is less than one SU in size, there will * be two Rop -- X -- Wnp branches. I call this the "double-XOR" case. * The second output from each Rod node goes to the X node. In the double-XOR * case, there are exactly 2 Rod nodes, and each sends one output to one X node. 
* There is one Rod -- Wnd -- T branch for each stripe unit being updated. * * The block and unblock nodes are unused. See comment above CreateFaultFreeReadDAG. * * Note: this DAG ignores all the optimizations related to making the RMWs atomic. * It also has the nasty property that none of the buffers allocated for reading * old data & parity can be freed until the XOR node fires. Need to fix this. * * A null qfuncs indicates a single-fault-tolerant configuration *****************************************************************************/void rf_CommonCreateParityLoggingSmallWriteDAG( RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs){ RF_DagNode_t *xorNodes, *blockNode, *unblockNode, *nodes; RF_DagNode_t *readDataNodes, *readParityNodes; RF_DagNode_t *writeDataNodes, *lpuNodes; RF_DagNode_t *unlockDataNodes, *termNode; RF_PhysDiskAddr_t *pda = asmap->physInfo; int numDataNodes = asmap->numStripeUnitsAccessed; int numParityNodes = (asmap->parityInfo->next) ? 2 : 1; int i, j, nNodes, totalNumNodes; RF_ReconUnitNum_t which_ru; int (*func)(), (*undoFunc)(); int (*qfunc)(); char *name, *qname; RF_StripeNum_t parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru); long nfaults = qfuncs ? 2 : 1; int lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */ if (rf_dagDebug) printf("[Creating parity-logging small-write DAG]\n"); RF_ASSERT(numDataNodes > 0); RF_ASSERT(nfaults == 1); dag_h->creator = "ParityLoggingSmallWriteDAG"; /* DAG creation occurs in four steps: 1. count the number of nodes in the DAG 2. create the nodes 3. initialize the nodes 4. connect the nodes */ /* Step 1. 
compute number of nodes in the graph */ /* number of nodes: a read and write for each data unit a redundancy computation node for each parity node a read and Lpu for each parity unit a block and unblock node (2) a terminator node if atomic RMW an unlock node for each data unit, redundancy unit */ totalNumNodes = (2 * numDataNodes) + numParityNodes + (2 * numParityNodes) + 3; if (lu_flag) totalNumNodes += numDataNodes; nNodes = numDataNodes + numParityNodes; dag_h->numCommitNodes = numDataNodes + numParityNodes; dag_h->numCommits = 0; dag_h->numSuccedents = 1; /* Step 2. create the nodes */ RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); i = 0; blockNode = &nodes[i]; i += 1; unblockNode = &nodes[i]; i += 1; readDataNodes = &nodes[i]; i += numDataNodes; readParityNodes = &nodes[i]; i += numParityNodes; writeDataNodes = &nodes[i]; i += numDataNodes; lpuNodes = &nodes[i]; i += numParityNodes; xorNodes = &nodes[i]; i += numParityNodes; termNode = &nodes[i]; i += 1; if (lu_flag) { unlockDataNodes = &nodes[i]; i += numDataNodes; } RF_ASSERT(i == totalNumNodes); /* Step 3. initialize the nodes */ /* initialize block node (Nil) */ rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList); /* initialize unblock node (Nil) */ rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, nNodes, 0, 0, dag_h, "Nil", allocList); /* initialize terminatory node (Trm) */ rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList); /* initialize nodes which read old data (Rod) */ for (i = 0; i < numDataNodes; i++) { rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, nNodes, 1, 4, 0, dag_h, "Rod", allocList); RF_ASSERT(pda != NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -