📄 rf_paritylogdiskmgr.c
字号:
rf_DispatchDAG(*pwr_dag_h, rf_MCPairWakeupFunc, (void *) pwr_mcpair);}static void FlushLogsToDisk( RF_Raid_t *raidPtr, RF_ParityLog_t *logList){ /* Flush a linked list of core logs to the log disk. Logs contain the disk location where they should be written. Logs were written in FIFO order and that order must be preserved. Recommended optimizations: 1) allow multiple flushes to occur simultaneously 2) coalesce contiguous flush operations BLOCKING */ RF_ParityLog_t *log; RF_RegionId_t regionID; RF_MCPair_t *fwr_mcpair; RF_DagHeader_t *fwr_dag_h; RF_AllocListElem_t *fwr_alloclist; RF_PhysDiskAddr_t *fwr_pda; fwr_mcpair = rf_AllocMCPair(); RF_LOCK_MUTEX(fwr_mcpair->mutex); RF_ASSERT(logList); log = logList; while (log) { regionID = log->regionID; /* create and launch a DAG to write the core log */ if (rf_parityLogDebug) printf("[initiating write of core log for region %d]\n", regionID); fwr_mcpair->flag = RF_FALSE; WriteCoreLog(log, fwr_mcpair, raidPtr, &fwr_dag_h, &fwr_alloclist, &fwr_pda); /* wait for the DAG to complete */#ifndef SIMULATE while (!fwr_mcpair->flag) RF_WAIT_COND(fwr_mcpair->cond, fwr_mcpair->mutex);#endif /* !SIMULATE */ if (fwr_dag_h->status != rf_enable) { RF_ERRORMSG1("Unable to write core log to disk (region %d)\n", regionID); RF_ASSERT(0); } /* RF_Free(fwr_pda, sizeof(RF_PhysDiskAddr_t)); */ rf_FreePhysDiskAddr(fwr_pda); rf_FreeDAG(fwr_dag_h); rf_FreeAllocList(fwr_alloclist); log = log->next; } RF_UNLOCK_MUTEX(fwr_mcpair->mutex); rf_FreeMCPair(fwr_mcpair); rf_ReleaseParityLogs(raidPtr, logList);}static void ReintegrateRegion( RF_Raid_t *raidPtr, RF_RegionId_t regionID, RF_ParityLog_t *coreLog){ RF_MCPair_t *rrd_mcpair, *prd_mcpair, *pwr_mcpair; RF_DagHeader_t *rrd_dag_h, *prd_dag_h, *pwr_dag_h; RF_AllocListElem_t *rrd_alloclist, *prd_alloclist, *pwr_alloclist; RF_PhysDiskAddr_t *rrd_pda, *prd_pda, *pwr_pda; caddr_t parityBuffer, regionBuffer; /* Reintegrate a region (regionID). 1. acquire region and parity buffers 2. read log from disk 3. read parity from disk 4. apply log to parity 5. apply core log to parity 6. write new parity to disk BLOCKING */ if (rf_parityLogDebug) printf("[reintegrating region %d]\n", regionID); /* initiate read of region parity */ if (rf_parityLogDebug) printf("[initiating read of parity for region %d]\n", regionID); parityBuffer = AcquireReintBuffer(&raidPtr->parityBufferPool); prd_mcpair = rf_AllocMCPair(); RF_LOCK_MUTEX(prd_mcpair->mutex); prd_mcpair->flag = RF_FALSE; ReadRegionParity(regionID, prd_mcpair, parityBuffer, raidPtr, &prd_dag_h, &prd_alloclist, &prd_pda); /* if region log nonempty, initiate read */ if (raidPtr->regionInfo[regionID].diskCount > 0) { if (rf_parityLogDebug) printf("[initiating read of disk log for region %d]\n", regionID); regionBuffer = AcquireReintBuffer(&raidPtr->regionBufferPool); rrd_mcpair = rf_AllocMCPair(); RF_LOCK_MUTEX(rrd_mcpair->mutex); rrd_mcpair->flag = RF_FALSE; ReadRegionLog(regionID, rrd_mcpair, regionBuffer, raidPtr, &rrd_dag_h, &rrd_alloclist, &rrd_pda); } /* wait on read of region parity to complete */#ifndef SIMULATE while (!prd_mcpair->flag) { RF_WAIT_COND(prd_mcpair->cond, prd_mcpair->mutex); }#endif /* !SIMULATE */ RF_UNLOCK_MUTEX(prd_mcpair->mutex); if (prd_dag_h->status != rf_enable) { RF_ERRORMSG("Unable to read parity from disk\n"); /* add code to fail the parity disk */ RF_ASSERT(0); } /* apply core log to parity */ /* if (coreLog) ApplyLogsToParity(coreLog, parityBuffer); */ if (raidPtr->regionInfo[regionID].diskCount > 0) { /* wait on read of region log to complete */#ifndef SIMULATE while (!rrd_mcpair->flag) RF_WAIT_COND(rrd_mcpair->cond, rrd_mcpair->mutex);#endif /* !SIMULATE */ RF_UNLOCK_MUTEX(rrd_mcpair->mutex); if (rrd_dag_h->status != rf_enable) { RF_ERRORMSG("Unable to read region log from disk\n"); /* add code to fail the log disk */ RF_ASSERT(0); } /* apply region log to parity */ /* ApplyRegionToParity(regionID, regionBuffer, parityBuffer); */ /* release resources associated with region log */ /* RF_Free(rrd_pda, sizeof(RF_PhysDiskAddr_t)); */ rf_FreePhysDiskAddr(rrd_pda); rf_FreeDAG(rrd_dag_h); rf_FreeAllocList(rrd_alloclist); rf_FreeMCPair(rrd_mcpair); ReleaseReintBuffer(&raidPtr->regionBufferPool, regionBuffer); } /* write reintegrated parity to disk */ if (rf_parityLogDebug) printf("[initiating write of parity for region %d]\n", regionID); pwr_mcpair = rf_AllocMCPair(); RF_LOCK_MUTEX(pwr_mcpair->mutex); pwr_mcpair->flag = RF_FALSE; WriteRegionParity(regionID, pwr_mcpair, parityBuffer, raidPtr, &pwr_dag_h, &pwr_alloclist, &pwr_pda);#ifndef SIMULATE while (!pwr_mcpair->flag) RF_WAIT_COND(pwr_mcpair->cond, pwr_mcpair->mutex);#endif /* !SIMULATE */ RF_UNLOCK_MUTEX(pwr_mcpair->mutex); if (pwr_dag_h->status != rf_enable) { RF_ERRORMSG("Unable to write parity to disk\n"); /* add code to fail the parity disk */ RF_ASSERT(0); } /* release resources associated with read of old parity */ /* RF_Free(prd_pda, sizeof(RF_PhysDiskAddr_t)); */ rf_FreePhysDiskAddr(prd_pda); rf_FreeDAG(prd_dag_h); rf_FreeAllocList(prd_alloclist); rf_FreeMCPair(prd_mcpair); /* release resources associated with write of new parity */ ReleaseReintBuffer(&raidPtr->parityBufferPool, parityBuffer); /* RF_Free(pwr_pda, sizeof(RF_PhysDiskAddr_t)); */ rf_FreePhysDiskAddr(pwr_pda); rf_FreeDAG(pwr_dag_h); rf_FreeAllocList(pwr_alloclist); rf_FreeMCPair(pwr_mcpair); if (rf_parityLogDebug) printf("[finished reintegrating region %d]\n", regionID);}static void ReintegrateLogs( RF_Raid_t *raidPtr, RF_ParityLog_t *logList){ RF_ParityLog_t *log, *freeLogList = NULL; RF_ParityLogData_t *logData, *logDataList; RF_RegionId_t regionID; RF_ASSERT(logList); while (logList) { log = logList; logList = logList->next; log->next = NULL; regionID = log->regionID; ReintegrateRegion(raidPtr, regionID, log); log->numRecords = 0; /* remove all items which are blocked on reintegration of this region */ RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); logData = rf_SearchAndDequeueParityLogData(raidPtr, regionID, &raidPtr->parityLogDiskQueue.reintBlockHead, &raidPtr->parityLogDiskQueue.reintBlockTail, RF_TRUE); logDataList = logData; while (logData) { logData->next = rf_SearchAndDequeueParityLogData(raidPtr, regionID, &raidPtr->parityLogDiskQueue.reintBlockHead, &raidPtr->parityLogDiskQueue.reintBlockTail, RF_TRUE); logData = logData->next; } RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); /* process blocked log data and clear reintInProgress flag for this region */ if (logDataList) rf_ParityLogAppend(logDataList, RF_TRUE, &log, RF_TRUE); else { /* Enable flushing for this region. Holding both locks provides a synchronization barrier with DumpParityLogToDisk */ RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); raidPtr->regionInfo[regionID].diskCount = 0; raidPtr->regionInfo[regionID].reintInProgress = RF_FALSE; RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); /* flushing is now enabled */ RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); } /* if log wasn't used, attach it to the list of logs to be returned */ if (log) { log->next = freeLogList; freeLogList = log; } } if (freeLogList) rf_ReleaseParityLogs(raidPtr, freeLogList);}int rf_ShutdownLogging(RF_Raid_t *raidPtr){ /* shutdown parity logging 1) disable parity logging in all regions 2) reintegrate all regions */ RF_SectorCount_t diskCount; RF_RegionId_t regionID; RF_ParityLog_t *log; if (rf_parityLogDebug) printf("[shutting down parity logging]\n"); /* Since parity log maps are volatile, we must reintegrate all regions. */ if (rf_forceParityLogReint) { for (regionID = 0; regionID < rf_numParityRegions; regionID++) { RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); raidPtr->regionInfo[regionID].loggingEnabled = RF_FALSE; log = raidPtr->regionInfo[regionID].coreLog; raidPtr->regionInfo[regionID].coreLog = NULL; diskCount = raidPtr->regionInfo[regionID].diskCount; RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); if (diskCount > 0 || log != NULL) ReintegrateRegion(raidPtr, regionID, log); if (log != NULL) rf_ReleaseParityLogs(raidPtr, log); } } if (rf_parityLogDebug) { printf("[parity logging disabled]\n"); printf("[should be done!]\n"); } return(0);}int rf_ParityLoggingDiskManager(RF_Raid_t *raidPtr){ RF_ParityLog_t *reintQueue, *flushQueue; int workNeeded, done = RF_FALSE; rf_assign_threadid(); /* don't remove this line */ /* Main program for parity logging disk thread. This routine waits for work to appear in either the flush or reintegration queues and is responsible for flushing core logs to the log disk as well as reintegrating parity regions. BLOCKING */ RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); /* * Inform our creator that we're running. Don't bother doing the * mutex lock/unlock dance- we locked above, and we'll unlock * below with nothing to do, yet. */ raidPtr->parityLogDiskQueue.threadState |= RF_PLOG_RUNNING; RF_SIGNAL_COND(raidPtr->parityLogDiskQueue.cond); /* empty the work queues */ flushQueue = raidPtr->parityLogDiskQueue.flushQueue; raidPtr->parityLogDiskQueue.flushQueue = NULL; reintQueue = raidPtr->parityLogDiskQueue.reintQueue; raidPtr->parityLogDiskQueue.reintQueue = NULL; workNeeded = (flushQueue || reintQueue); while (!done) { while (workNeeded) { /* First, flush all logs in the flush queue, freeing buffers Second, reintegrate all regions which are reported as full. Third, append queued log data until blocked. Note: Incoming appends (ParityLogAppend) can block on either 1. empty buffer pool 2. region under reintegration To preserve a global FIFO ordering of appends, buffers are not released to the world until those appends blocked on buffers are removed from the append queue. Similarly, regions which are reintegrated are not opened for general use until the append queue has been emptied. */ RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); /* empty flushQueue, using free'd log buffers to process bufTail */ if (flushQueue) FlushLogsToDisk(raidPtr, flushQueue); /* empty reintQueue, flushing from reintTail as we go */ if (reintQueue) ReintegrateLogs(raidPtr, reintQueue); RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); flushQueue = raidPtr->parityLogDiskQueue.flushQueue; raidPtr->parityLogDiskQueue.flushQueue = NULL; reintQueue = raidPtr->parityLogDiskQueue.reintQueue; raidPtr->parityLogDiskQueue.reintQueue = NULL; workNeeded = (flushQueue || reintQueue); } /* no work is needed at this point */ if (raidPtr->parityLogDiskQueue.threadState&RF_PLOG_TERMINATE) { /* shutdown parity logging 1. disable parity logging in all regions 2. reintegrate all regions */ done = RF_TRUE; /* thread disabled, no work needed */ RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); rf_ShutdownLogging(raidPtr); } if (!done) { /* thread enabled, no work needed, so sleep */ if (rf_parityLogDebug) printf("[parity logging disk manager sleeping]\n"); RF_WAIT_COND(raidPtr->parityLogDiskQueue.cond, raidPtr->parityLogDiskQueue.mutex); if (rf_parityLogDebug) printf("[parity logging disk manager just woke up]\n"); flushQueue = raidPtr->parityLogDiskQueue.flushQueue; raidPtr->parityLogDiskQueue.flushQueue = NULL; reintQueue = raidPtr->parityLogDiskQueue.reintQueue; raidPtr->parityLogDiskQueue.reintQueue = NULL; workNeeded = (flushQueue || reintQueue); } } /* * Announce that we're done. */ RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); raidPtr->parityLogDiskQueue.threadState |= RF_PLOG_SHUTDOWN; RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); RF_SIGNAL_COND(raidPtr->parityLogDiskQueue.cond); return(0);}#endif /* RF_INCLUDE_PARITYLOGGING > 0 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -