📄 rf_paritylog.c
字号:
}static void ReintLog( RF_Raid_t *raidPtr, int regionID, RF_ParityLog_t *log){ RF_ASSERT(log); /* Insert an in-core parity log (log) into the disk queue of reintegration work. Set the flag (reintInProgress) for the specified region (regionID) to indicate that reintegration is in progress for this region. NON-BLOCKING */ RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); raidPtr->regionInfo[regionID].reintInProgress = RF_TRUE; /* cleared when reint complete */ if (rf_parityLogDebug) printf("[requesting reintegration of region %d]\n", log->regionID); /* move record to reintegration queue */ RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); log->next = raidPtr->parityLogDiskQueue.reintQueue; raidPtr->parityLogDiskQueue.reintQueue = log; RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); RF_SIGNAL_COND(raidPtr->parityLogDiskQueue.cond);}static void FlushLog( RF_Raid_t *raidPtr, RF_ParityLog_t *log){ /* insert a core log (log) into a list of logs (parityLogDiskQueue.flushQueue) waiting to be written to disk. NON-BLOCKING */ RF_ASSERT(log); RF_ASSERT(log->numRecords == raidPtr->numSectorsPerLog); RF_ASSERT(log->next == NULL); /* move log to flush queue */ RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); log->next = raidPtr->parityLogDiskQueue.flushQueue; raidPtr->parityLogDiskQueue.flushQueue = log; RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); RF_SIGNAL_COND(raidPtr->parityLogDiskQueue.cond);}static int DumpParityLogToDisk( int finish, RF_ParityLogData_t *logData){ int i, diskCount, regionID = logData->regionID; RF_ParityLog_t *log; RF_Raid_t *raidPtr; raidPtr = logData->common->raidPtr; /* Move a core log to disk. If the log disk is full, initiate reintegration. Return (0) if we can enqueue the dump immediately, otherwise return (1) to indicate we are blocked on reintegration and control of the thread should be relinquished. Caller must hold regionInfo[regionID].mutex NON-BLOCKING */ if (rf_parityLogDebug) printf("[dumping parity log to disk, region %d]\n", regionID); log = raidPtr->regionInfo[regionID].coreLog; RF_ASSERT(log->numRecords == raidPtr->numSectorsPerLog); RF_ASSERT(log->next == NULL); /* if reintegration is in progress, must queue work */ RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); if (raidPtr->regionInfo[regionID].reintInProgress) { /* Can not proceed since this region is currently being reintegrated. We can not block, so queue remaining work and return */ if (rf_parityLogDebug) printf("[region %d waiting on reintegration]\n",regionID); /* XXX not sure about the use of finish - shouldn't this always be "Enqueue"? */ if (finish) RequeueParityLogData(logData, &raidPtr->parityLogDiskQueue.reintBlockHead, &raidPtr->parityLogDiskQueue.reintBlockTail); else EnqueueParityLogData(logData, &raidPtr->parityLogDiskQueue.reintBlockHead, &raidPtr->parityLogDiskQueue.reintBlockTail); RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); return(1); /* relenquish control of this thread */ } RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); raidPtr->regionInfo[regionID].coreLog = NULL; if ((raidPtr->regionInfo[regionID].diskCount) < raidPtr->regionInfo[regionID].capacity) /* IMPORTANT!! this loop bound assumes region disk holds an integral number of core logs */ { /* update disk map for this region */ diskCount = raidPtr->regionInfo[regionID].diskCount; for (i = 0; i < raidPtr->numSectorsPerLog; i++) { raidPtr->regionInfo[regionID].diskMap[i + diskCount].operation = log->records[i].operation; raidPtr->regionInfo[regionID].diskMap[i + diskCount].parityAddr = log->records[i].parityAddr; } log->diskOffset = diskCount; raidPtr->regionInfo[regionID].diskCount += raidPtr->numSectorsPerLog; FlushLog(raidPtr, log); } else { /* no room for log on disk, send it to disk manager and request reintegration */ RF_ASSERT(raidPtr->regionInfo[regionID].diskCount == raidPtr->regionInfo[regionID].capacity); ReintLog(raidPtr, regionID, log); } if (rf_parityLogDebug) printf("[finished dumping parity log to disk, region %d]\n", regionID); return(0);}int rf_ParityLogAppend( RF_ParityLogData_t *logData, int finish, RF_ParityLog_t **incomingLog, int clearReintFlag){ int regionID, logItem, itemDone; RF_ParityLogData_t *item; int punt, done = RF_FALSE; RF_ParityLog_t *log; RF_Raid_t *raidPtr; RF_Etimer_t timer; int (*wakeFunc)(); void *wakeArg; /* Add parity to the appropriate log, one sector at a time. This routine is called is called by dag functions ParityLogUpdateFunc and ParityLogOverwriteFunc and therefore MUST BE NONBLOCKING. Parity to be logged is contained in a linked-list (logData). When this routine returns, every sector in the list will be in one of three places: 1) entered into the parity log 2) queued, waiting on reintegration 3) queued, waiting on a core log Blocked work is passed to the ParityLoggingDiskManager for completion. Later, as conditions which required the block are removed, the work reenters this routine with the "finish" parameter set to "RF_TRUE." NON-BLOCKING */ raidPtr = logData->common->raidPtr; /* lock the region for the first item in logData */ RF_ASSERT(logData != NULL); regionID = logData->regionID; RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); RF_ASSERT(raidPtr->regionInfo[regionID].loggingEnabled); if (clearReintFlag) { /* Enable flushing for this region. Holding both locks provides a synchronization barrier with DumpParityLogToDisk */ RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); RF_LOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); RF_ASSERT(raidPtr->regionInfo[regionID].reintInProgress == RF_TRUE); raidPtr->regionInfo[regionID].diskCount = 0; raidPtr->regionInfo[regionID].reintInProgress = RF_FALSE; RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].reintMutex); /* flushing is now enabled */ RF_UNLOCK_MUTEX(raidPtr->parityLogDiskQueue.mutex); } /* process each item in logData */ while (logData) { /* remove an item from logData */ item = logData; logData = logData->next; item->next = NULL; item->prev = NULL; if (rf_parityLogDebug) printf("[appending parity log data, region %d, raidAddress %d, numSector %d]\n",item->regionID,item->diskAddress.raidAddress, item->diskAddress.numSector); /* see if we moved to a new region */ if (regionID != item->regionID) { RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); regionID = item->regionID; RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); RF_ASSERT(raidPtr->regionInfo[regionID].loggingEnabled); } punt = RF_FALSE; /* Set to RF_TRUE if work is blocked. This can happen in one of two ways: 1) no core log (AcquireParityLog) 2) waiting on reintegration (DumpParityLogToDisk) If punt is RF_TRUE, the dataItem was queued, so skip to next item. */ /* process item, one sector at a time, until all sectors processed or we punt */ if (item->diskAddress.numSector > 0) done = RF_FALSE; else RF_ASSERT(0); while (!punt && !done) { /* verify that a core log exists for this region */ if (!raidPtr->regionInfo[regionID].coreLog) { /* Attempt to acquire a parity log. If acquisition fails, queue remaining work in data item and move to nextItem. */ if (incomingLog) if (*incomingLog) { RF_ASSERT((*incomingLog)->next == NULL); raidPtr->regionInfo[regionID].coreLog = *incomingLog; raidPtr->regionInfo[regionID].coreLog->regionID = regionID; *incomingLog = NULL; } else raidPtr->regionInfo[regionID].coreLog = AcquireParityLog(item, finish); else raidPtr->regionInfo[regionID].coreLog = AcquireParityLog(item, finish); /* Note: AcquireParityLog either returns a log or enqueues currentItem */ } if (!raidPtr->regionInfo[regionID].coreLog) punt = RF_TRUE; /* failed to find a core log */ else { RF_ASSERT(raidPtr->regionInfo[regionID].coreLog->next == NULL); /* verify that the log has room for new entries */ /* if log is full, dump it to disk and grab a new log */ if (raidPtr->regionInfo[regionID].coreLog->numRecords == raidPtr->numSectorsPerLog) { /* log is full, dump it to disk */ if (DumpParityLogToDisk(finish, item)) punt = RF_TRUE; /* dump unsuccessful, blocked on reintegration */ else { /* dump was successful */ if (incomingLog) if (*incomingLog) { RF_ASSERT((*incomingLog)->next == NULL); raidPtr->regionInfo[regionID].coreLog = *incomingLog; raidPtr->regionInfo[regionID].coreLog->regionID = regionID; *incomingLog = NULL; } else raidPtr->regionInfo[regionID].coreLog = AcquireParityLog(item, finish); else raidPtr->regionInfo[regionID].coreLog = AcquireParityLog(item, finish); /* if a core log is not available, must queue work and return */ if (!raidPtr->regionInfo[regionID].coreLog) punt = RF_TRUE; /* blocked on log availability */ } } } /* if we didn't punt on this item, attempt to add a sector to the core log */ if (!punt) { RF_ASSERT(raidPtr->regionInfo[regionID].coreLog->next == NULL); /* at this point, we have a core log with enough room for a sector */ /* copy a sector into the log */ log = raidPtr->regionInfo[regionID].coreLog; RF_ASSERT(log->numRecords < raidPtr->numSectorsPerLog); logItem = log->numRecords++; log->records[logItem].parityAddr = item->diskAddress; RF_ASSERT(log->records[logItem].parityAddr.startSector >= raidPtr->regionInfo[regionID].parityStartAddr); RF_ASSERT(log->records[logItem].parityAddr.startSector < raidPtr->regionInfo[regionID].parityStartAddr + raidPtr->regionInfo[regionID].numSectorsParity); log->records[logItem].parityAddr.numSector = 1; log->records[logItem].operation = item->common->operation; bcopy((item->common->bufPtr + (item->bufOffset++ * (1<<item->common->raidPtr->logBytesPerSector))), log->bufPtr + (logItem * (1<<item->common->raidPtr->logBytesPerSector)), (1<<item->common->raidPtr->logBytesPerSector)); item->diskAddress.numSector--; item->diskAddress.startSector++; if (item->diskAddress.numSector == 0) done = RF_TRUE; } } if (!punt) { /* Processed this item completely, decrement count of items to be processed. */ RF_ASSERT(item->diskAddress.numSector == 0); RF_LOCK_MUTEX(item->common->mutex); item->common->cnt--; if (item->common->cnt == 0) itemDone = RF_TRUE; else itemDone = RF_FALSE; RF_UNLOCK_MUTEX(item->common->mutex); if (itemDone) { /* Finished processing all log data for this IO Return structs to free list and invoke wakeup function. */ timer = item->common->startTime; /* grab initial value of timer */ RF_ETIMER_STOP(timer); RF_ETIMER_EVAL(timer); item->common->tracerec->plog_us += RF_ETIMER_VAL_US(timer); if (rf_parityLogDebug) printf("[waking process for region %d]\n", item->regionID); wakeFunc = item->common->wakeFunc; wakeArg = item->common->wakeArg; FreeParityLogCommonData(item->common); FreeParityLogData(item); (wakeFunc)(wakeArg, 0); } else FreeParityLogData(item); } } RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); if (rf_parityLogDebug) printf("[exiting ParityLogAppend]\n"); return(0);}void rf_EnableParityLogging(RF_Raid_t *raidPtr){ int regionID; for (regionID = 0; regionID < rf_numParityRegions; regionID++) { RF_LOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); raidPtr->regionInfo[regionID].loggingEnabled = RF_TRUE; RF_UNLOCK_MUTEX(raidPtr->regionInfo[regionID].mutex); } if (rf_parityLogDebug) printf("[parity logging enabled]\n");}#endif /* RF_INCLUDE_PARITYLOGGING > 0 */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -