📄 rf_dagffwr.c
字号:
xorNodes[i].succedents[0] = commitNode; commitNode->antecedents[i] = &xorNodes[i]; commitNode->antType[i] = rf_control; } /* connect q nodes to commit node */ if (nfaults == 2) { for (i = 0; i < numParityNodes; i++) { RF_ASSERT(qNodes[i].numSuccedents == 1); qNodes[i].succedents[0] = commitNode; commitNode->antecedents[i + numParityNodes] = &qNodes[i]; commitNode->antType[i + numParityNodes] = rf_control; } } /* connect commit node to write nodes */ RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); for (i = 0; i < numDataNodes; i++) { RF_ASSERT(writeDataNodes[i].numAntecedents == 1); commitNode->succedents[i] = &writeDataNodes[i]; writeDataNodes[i].antecedents[0] = commitNode; writeDataNodes[i].antType[0] = rf_trueData; } for (i = 0; i < numParityNodes; i++) { RF_ASSERT(writeParityNodes[i].numAntecedents == 1); commitNode->succedents[i + numDataNodes] = &writeParityNodes[i]; writeParityNodes[i].antecedents[0] = commitNode; writeParityNodes[i].antType[0] = rf_trueData; } if (nfaults == 2) { for (i = 0; i < numParityNodes; i++) { RF_ASSERT(writeQNodes[i].numAntecedents == 1); commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i]; writeQNodes[i].antecedents[0] = commitNode; writeQNodes[i].antType[0] = rf_trueData; } } RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); RF_ASSERT(termNode->numSuccedents == 0); for (i = 0; i < numDataNodes; i++) { if (lu_flag) { /* connect write new data nodes to unlock nodes */ RF_ASSERT(writeDataNodes[i].numSuccedents == 1); RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; unlockDataNodes[i].antType[0] = rf_control; /* connect unlock nodes to term node */ RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); unlockDataNodes[i].succedents[0] = termNode; termNode->antecedents[i] = &unlockDataNodes[i]; termNode->antType[i] = rf_control; } else 
{ /* connect write new data nodes to term node */ RF_ASSERT(writeDataNodes[i].numSuccedents == 1); RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); writeDataNodes[i].succedents[0] = termNode; termNode->antecedents[i] = &writeDataNodes[i]; termNode->antType[i] = rf_control; } } for (i = 0; i < numParityNodes; i++) { if (lu_flag) { /* connect write new parity nodes to unlock nodes */ RF_ASSERT(writeParityNodes[i].numSuccedents == 1); RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; unlockParityNodes[i].antType[0] = rf_control; /* connect unlock nodes to term node */ RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); unlockParityNodes[i].succedents[0] = termNode; termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; termNode->antType[numDataNodes + i] = rf_control; } else { RF_ASSERT(writeParityNodes[i].numSuccedents == 1); writeParityNodes[i].succedents[0] = termNode; termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; termNode->antType[numDataNodes + i] = rf_control; } } if (nfaults == 2) { for (i = 0; i < numParityNodes; i++) { if (lu_flag) { /* connect write new Q nodes to unlock nodes */ RF_ASSERT(writeQNodes[i].numSuccedents == 1); RF_ASSERT(unlockQNodes[i].numAntecedents == 1); writeQNodes[i].succedents[0] = &unlockQNodes[i]; unlockQNodes[i].antecedents[0] = &writeQNodes[i]; unlockQNodes[i].antType[0] = rf_control; /* connect unlock nodes to unblock node */ RF_ASSERT(unlockQNodes[i].numSuccedents == 1); unlockQNodes[i].succedents[0] = termNode; termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i]; termNode->antType[numDataNodes + numParityNodes + i] = rf_control; } else { RF_ASSERT(writeQNodes[i].numSuccedents == 1); writeQNodes[i].succedents[0] = termNode; termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 
termNode->antType[numDataNodes + numParityNodes + i] = rf_control; } } }}
/******************************************************************************
 * Create a write graph (fault-free or degraded) for RAID level 1.
 *
 *   Hdr -> Commit -+-> Wpd -+-> Nil -> Trm
 *                  +-> Wsd -+
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair.
 * The "Wsd" node writes data to the secondary copy in the mirror pair.
 * Every edge created by this function is typed rf_control.
 *
 * All nodes live in one array registered on allocList, laid out as
 *   [ Wpd nodes ][ Wsd nodes ][ commit ][ unblock ][ term ]
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access;
 *                          physInfo is walked for the primary writes and
 *                          parityInfo for the mirror writes
 *              dag_h     - DAG header filled in here (creator, commit
 *                          counters, numSuccedents, succedents[0])
 *              bp        - buffer ptr (holds write data); not referenced in
 *                          this function body
 *              flags     - general flags (e.g. disk locking); not referenced
 *                          in this function body
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/
void rf_CreateRaidOneWriteDAG(
    RF_Raid_t *raidPtr,
    RF_AccessStripeMap_t *asmap,
    RF_DagHeader_t *dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t *allocList)
{
    RF_DagNode_t *unblockNode, *termNode, *commitNode;
    RF_DagNode_t *nodes, *wndNode, *wmirNode;
    int nWndNodes, nWmirNodes, i;
    RF_ReconUnitNum_t which_ru;
    RF_PhysDiskAddr_t *pda, *pdaP;
    RF_StripeNum_t parityStripeID;

    /*
     * which_ru is handed in by address and is not assigned anywhere else in
     * this function; both values are stored into every write node below.
     */
    parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
        asmap->raidAddress, &which_ru);

    if (rf_dagDebug) {
        printf("[Creating RAID level 1 write DAG]\n");
    }
    dag_h->creator = "RaidOneWriteDAG";

    /*
     * One write node per element of the respective address list:
     * 2 implies access not SU aligned
     */
    nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
    nWndNodes = (asmap->physInfo->next) ? 2 : 1;

    /* degraded mode: one node fewer on the side that reports a failure */
    if (asmap->numDataFailed == 1)
        nWndNodes--;
    if (asmap->numParityFailed == 1)
        nWmirNodes--;

    /*
     * alloc the Wnd nodes and the Wmir nodes, plus three fixed nodes:
     * total number of nodes = nWndNodes + nWmirNodes + (commit + unblock + terminator)
     */
    RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
        (RF_DagNode_t *), allocList);

    /* carve the array into its regions; i tracks the next unused slot */
    i = 0;
    wndNode = &nodes[i];
    i += nWndNodes;
    wmirNode = &nodes[i];
    i += nWmirNodes;
    commitNode = &nodes[i];
    i += 1;
    unblockNode = &nodes[i];
    i += 1;
    termNode = &nodes[i];
    i += 1;
    RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

    /* this dag can commit immediately */
    dag_h->numCommitNodes = 1;
    dag_h->numCommits = 0;
    dag_h->numSuccedents = 1;

    /*
     * initialize the commit, unblock, and term nodes
     *
     * The two counts passed after the NULL argument are the values the
     * assertions in the linking section expect to find in numSuccedents and
     * numAntecedents respectively.  Only the commit node passes RF_TRUE as
     * the third argument -- presumably the commit-node flag; confirm against
     * rf_InitNode.
     */
    rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
        NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
    rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
        NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
    rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
        NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

    /*
     * initialize the wnd nodes: one disk write per element of
     * asmap->physInfo, each with one succedent, one antecedent and four
     * parameters (address, buffer, parity stripe ID, packed priority/ru word)
     */
    if (nWndNodes > 0) {
        pda = asmap->physInfo;
        for (i = 0; i < nWndNodes; i++) {
            rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
                rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
                dag_h, "Wpd", allocList);
            RF_ASSERT(pda != NULL);
            wndNode[i].params[0].p = pda;
            wndNode[i].params[1].p = pda->bufPtr;
            wndNode[i].params[2].v = parityStripeID;
            wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
            pda = pda->next;
        }
        /*
         * the whole physInfo list must have been consumed
         * NOTE(review): if nWndNodes was decremented from 2 to 1 above, one
         * list element would remain here -- confirm that case cannot occur.
         */
        RF_ASSERT(pda == NULL);
    }

    /*
     * initialize the mirror nodes: the target address comes from
     * asmap->parityInfo (pdaP) while the buffer written is taken from the
     * matching asmap->physInfo element (pda), so both lists are walked in
     * lock step
     */
    if (nWmirNodes > 0) {
        pda = asmap->physInfo;
        pdaP = asmap->parityInfo;
        for (i = 0; i < nWmirNodes; i++) {
            rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
                rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
                dag_h, "Wsd", allocList);
            /* NOTE(review): only pda is checked; pdaP is advanced below without a non-NULL assertion */
            RF_ASSERT(pda != NULL);
            wmirNode[i].params[0].p = pdaP;
            wmirNode[i].params[1].p = pda->bufPtr;
            wmirNode[i].params[2].v = parityStripeID;
            wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
            pda = pda->next;
            pdaP = pdaP->next;
        }
        /* both lists must end together after nWmirNodes elements */
        RF_ASSERT(pda == NULL);
        RF_ASSERT(pdaP == NULL);
    }

    /* link the header node to the commit node */
    RF_ASSERT(dag_h->numSuccedents == 1);
    RF_ASSERT(commitNode->numAntecedents == 0);
    dag_h->succedents[0] = commitNode;

    /*
     * link the commit node to the write nodes: primary writes occupy
     * succedent slots [0, nWndNodes), mirror writes follow at offset
     * nWndNodes
     */
    RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
    for (i = 0; i < nWndNodes; i++) {
        RF_ASSERT(wndNode[i].numAntecedents == 1);
        commitNode->succedents[i] = &wndNode[i];
        wndNode[i].antecedents[0] = commitNode;
        wndNode[i].antType[0] = rf_control;
    }
    for (i = 0; i < nWmirNodes; i++) {
        RF_ASSERT(wmirNode[i].numAntecedents == 1);
        commitNode->succedents[i + nWndNodes] = &wmirNode[i];
        wmirNode[i].antecedents[0] = commitNode;
        wmirNode[i].antType[0] = rf_control;
    }

    /*
     * link the write nodes to the unblock node, using the same slot layout
     * for the unblock node's antecedents as for the commit node's succedents
     */
    RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
    for (i = 0; i < nWndNodes; i++) {
        RF_ASSERT(wndNode[i].numSuccedents == 1);
        wndNode[i].succedents[0] = unblockNode;
        unblockNode->antecedents[i] = &wndNode[i];
        unblockNode->antType[i] = rf_control;
    }
    for (i = 0; i < nWmirNodes; i++) {
        RF_ASSERT(wmirNode[i].numSuccedents == 1);
        wmirNode[i].succedents[0] = unblockNode;
        unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
        unblockNode->antType[i + nWndNodes] = rf_control;
    }

    /* link the unblock node to the term node, which ends the graph */
    RF_ASSERT(unblockNode->numSuccedents == 1);
    RF_ASSERT(termNode->numAntecedents == 1);
    RF_ASSERT(termNode->numSuccedents == 0);
    unblockNode->succedents[0] = termNode;
    termNode->antecedents[0] = unblockNode;
    termNode->antType[0] = rf_control;
}
/* DAGs which have no commit points. * * The following DAGs are used in forward and backward error recovery experiments. 
* They are identical to the DAGs above this comment with the exception that the * the commit points have been removed. */void rf_CommonCreateLargeWriteDAGFwd( RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList, int nfaults, int (*redFunc)(), int allowBufferRecycle){ RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode; int nWndNodes, nRodNodes, i, nodeNum, asmNum; RF_AccessStripeMapHeader_t *new_asm_h[2]; RF_StripeNum_t parityStripeID; char *sosBuffer, *eosBuffer; RF_ReconUnitNum_t which_ru; RF_RaidLayout_t *layoutPtr; RF_PhysDiskAddr_t *pda; layoutPtr = &(raidPtr->Layout); parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru); if (rf_dagDebug) printf("[Creating large-write DAG]\n"); dag_h->creator = "LargeWriteDAGFwd"; dag_h->numCommitNodes = 0; dag_h->numCommits = 0; dag_h->numSuccedents = 1; /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ nWndNodes = asmap->numStripeUnitsAccessed; RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); i = 0; wndNodes = &nodes[i]; i += nWndNodes; xorNode = &nodes[i]; i += 1; wnpNode = &nodes[i]; i += 1; blockNode = &nodes[i]; i += 1; syncNode = &nodes[i]; i += 1; termNode = &nodes[i]; i += 1; if (nfaults == 2) { wnqNode = &nodes[i]; i += 1; } else { wnqNode = NULL; } rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList); if (nRodNodes > 0) { RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); } else { rodNodes = NULL; } /* begin node initialization */ if (nRodNodes > 0) { rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList); rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 
rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList); } else { rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList); rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList); } rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); /* initialize the Rod nodes */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -