⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rf_dagffwr.c

📁 RAIDframe 是一个非常好的磁盘阵列(RAID)仿真工具
💻 C
📖 第 1 页 / 共 5 页
字号:
  for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {    if (new_asm_h[asmNum]) {      pda = new_asm_h[asmNum]->stripeMap->physInfo;      while (pda) {	rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList);	rodNodes[nodeNum].params[0].p = pda;	rodNodes[nodeNum].params[1].p = pda->bufPtr;	rodNodes[nodeNum].params[2].v = parityStripeID;	rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);	nodeNum++;	pda=pda->next;      }    }  }  RF_ASSERT(nodeNum == nRodNodes);  /* initialize the wnd nodes */  pda = asmap->physInfo;  for (i=0; i < nWndNodes; i++) {    rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);    RF_ASSERT(pda != NULL);    wndNodes[i].params[0].p = pda;    wndNodes[i].params[1].p = pda->bufPtr;    wndNodes[i].params[2].v = parityStripeID;    wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);    pda = pda->next;  }  /* initialize the redundancy node */  rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);  xorNode->flags |= RF_DAGNODE_FLAG_YIELD;  for (i=0; i < nWndNodes; i++) {    xorNode->params[2*i+0] = wndNodes[i].params[0];         /* pda */    xorNode->params[2*i+1] = wndNodes[i].params[1];         /* buf ptr */  }  for (i=0; i < nRodNodes; i++) {    xorNode->params[2*(nWndNodes+i)+0] = rodNodes[i].params[0];         /* pda */    xorNode->params[2*(nWndNodes+i)+1] = rodNodes[i].params[1];         /* buf ptr */  }  xorNode->params[2*(nWndNodes+nRodNodes)].p = raidPtr; /* xor node needs to get at RAID information */    /* look for an Rod node that reads a complete SU.  If none, alloc a buffer to receive the parity info.   
* Note that we can't use a new data buffer because it will not have gotten written when the xor occurs.   */  if (allowBufferRecycle) {    for (i = 0; i < nRodNodes; i++)      if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)        break;  }  if ((!allowBufferRecycle) || (i == nRodNodes)) {    RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);  }  else    xorNode->results[0] = rodNodes[i].params[1].p;  /* initialize the Wnp node */  rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);  wnpNode->params[0].p = asmap->parityInfo;  wnpNode->params[1].p = xorNode->results[0];  wnpNode->params[2].v = parityStripeID;  wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);  RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */  if (nfaults == 2)    {      /* we never try to recycle a buffer for the Q calcuation in addition to the parity.	 This would cause two buffers to get smashed during the P and Q calculation, 	 guaranteeing one would be wrong.      
*/      RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList);       rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);      wnqNode->params[0].p = asmap->qInfo;      wnqNode->params[1].p = xorNode->results[1];      wnqNode->params[2].v = parityStripeID;      wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);      RF_ASSERT(asmap->parityInfo->next == NULL);        /* parityInfo must describe entire parity unit */    }  /* connect nodes to form graph */  /* connect dag header to block node */  RF_ASSERT(blockNode->numAntecedents == 0);  dag_h->succedents[0] = blockNode;  if (nRodNodes > 0) {    /* connect the block node to the Rod nodes */    RF_ASSERT(blockNode->numSuccedents == nRodNodes);    RF_ASSERT(syncNode->numAntecedents == nRodNodes);    for (i = 0; i < nRodNodes; i++) {      RF_ASSERT(rodNodes[i].numAntecedents == 1);      blockNode->succedents[i] = &rodNodes[i];      rodNodes[i].antecedents[0] = blockNode;      rodNodes[i].antType[0] = rf_control;      /* connect the Rod nodes to the Nil node */      RF_ASSERT(rodNodes[i].numSuccedents == 1);      rodNodes[i].succedents[0] = syncNode;      syncNode->antecedents[i] = &rodNodes[i];      syncNode->antType[i] = rf_trueData;    }  }  else {    /* connect the block node to the Nil node */    RF_ASSERT(blockNode->numSuccedents == 1);    RF_ASSERT(syncNode->numAntecedents == 1);    blockNode->succedents[0] = syncNode;    syncNode->antecedents[0] = blockNode;    syncNode->antType[0] = rf_control;  }  /* connect the sync node to the Wnd nodes */  RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));  for (i = 0; i < nWndNodes; i++) {    RF_ASSERT(wndNodes->numAntecedents == 1);    syncNode->succedents[i] = &wndNodes[i];    wndNodes[i].antecedents[0] = syncNode;    wndNodes[i].antType[0] = rf_control;  }  /* connect the sync 
node to the Xor node */  RF_ASSERT(xorNode->numAntecedents == 1);  syncNode->succedents[nWndNodes] = xorNode;  xorNode->antecedents[0] = syncNode;  xorNode->antType[0] = rf_control;  /* connect the xor node to the write parity node */  RF_ASSERT(xorNode->numSuccedents == nfaults);  RF_ASSERT(wnpNode->numAntecedents == 1);  xorNode->succedents[0] = wnpNode;  wnpNode->antecedents[0]= xorNode;  wnpNode->antType[0] = rf_trueData;  if (nfaults == 2) {    RF_ASSERT(wnqNode->numAntecedents == 1);    xorNode->succedents[1] = wnqNode;    wnqNode->antecedents[0] = xorNode;    wnqNode->antType[0] = rf_trueData;  }  /* connect the write nodes to the term node */  RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);  RF_ASSERT(termNode->numSuccedents == 0);  for (i = 0; i < nWndNodes; i++) {    RF_ASSERT(wndNodes->numSuccedents == 1);    wndNodes[i].succedents[0] = termNode;    termNode->antecedents[i] = &wndNodes[i];    termNode->antType[i] = rf_control;  }  RF_ASSERT(wnpNode->numSuccedents == 1);  wnpNode->succedents[0] = termNode;  termNode->antecedents[nWndNodes] = wnpNode;  termNode->antType[nWndNodes] = rf_control;  if (nfaults == 2) {    RF_ASSERT(wnqNode->numSuccedents == 1);    wnqNode->succedents[0] = termNode;    termNode->antecedents[nWndNodes + 1] = wnqNode;    termNode->antType[nWndNodes + 1] = rf_control;  }}/****************************************************************************** * * creates a DAG to perform a small-write operation (either raid 5 or pq), * which is as follows: * * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm *            \- Rod X- Wnd [Und] -------/ *           [\- Rod X- Wnd [Und] ------/] *           [\- Roq - Q --> Wnq [Unq]-/] * * Rop = read old parity * Rod = read old data * Roq = read old "q" * Cmt = commit node * Und = unlock data disk * Unp = unlock parity disk * Unq = unlock q disk * Wnp = write new parity * Wnd = write new data * Wnq = write new "q" * [ ] denotes optional segments in the graph * * Parameters:  raidPtr   - 
description of the physical array *              asmap     - logical & physical addresses for this access *              bp        - buffer ptr (holds write data) *              flags     - general flags (e.g. disk locking)  *              allocList - list of memory allocated in DAG creation *              pfuncs    - list of parity generating functions *              qfuncs    - list of q generating functions * * A null qfuncs indicates single fault tolerant *****************************************************************************/void rf_CommonCreateSmallWriteDAGFwd(  RF_Raid_t             *raidPtr,  RF_AccessStripeMap_t  *asmap,  RF_DagHeader_t        *dag_h,  void                  *bp,  RF_RaidAccessFlags_t   flags,  RF_AllocListElem_t    *allocList,  RF_RedFuncs_t         *pfuncs,  RF_RedFuncs_t         *qfuncs){  RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;  RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;  RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;  RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;  int i, j, nNodes, totalNumNodes, lu_flag;  RF_ReconUnitNum_t which_ru;  int (*func)(), (*undoFunc)(), (*qfunc)();  int numDataNodes, numParityNodes;  RF_StripeNum_t parityStripeID;  RF_PhysDiskAddr_t *pda;  char *name, *qname;  long nfaults;  nfaults = qfuncs ? 2 : 1;  lu_flag = (rf_enableAtomicRMW) ? 1 : 0;          /* lock/unlock flag */  parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);  pda = asmap->physInfo;  numDataNodes = asmap->numStripeUnitsAccessed;  numParityNodes = (asmap->parityInfo->next) ? 2 : 1;  if (rf_dagDebug) printf("[Creating small-write DAG]\n");  RF_ASSERT(numDataNodes > 0);  dag_h->creator = "SmallWriteDAGFwd";  dag_h->numCommitNodes = 0;  dag_h->numCommits = 0;  dag_h->numSuccedents = 1;  qfunc = NULL;  qname = NULL;  /* DAG creation occurs in four steps:     1. count the number of nodes in the DAG     2. 
create the nodes     3. initialize the nodes     4. connect the nodes   */  /* Step 1. compute number of nodes in the graph */  /* number of nodes:      a read and write for each data unit      a redundancy computation node for each parity node (nfaults * nparity)      a read and write for each parity unit      a block node      a terminate node      if atomic RMW        an unlock node for each data unit, redundancy unit  */  totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2;  if (lu_flag)    totalNumNodes += (numDataNodes + (nfaults * numParityNodes));  /* Step 2. create the nodes */  RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);  i = 0;  blockNode        = &nodes[i]; i += 1;  readDataNodes    = &nodes[i]; i += numDataNodes;  readParityNodes  = &nodes[i]; i += numParityNodes;  writeDataNodes   = &nodes[i]; i += numDataNodes;  writeParityNodes = &nodes[i]; i += numParityNodes;  xorNodes         = &nodes[i]; i += numParityNodes;  termNode         = &nodes[i]; i += 1;  if (lu_flag) {    unlockDataNodes   = &nodes[i]; i += numDataNodes;    unlockParityNodes = &nodes[i]; i += numParityNodes;  }  else {    unlockDataNodes = unlockParityNodes = NULL;  }  if (nfaults == 2) {    readQNodes     = &nodes[i]; i += numParityNodes;    writeQNodes    = &nodes[i]; i += numParityNodes;    qNodes         = &nodes[i]; i += numParityNodes;    if (lu_flag) {      unlockQNodes    = &nodes[i]; i += numParityNodes;    }    else {      unlockQNodes = NULL;    }  }  else {    readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;  }  RF_ASSERT(i == totalNumNodes);    /* Step 3. 
initialize the nodes */  /* initialize block node (Nil) */  nNodes     = numDataNodes + (nfaults * numParityNodes);  rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);  /* initialize terminate node (Trm) */  rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);  /* initialize nodes which read old data (Rod) */  for (i = 0; i < numDataNodes; i++) {    rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);    RF_ASSERT(pda != NULL);    readDataNodes[i].params[0].p = pda;  /* physical disk addr desc */    readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);  /* buffer to hold old data */    readDataNodes[i].params[2].v = parityStripeID;    readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);    pda=pda->next;    for (j = 0; j < readDataNodes[i].numSuccedents; j++)      readDataNodes[i].propList[j] = NULL;  }  /* initialize nodes which read old parity (Rop) */  pda = asmap->parityInfo; i = 0;  for (i = 0; i < numParityNodes; i++) {    RF_ASSERT(pda != NULL);    rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);    readParityNodes[i].params[0].p = pda;    readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);    /* buffer to hold old parity */    readParityNodes[i].params[2].v = parityStripeID;    readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);    for (j = 0; j < readParityNodes[i].numSuccedents; j++)      readParityNodes[i].propList[0] = NULL;    pda=pda->next;  }  /* initialize nodes which read old Q (Roq) */  if (nfaults == 2)    {      pda = 
asmap->qInfo;       for (i = 0; i < numParityNodes; i++) {	RF_ASSERT(pda != NULL);	rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);	readQNodes[i].params[0].p = pda;	readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */	readQNodes[i].params[2].v = parityStripeID;	readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);	for (j = 0; j < readQNodes[i].numSuccedents; j++)	  readQNodes[i].propList[0] = NULL;	pda=pda->next;      }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -