// file.cpp
//-< FILE.CPP >------------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// System dependent implementation of mapped on memory file
//-------------------------------------------------------------------*--------*
#ifdef UNIX
#include <unistd.h>
#include <fcntl.h>
// Mark descriptor `fd` close-on-exec so it is not inherited across exec().
// Returns the fcntl() result: negative on failure, 0 on success.
static int set_fd_close_on_exec (int fd)
{
    const int flags = ::fcntl(fd, F_GETFD, 0);
    if (flags < 0) {
        return flags;  // propagate fcntl error
    }
    return ::fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}
#ifdef VXWORKS
#define ftruncate ftruncate_shim_implementation
// Emulation of ftruncate() for platforms that lack it.  It can only GROW
// a file: seek to byte size-1 from the file START and write one zero byte,
// so the kernel extends the file to exactly `size` bytes.  The caller's
// seek position is preserved.  Returns 0 on success, -1 on failure.
//
// BUG FIX: the original used SEEK_END for the extending seek, which is
// relative to the current end of file — on any non-empty file that grew
// the file to old_size + size - 1 + 1 instead of `size`.
int ftruncate_shim_implementation(int fd, unsigned int size) {
    if (size == 0) {
        // Shrinking to zero cannot be emulated with seek+write; fail
        // explicitly instead of silently corrupting the file.
        return -1;
    }
    int cur = lseek(fd, 0, SEEK_CUR);        // remember caller's position
    if (cur == -1) {
        return -1;
    }
    if (lseek(fd, size - 1, SEEK_SET) == -1) {   // absolute, not SEEK_END
        return -1;
    }
    if (::write(fd, "", 1) != 1) {           // one zero byte extends the file
        return -1;
    }
    // Restore the caller's seek position.
    return (lseek(fd, cur, SEEK_SET) == -1) ? -1 : 0;
}
#endif // VXWORKS
#endif // UNIX
#define INSIDE_FASTDB
#include "stdtp.h"
#include "file.h"
#include "database.h"
#ifdef VXWORKS
#include "fastdbShim.h"
#endif // VXWORKS
BEGIN_FASTDB_NAMESPACE
// Upper bound (16MB) for a read buffer; the consuming code is not visible
// in this chunk.
const size_t MAX_READ_BUFFER_SIZE = 16*1024*1024;
#if FUZZY_CHECKPOINT
// One queued page write for the fuzzy-checkpoint writer thread: a file
// offset plus a private copy of the page image.  Nodes are chained into
// either the pending queue or the free (recycle) list via `next`.
struct dbWriteQuery {
    dbWriteQuery* next;     // intrusive link: pending queue or free list
    size_t pos;             // file offset the page is written to
    char page[dbPageSize];  // snapshot of the page contents
};
// Default cap on the number of pages queued before producers block.
const int DEFAULT_QUEUE_LENGTH_LIMIT = 16*1024;
// Background page writer used for "fuzzy" checkpoints: producers enqueue
// page images with put(), and a dedicated thread drains the queue and
// writes each page to the underlying dbFile.  Queue nodes are recycled
// through a free list to avoid per-page allocation.  All queue state is
// protected by `mutex`; `event` is used for both directions of flow
// control (writer waiting for work, producer waiting for space).
class dbFileWriter {
  private:
    dbLocalSemaphore event;     // signalled on underflow/overflow transitions
    dbMutex mutex;              // guards all queue state below
    dbWriteQuery* last;         // tail of the pending-write queue
    dbWriteQuery* first;        // head of the pending-write queue
    dbWriteQuery* free;         // recycled nodes (NOTE: shadows ::free)
    int queueLength;            // pages currently queued
    int queueLengthLimit;       // producers block once this many are queued
    int underflow;              // writer thread is waiting for work
    int overflow;               // a producer is waiting for space
    int running;                // cleared by the destructor to stop the thread
    dbThread thread;            // the writer thread
    dbFile* file;               // destination file

    // Thread entry point: forwards to the member write loop.
    static void thread_proc writeLoop(void* arg) {
        ((dbFileWriter*)arg)->write();
    }

    // Writer loop.  Drains the queue one page at a time, releasing the
    // mutex around the actual file write so producers can keep queueing.
    // Exits only when running is false AND the queue is empty, so every
    // pending page is flushed before shutdown.
    void write() {
        mutex.lock();
        while (true) {
            dbWriteQuery* query = first;
            if (query != NULL) {
                queueLength -= 1;
                if (overflow) {
                    // Wake a producer blocked in put() on the length limit.
                    overflow = false;
                    event.signal();
                }
                first = query->next;
                mutex.unlock();
                // Actual I/O happens outside the lock.
                file->write(query->pos, query->page, dbPageSize);
                mutex.lock();
                // Recycle the node onto the free list.
                query->next = free;
                free = query;
            } else {
                if (!running) {
                    break;
                }
                underflow = true;
                event.wait(mutex);
            }
        }
        mutex.unlock();
    }
  public:
    // Starts the writer thread immediately; pages are written to `file`.
    dbFileWriter(dbFile* file) {
        this->file = file;
        queueLength = 0;
        queueLengthLimit = DEFAULT_QUEUE_LENGTH_LIMIT;
        event.open();
        first = last = NULL;
        free = NULL;
        overflow = underflow = false;
        running = true;
        thread.create(&writeLoop, this);
    }

    // Change the maximum number of queued pages (bounds checkpoint memory).
    void setQueueLimit(size_t limit) {
        queueLengthLimit = limit;
    }

    // Stops the writer thread (it drains the queue first — see write()),
    // then frees all recycled nodes.
    ~dbFileWriter() {
        mutex.lock();
        running = false;
        event.signal();
        mutex.unlock();
        thread.join();
        dbWriteQuery* query = free;
        while (query != NULL) {
            dbWriteQuery* next = query->next;
            delete query;
            query = next;
        }
        event.close();
    }

    // Enqueue an arbitrary region as a sequence of dbPageSize pages.
    // NOTE(review): each inner put() copies a full dbPageSize — this
    // assumes `size` is a multiple of dbPageSize; confirm with callers.
    void put(size_t pos, void* page, size_t size) {
        char* beg = (char*)page;
        char* end = beg + size;
        while (beg < end) {
            put(pos, beg);
            pos += dbPageSize;
            beg += dbPageSize;
        }
    }

    // Enqueue a single page image at file offset `pos`.  Blocks while the
    // queue is at its limit.  The page is copied, so the caller's buffer
    // may be reused immediately after return.
    void put(size_t pos, void* page) {
        mutex.lock();
        while (queueLength >= queueLengthLimit) {
            overflow = true;
            event.wait(mutex);
        }
        dbWriteQuery* query = free;
        if (query == NULL) {
            query = new dbWriteQuery();
        } else {
            free = query->next;
        }
        if (first == NULL) {
            first = query;
        } else {
            last->next = query;
        }
        queueLength += 1;
        last = query;
        query->next = NULL;
        query->pos = pos;
        memcpy(query->page, page, dbPageSize);
        if (underflow) {
            // Wake the writer thread waiting for work.
            underflow = false;
            event.signal();
        }
        mutex.unlock();
    }
};
// Public knob: cap the fuzzy-checkpoint writer queue at nPages pages.
// Each queued page holds a dbPageSize copy, so this bounds memory use.
void dbFile::setCheckpointBufferSize(size_t nPages)
{
    writer->setQueueLimit(nPages);
}
#endif
// Construct an unopened file object: no mapping, no shared-memory name.
// The object becomes usable only after the mapping is established later.
dbFile::dbFile()
{
    mmapSize   = 0;
    mmapAddr   = NULL;
    sharedName = NULL;
    readonly   = false;
#ifdef REPLICATION_SUPPORT
    // Replication bookkeeping is attached later by the database layer.
    db              = NULL;
    rootPage        = NULL;
    currUpdateCount = NULL;
    diskUpdateCount = NULL;
#endif
}
// Intentionally empty: resources are presumably released by an explicit
// close() elsewhere in this file — not visible in this chunk; confirm.
dbFile::~dbFile()
{
}
#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
// Largest number of dbModMapBlockSize blocks transferred in one sync
// segment (128KB worth of data); used to chunk replication transfers.
const size_t dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize;
#endif
#ifdef REPLICATION_SUPPORT
#include "database.h"
int dbFile::dbSyncTimeout = 1000; // one second (milliseconds); consumer not visible in this chunk
// Replica side: read `size` bytes of page data from socket `s` directly
// into the mapping at offset `pos`, growing the database file first if
// the region does not fit.  Records `pageUpdateCounter` for every
// dbModMapBlockSize block touched and tracks the maximum counter seen.
// Returns false if the socket read fails.
bool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) {
        // Grow at least 2x to amortize the cost of repeated remappings.
        size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
        setSize(newSize, sharedName);
        ((dbHeader*)mmapAddr)->size = newSize;
        // The mapping may have moved: refresh the database's view of it
        // and bump the version so stale pointers are detected.
        db->baseAddr = (byte*)mmapAddr;
        db->header = (dbHeader*)mmapAddr;
        db->version = db->monitor->version += 1;
    }
    if (s->read(mmapAddr + pos, size)) {
        int pageNo = pos >> dbModMapBlockBits;
        if (updateCounter < pageUpdateCounter) {
            updateCounter = pageUpdateCounter;
        }
        // Stamp every block covered by the received region.
        while (size > 0) {
            currUpdateCount[pageNo++] = pageUpdateCounter;
            size -= dbModMapBlockSize;
        }
        return true;
    }
    return false;
}
// Replica side, concurrent-reader variant of updatePages(): applies a
// received page region while local readers may be active.  The root
// (header) page is staged in `rootPage` first and copied into the
// mapping — under the commit lock when it represents a committed
// transaction — so readers never observe a torn header.  Returns false
// if the socket read fails.
bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) {
        // Grow the mapping (at least doubling) under the commit lock so
        // concurrent readers never see a half-resized mapping.
        size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
        TRACE_IMSG(("Extend database from %ld to %ld\n", (long)mmapSize, (long)newSize));
        db->beginTransaction(dbDatabase::dbCommitLock);
        setSize(newSize, sharedName);
        ((dbHeader*)mmapAddr)->size = newSize;
#ifdef PROTECT_DATABASE
        protect(0, mmapSize);
#endif
        db->baseAddr = (byte*)mmapAddr;
        db->header = (dbHeader*)mmapAddr;
        db->version = db->monitor->version += 1;
        db->endTransaction();
    }
    if (pos == 0 && size <= pageSize) {
        // Header page: stage in the side buffer before touching the mapping.
        if (!s->read(rootPage, size)) {
            return false;
        }
        if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr) {
            // The current-root indicator flipped: this is a commit point.
            TRACE_MSG(("Commit transaction at replica\n"));
            db->beginTransaction(dbDatabase::dbCommitLock);
#ifdef PROTECT_DATABASE
            unprotect(pos, size);
#endif
            memcpy(mmapAddr, rootPage, size);
            // now readers will see updated data
            db->monitor->curr = ((dbHeader*)mmapAddr)->curr;
            db->endTransaction();
#ifdef SYNCHRONOUS_REPLICATION
            // Acknowledge the commit back to the master.
            ReplicationRequest rr;
            rr.op = ReplicationRequest::RR_COMMITTED;
            rr.size = 0;
            s->write(&rr, sizeof rr);
#endif
        } else {
            // No commit: the header update is not reader-visible state,
            // so no commit lock is needed.
#ifdef PROTECT_DATABASE
            unprotect(pos, size);
#endif
            memcpy(mmapAddr, rootPage, size);
        }
    } else {
        // Ordinary data pages are read straight into the mapping.
#ifdef PROTECT_DATABASE
        unprotect(pos, size);
#endif
        if (!s->read(mmapAddr + pos, size)) {
#ifdef PROTECT_DATABASE
            // Re-protect even on failure so the mapping stays guarded.
            protect(pos, size);
#endif
            return false;
        }
        assert(((dbHeader*)mmapAddr)->curr == db->monitor->curr);
    }
#ifdef PROTECT_DATABASE
    protect(pos, size);
