database.cpp
来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 2,469 行 · 第 1/5 页
CPP
2,469 行
sprintf(databaseName, "%s.%d", dbName, version);
int rc = file.open(fileName, databaseName,
accessType == dbReadOnly || accessType == dbConcurrentRead, fileSize, false);
if (rc != dbFile::ok)
{
char msgbuf[64];
file.errorText(rc, msgbuf, sizeof msgbuf);
TRACE_MSG(("File open error: %s\n", msgbuf));
handleError(DatabaseOpenError, "Failed to create database file");
return false;
}
baseAddr = (byte*)file.getAddr();
fileSize = file.getSize();
header = (dbHeader*)baseAddr;
updatedRecordId = 0;
if ((unsigned)header->curr > 1)
{
handleError(DatabaseOpenError, "Database file was corrupted: "
"invalid root index");
return false;
}
if (header->initialized != 1)
{
if (accessType == dbReadOnly || accessType == dbConcurrentRead)
{
handleError(DatabaseOpenError, "Can not open uninitialized "
"file in read only mode");
return false;
}
monitor->curr = header->curr = 0;
header->size = fileSize;
size_t used = dbPageSize;
header->root[0].index = used;
header->root[0].indexSize = indexSize;
header->root[0].indexUsed = dbFirstUserId;
header->root[0].freeList = 0;
used += indexSize*sizeof(offs_t);
header->root[1].index = used;
header->root[1].indexSize = indexSize;
header->root[1].indexUsed = dbFirstUserId;
header->root[1].freeList = 0;
used += indexSize*sizeof(offs_t);
header->root[0].shadowIndex = header->root[1].index;
header->root[1].shadowIndex = header->root[0].index;
header->root[0].shadowIndexSize = indexSize;
header->root[1].shadowIndexSize = indexSize;
header->majorVersion= FASTDB_MAJOR_VERSION;
header->minorVersion = FASTDB_MINOR_VERSION;
index[0] = (offs_t*)(baseAddr + header->root[0].index);
index[1] = (offs_t*)(baseAddr + header->root[1].index);
index[0][dbInvalidId] = dbFreeHandleMarker;
size_t bitmapPages =
(used + dbPageSize*(dbAllocationQuantum*8-1) - 1)
/ (dbPageSize*(dbAllocationQuantum*8-1));
memset(baseAddr+used, 0xFF, (used + bitmapPages*dbPageSize)
/ (dbAllocationQuantum*8));
size_t i;
for (i = 0; i < bitmapPages; i++)
{
index[0][dbBitmapId + i] = used + dbPageObjectMarker;
used += dbPageSize;
}
while (i < dbBitmapPages)
{
index[0][dbBitmapId+i] = dbFreeHandleMarker;
i += 1;
}
currIndex = index[0];
currIndexSize = dbFirstUserId;
committedIndexSize = 0;
initializeMetaTable();
header->dirty = true;
memcpy(index[1], index[0], currIndexSize*sizeof(offs_t));
file.markAsDirty(0, used);
file.flush(true);
header->initialized = true;
file.markAsDirty(0, sizeof(dbHeader));
file.flush(true);
}
else
{
monitor->curr = header->curr;
if (header->dirty)
{
TRACE_MSG(("Database was not normally closed: "
"start recovery\n"));
if (accessType == dbReadOnly || accessType == dbConcurrentRead)
{
handleError(DatabaseOpenError,
"Can not open dirty file in read only moode");
return false;
}
recovery();
TRACE_MSG(("Recovery completed\n"));
}
else
{
if (file.getSize() != header->size)
{
handleError(DatabaseOpenError, "Database file was "
"corrupted: file size in header differs "
"from actual file size");
return false;
}
}
}
if (!loadScheme(true))
{
return false;
}
initMutex.done();
}
else
{
sprintf(name, "%s.cs", dbName);
if (!cs.open(name, &monitor->sem))
{
handleError(DatabaseOpenError, "Failed to open shared semaphore");
return false;
}
if (accessType == dbConcurrentUpdate)
{
sprintf(name, "%s.mcs", dbName);
if (!mutatorCS.open(name, &monitor->mutatorSem))
{
handleError(DatabaseOpenError, "Failed to open shared semaphore");
return false;
}
}
version = 0;
if (!loadScheme(false))
{
return false;
}
}
cs.enter();
monitor->users += 1;
cs.leave();
opened = true;
if (commitDelaySec != 0)
{
dbCriticalSection cs(delayedCommitStartTimerMutex);
commitTimeout = commitDelay = commitDelaySec;
commitThread.create((dbThread::thread_proc_t)delayedCommitProc, this);
commitThreadSyncEvent.wait(delayedCommitStartTimerMutex);
}
return true;
}
/**
 * Start periodic background backups of the database.
 *
 * Only the first call has an effect: if a backup file name has already
 * been recorded, the request is silently ignored.
 *
 * @param fileName name of the backup file; the string is copied, so the
 *                 caller keeps ownership of its buffer
 * @param period   interval between backups, stored in backupPeriod
 *                 (the scheduler thread compares it with time() differences,
 *                 i.e. it is expressed in seconds)
 */
void dbDatabase::scheduleBackup(char const* fileName, time_t period)
{
    if (backupFileName != NULL)
    {
        // A backup schedule is already active - nothing to do.
        return;
    }

    // Keep a private copy of the name (including the terminating zero).
    size_t const nameSize = strlen(fileName) + 1;
    backupFileName = new char[nameSize];
    memcpy(backupFileName, fileName, nameSize);

    backupPeriod = period;

    // The scheduler thread reads backupFileName and backupPeriod, so it is
    // started only after both have been set.
    backupThread.create((dbThread::thread_proc_t)backupSchedulerProc, this);
}
/**
 * Main loop of the background backup thread created by scheduleBackup().
 *
 * The thread lowers its own priority, takes backupMutex and then repeatedly
 *   1. computes how long to wait before the next backup,
 *   2. waits on backupInitEvent for at most that long,
 *   3. performs a backup, or terminates if backupFileName has become NULL
 *      in the meantime.
 *
 * Two naming modes are supported:
 *   - name ends with '?': each backup is written to a new file named
 *     "<name without '?'>-YYYY.MM.DD_HH.MM.SS" (local time);
 *   - any other name: the backup is written to "<name>.new" and, if it was
 *     written successfully, moved over the previous backup file.
 *     In this mode the first wait is shortened by the age of the existing
 *     backup file (measured from its st_atime).
 */
void dbDatabase::backupScheduler()
{
    backupThread.setPriority(dbThread::THR_PRI_LOW);
    dbCriticalSection cs(backupMutex);
    while (true)
    {
        time_t timeout = backupPeriod;

        // An empty name must never be indexed with [length-1]: that would
        // read outside of the buffer. Treat it as a plain (non '?') name.
        size_t nameLen = strlen(backupFileName);
        if (nameLen == 0 || backupFileName[nameLen-1] != '?')
        {
            struct stat st;
            if (::stat(backupFileName, &st) == 0)
            {
                // Take the age of the existing backup into account so that a
                // restart of the application does not postpone the backup.
                time_t howOld = time(NULL) - st.st_atime;
                if (timeout < howOld)
                {
                    timeout = 0;
                }
                else
                {
                    timeout -= howOld;
                }
            }
        }

        // NOTE(review): the factor 1000 suggests that wait() expects
        // milliseconds - confirm against the event class.
        backupInitEvent.wait(backupMutex, timeout*1000);

        if (backupFileName == NULL)
        {
            // The name was cleared while we were waiting: stop the thread.
            return;
        }

        nameLen = strlen(backupFileName);
        if (nameLen != 0 && backupFileName[nameLen-1] == '?')
        {
            // Timestamped backup: replace the trailing '?' with the
            // current local date and time.
            time_t currTime = time(NULL);
            char* fileName = new char[nameLen + 32];
            struct tm* t = localtime(&currTime);
            sprintf(fileName, "%.*s-%04d.%02d.%02d_%02d.%02d.%02d",
                    (int)(nameLen - 1), backupFileName,
                    t->tm_year + 1900, t->tm_mon+1, t->tm_mday,
                    t->tm_hour, t->tm_min, t->tm_sec);
            backup(fileName, false);
            delete[] fileName;
        }
        else
        {
            // Single backup file: write to a temporary name first.
            char* newFileName = new char[nameLen + 5];
            sprintf(newFileName, "%s.new", backupFileName);
            if (backup(newFileName, false))
            {
                // Only a successfully written backup may replace the
                // previous one; otherwise a failed run would destroy the
                // last good backup. The explicit remove is kept because
                // rename() does not overwrite an existing file on every
                // platform.
                ::remove(backupFileName);
                ::rename(newFileName, backupFileName);
            }
            delete[] newFileName;
        }
    }
}
void dbDatabase::recovery()
{
int curr = header->curr;
header->size = file.getSize();
header->root[1-curr].indexUsed = header->root[curr].indexUsed;
header->root[1-curr].freeList = header->root[curr].freeList;
header->root[1-curr].index = header->root[curr].shadowIndex;
header->root[1-curr].indexSize =
header->root[curr].shadowIndexSize;
header->root[1-curr].shadowIndex = header->root[curr].index;
header->root[1-curr].shadowIndexSize =
header->root[curr].indexSize;
offs_t* dst = (offs_t*)(baseAddr+header->root[1-curr].index);
offs_t* src = (offs_t*)(baseAddr+header->root[curr].index);
currIndex = dst;
for (int i = 0, n = header->root[curr].indexUsed; i < n; i++)
{
if (dst[i] != src[i])
{
dst[i] = src[i];
file.markAsDirty(header->root[1-curr].index + i*sizeof(offs_t), sizeof(offs_t));
}
}
//
// Restore consistency of table rows l2-list
//
restoreTablesConsistency();
file.markAsDirty(0, sizeof(dbHeader));
}
void dbDatabase::restoreTablesConsistency()
{
dbTable* table = (dbTable*)getRow(dbMetaTableId);
oid_t lastId = table->lastRow;
if (lastId != 0)
{
dbRecord* record = getRow(lastId);
if (record->next != 0)
{
record->next = 0;
file.markAsDirty(currIndex[lastId], sizeof(dbTable));
}
}
oid_t tableId = table->firstRow;
while (tableId != 0)
{
table = (dbTable*)getRow(tableId);
lastId = table->lastRow;
if (lastId != 0)
{
dbRecord* record = getRow(lastId);
if (record->next != 0)
{
record->next = 0;
file.markAsDirty(currIndex[lastId], sizeof(dbTable));
}
}
tableId = table->next;
}
}
/**
 * Set the number of threads stored in parThreads.
 *
 * @param nThreads requested number of threads; 0 means "use the value
 *                 reported by dbThread::numberOfProcessors()". The result
 *                 is limited to dbMaxParallelSearchThreads.
 */
void dbDatabase::setConcurrency(unsigned nThreads)
{
    unsigned effective = nThreads;
    if (effective == 0)
    {
        // autodetect number of processors
        effective = dbThread::numberOfProcessors();
    }
    parThreads = (effective > dbMaxParallelSearchThreads)
        ? dbMaxParallelSearchThreads
        : effective;
}
bool dbDatabase::loadScheme(bool alter)
{
if (!beginTransaction(accessType != dbReadOnly && accessType != dbConcurrentRead
? dbExclusiveLock : dbSharedLock))
{
return false;
}
dbTableDescriptor *desc, *next;
dbTable* metaTable = (dbTable*)getRow(dbMetaTableId);
oid_t first = metaTable->firstRow;
oid_t last = metaTable->lastRow;
int nTables = metaTable->nRows;
oid_t tableId = first;
for (desc = dbTableDescriptor::chain; desc != NULL; desc = next)
{
next = desc->next;
if (desc->db != NULL && desc->db != DETACHED_TABLE && desc->db != this)
{
continue;
}
if (desc->db == DETACHED_TABLE)
{
desc = desc->clone();
}
dbFieldDescriptor* fd;
for (fd = desc->firstField; fd != NULL; fd = fd->nextField)
{
fd->tTree = 0;
fd->hashTable = 0;
fd->attr &= ~dbFieldDescriptor::Updated;
}
int n = nTables;
while (--n >= 0)
{
dbTable* table = (dbTable*)getRow(tableId);
oid_t next = table->next;
if (strcmp(desc->name, (char*)table + table->name.offs) == 0)
{
if (!desc->equal(table))
{
if (!alter)
{
handleError(DatabaseOpenError, "Incompatible class"
" definition in application");
return false;
}
beginTransaction(dbExclusiveLock);
modified = true;
if (table->nRows == 0)
{
TRACE_MSG(("Replace definition of table '%s'\n",
desc->name));
updateTableDescriptor(desc, tableId);
}
else
{
reformatTable(tableId, desc);
}
}
else
{
linkTable(desc, tableId);
}
desc->setFlags();
break;
}
if (tableId == last)
{
tableId = first;
}
else
{
tableId = next;
}
}
if (n < 0)
{ // no match found
if (accessType == dbReadOnly || accessType == dbConcurrentRead)
{
handleError(DatabaseOpenError, "New table definition can not "
"be added to read only database");
return false;
}
else
{
TRACE_MSG(("Create new table '%s' in database\n", desc->name));
addNewTable(desc);
modified = true;
}
}
if (accessType != dbReadOnly && accessType != dbConcurrentRead)
{
if (!addIndices(alter, desc))
{
handleError(DatabaseOpenError, "Failed to alter indices with"
" active applications");
rollback();
return false;
}
}
}
for (desc = tables; desc != NULL; desc = desc->nextDbTable)
{
if (desc->cloneOf != NULL)
{
for (dbFieldDescriptor *fd = desc->firstField; fd != NULL; fd = fd->nextField)
{
if (fd->refTable != NULL)
{
fd->refTable = lookupTable(fd->refTable);
}
}
}
desc->checkRelationship();
}
commit();
return true;
}
void dbDatabase::reformatTable(oid_t tableId, dbTableDescriptor* desc)
{
dbTable* table = (dbTable*)putRow(tableId);
if (desc->match(table, confirmDeleteColumns))
{
TRACE_MSG(("New version of table '%s' is compatible with old one\n",
desc->name));
updateTableDescriptor(desc, tableId);
}
else
{
TRACE_MSG(("
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?