sync.cpp
来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 856 行 · 第 1/2 页
CPP
856 行
void dbGlobalCriticalSection::leave()
{
int inc = 1;
__asm__ __volatile__(
"lock; xadd %0,%1"
:"=d" (inc), "=m" (*count)
:"d" (inc), "m" (*count));
if (inc != 0)
{
/* some other processes waiting to enter critical section */
static struct sembuf sops[] =
{
{
0, 1, 0
}
};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
this->count = count;
*count = 1;
return sem_init(semid, name, 0) == 0;
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
this->count = count;
return sem_init(semid, name, 0) == 0;
}
#else // defined(__GNUC__) && defined(i386)
// "lowest" case, use a SysV semaphore for complete portability
void dbGlobalCriticalSection::enter()
{
static struct sembuf sops[] =
{
{
0, -1, SEM_UNDO
}
};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR)
;
assert(rc == 0);
}
void dbGlobalCriticalSection::leave()
{
static struct sembuf sops[] =
{
{
0, 1, SEM_UNDO
}
};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
{
// XXX: sem_init is Posix, the rest of these calls are SysV.
return sem_init(semid, name, 1) == 0;
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
{
return sem_init(semid, name, 1) == 0;
}
#endif // defined(__GNUC__) && defined(i386)
void dbGlobalCriticalSection::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
dbInitializationMutex::initializationStatus
dbInitializationMutex::initialize(char const* name)
{
struct sembuf sops[4];
char* path = (char*)name;
if (strchr(name, '/') == NULL)
{
path = new char[strlen(name)+strlen(keyFileDir)+1];
sprintf(path, "%s%s", keyFileDir, name);
}
int fd = open(path, O_WRONLY|O_CREAT, 0777);
if (fd < 0)
{
if (path != name)
{
delete[] path;
}
PRINT_ERROR("open");
return InitializationError;
}
::close(fd);
int key = ftok(path, '0');
if (path != name)
{
delete[] path;
}
if (key < 0)
{
PRINT_ERROR("ftok");
return InitializationError;
}
while (true)
{
semid = semget(key, 3, IPC_CREAT|0777);
if (semid < 0)
{
PRINT_ERROR("semget");
return InitializationError;
}
// Semaphore 0 - number of active processes
// Semaphore 1 - intialization in progress (1 while initialization, 0 after it)
// Semaphore 2 - semaphore was destroyed
sops[0].sem_num = 0;
sops[0].sem_op = 0; /* check if semaphore was already initialized */
sops[0].sem_flg = IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1; /* increment number of active processes */
sops[1].sem_flg = SEM_UNDO;
sops[2].sem_num = 1;
sops[2].sem_op = 1; /* initialization in process */
sops[2].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) < 0)
{
if (errno == EAGAIN)
{
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* check if semaphore was already initialized */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 1;
sops[1].sem_op = 0; /* wait until inialization completed */
sops[1].sem_flg = 0;
sops[2].sem_num = 0;
sops[2].sem_op = 2; /* increment number of active processes */
sops[3].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) == 0)
{
return AlreadyInitialized;
}
if (errno == EAGAIN)
{
sleep(1);
continue;
}
}
if (errno == EIDRM)
{
continue;
}
PRINT_ERROR("semop");
return InitializationError;
}
else
{
return NotYetInitialized;
}
}
}
void dbInitializationMutex::done()
{
struct sembuf sops[1];
sops[0].sem_num = 1;
sops[0].sem_op = -1; /* initialization done */
sops[0].sem_flg = SEM_UNDO;
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbInitializationMutex::close()
{
int rc;
struct sembuf sops[3];
while (true)
{
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* decrement process couter */
sops[0].sem_flg = SEM_UNDO;
sops[1].sem_num = 0;
sops[1].sem_op = 0; /* check if there are no more active processes */
sops[1].sem_flg = IPC_NOWAIT;
sops[2].sem_num = 2;
sops[2].sem_op = 1; /* mark as destructed */
sops[2].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 3)) == 0)
{
return true;
}
else
{
assert(errno == EAGAIN);
}
sops[0].sem_num = 0;
sops[0].sem_op = -2; /* decrement process couter and check for non-zero */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1;
sops[1].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 2)) == 0)
{
return false;
}
else
{
assert(errno == EAGAIN);
}
}
}
void dbInitializationMutex::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
#endif // !USE_POSIX_SEMAPHORES
// Thread stuff
#ifndef NO_PTHREADS
#if defined(_SC_NPROCESSORS_ONLN)
int dbThread::numberOfProcessors()
{
return sysconf(_SC_NPROCESSORS_ONLN);
}
#elif defined(__linux__)
#include <linux/smp.h>
int dbThread::numberOfProcessors()
{
return smp_num_cpus;
}
#elif defined(__FreeBSD__)
#include <sys/sysctl.h>
int dbThread::numberOfProcessors()
{
int mib[2],ncpus=0;
size_t len=sizeof(ncpus);
mib[0]= CTL_HW;
mib[1]= HW_NCPU;
sysctl(mib,2,&ncpus,&len,NULL,0);
return ncpus;
}
#else
int dbThread::numberOfProcessors()
{
return 1;
}
#endif
#endif // NO_PTHREADS
#else // _WIN32
// Win32 specific code
unsigned dbSystem::getCurrentTimeMsec()
{
return GetTickCount();
}
#ifdef SET_NULL_DACL
dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
#endif
#endif // __WIN32
//////////////////////////////////////////////////////////////
// Common W32 and Unix platform code follows
void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
{
((dbPooledThread*)arg)->run();
}
dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
{
pool = threadPool;
startSem.open();
readySem.open();
next = NULL;
running = true;
thread.create(&pooledThreadFunc, this);
}
dbPooledThread::~dbPooledThread()
{
startSem.close();
readySem.close();
}
void dbPooledThread::stop()
{
running = false;
startSem.signal();
readySem.wait(pool->mutex);
}
void dbPooledThread::run()
{
dbCriticalSection cs(pool->mutex);
while (true)
{
startSem.wait(pool->mutex);
if (!running)
{
break;
}
(*f)(arg);
readySem.signal();
}
readySem.signal();
}
void dbThreadPool::join(dbPooledThread* thr)
{
dbCriticalSection cs(mutex);
thr->readySem.wait(mutex);
thr->next = freeThreads;
freeThreads = thr;
}
dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
{
dbCriticalSection cs(mutex);
dbPooledThread* t = freeThreads;
if (t == NULL)
{
t = freeThreads = new dbPooledThread(this);
}
freeThreads = t->next;
t->f = f;
t->arg = arg;
t->startSem.signal();
return t;
}
dbThreadPool::dbThreadPool()
{
freeThreads = NULL;
}
dbThreadPool::~dbThreadPool()
{
dbCriticalSection cs(mutex);
dbPooledThread *t, *next;
for (t = freeThreads; t != NULL; t = next)
{
next = t->next;
t->stop();
delete t;
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?