#endif
    // Stamp every dbModMapBlockSize block covered by the region and track
    // the maximum update counter seen so far.
    int pageNo = pos >> dbModMapBlockBits;
    if (updateCounter < pageUpdateCounter) {
        updateCounter = pageUpdateCounter;
    }
    while (size > 0) {
        currUpdateCount[pageNo++] = pageUpdateCounter;
        size -= dbModMapBlockSize;
    }
    return true;
}
// Effective length of the on-disk update-counter table: the number of
// entries up to and including the last non-zero counter (trailing zero
// entries are not counted).
int dbFile::getUpdateCountTableSize() {
    int n = (int)(mmapSize >> dbModMapBlockBits);
    // Trim trailing zero counters from the end of the table.
    while (n > 0 && diskUpdateCount[n-1] == 0) {
        n -= 1;
    }
    return n;
}
// Maximum number of dbModMapBlockSize pages the database offset space
// (2^dbDatabaseOffsetBits bytes) can ever contain.
int dbFile::getMaxPages()
{
    return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);
}
// Launch the background sync-to-disk thread (no-op in diskless builds).
void dbFile::startSync()
{
#ifndef DISKLESS_CONFIGURATION
    doSync = true;
    closing = false;
    syncEvent.reset();   // reset before the thread starts waiting on it
    syncThread.create(startSyncToDisk, this);
#endif
}
// Ask the sync thread to terminate and wait for it to exit.
void dbFile::stopSync()
{
#ifndef DISKLESS_CONFIGURATION
    doSync = false;       // presumably polled by syncToDisk(); not visible here
    syncEvent.signal();   // wake the thread if it is waiting on the event
    syncThread.join();
#endif
}
// Thread trampoline: `arg` is the dbFile whose syncToDisk() loop runs here.
void thread_proc dbFile::startSyncToDisk(void* arg)
{
    ((dbFile*)arg)->syncToDisk();
}
// Thread trampoline for recovery(): runs doRecovery() for the request,
// then signals recoveredEvent when the last outstanding recovery
// finishes, and frees the heap-allocated request.
void thread_proc dbFile::startRecovery(void* arg)
{
    RecoveryRequest* rr = (RecoveryRequest*)arg;
    rr->file->doRecovery(rr->nodeId, rr->updateCounters, rr->nPages);
    {
        dbCriticalSection cs(rr->file->replCS);
        if (--rr->file->nRecovered == 0) {
            rr->file->recoveredEvent.signal();
        }
    }
    delete rr;   // frees the request only; updateCounters ownership is elsewhere
}
// Kick off asynchronous recovery of replica `nodeId`: package the request
// on the heap (freed by startRecovery) and run it on a new high-priority
// thread.  nRecovered counts outstanding recoveries; the first one resets
// recoveredEvent, which startRecovery signals when the count drops to 0.
void dbFile::recovery(int nodeId, int* updateCounters, int nPages)
{
    RecoveryRequest* rr = new RecoveryRequest;
    rr->nodeId = nodeId;
    rr->updateCounters = updateCounters;
    rr->nPages = nPages;
    rr->file = this;
    {
        dbCriticalSection cs(replCS);
        if (nRecovered++ == 0) {
            recoveredEvent.reset();
        }
    }
    // NOTE(review): recoveryThread is a local that goes out of scope as
    // soon as this function returns — this relies on dbThread detaching
    // or otherwise surviving its wrapper's destruction; confirm dbThread
    // semantics in stdtp.h.
    dbThread recoveryThread;
    recoveryThread.create(startRecovery, rr);
    recoveryThread.setPriority(dbThread::THR_PRI_HIGH);
}
int dbFile::sendChanges(int nodeId, int* updateCounters, int nPages)
{
ReplicationRequest rr;
int maxUpdateCount = 0;
TRACE_MSG(("Database synchronization: mmapSize=%ld\n", mmapSize));
size_t i, j, n;
for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++) {
if (updateCounters[i] > currUpdateCount[i]) {
updateCounters[i] = 0;
} else {
if (updateCounters[i] > maxUpdateCount) {
maxUpdateCount = updateCounters[i];
}
}
if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize
|| currUpdateCount[i] != currUpdateCount[j]))
{
db->con[nodeId].nRecoveredPages += (i - j);
rr.op = ReplicationRequest::RR_RECOVER_PAGE;
rr.nodeId = nodeId;
rr.size = (i-j)*dbModMapBlockSize;
rr.page.offs = j << dbModMapBlockBits;
rr.page.updateCount = currUpdateCount[j];
TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size));
if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {
// NOTE(review): the remainder of dbFile::sendChanges() is missing from
// this copy — the original text was replaced here by HTML/UI residue from
// the site this file was extracted from (keyboard-shortcut help text).
// Restore the tail of sendChanges() and the rest of file.cpp from the
// upstream FastDB sources.