📄 sync.cpp
字号:
#if defined(USE_INTERNAL_CS_IMPL)
////////////////////////////////////////////////////////////////////
// dbGLobalCriticalSection internal implementation
#if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__)) && !defined(RECOVERABLE_CRITICAL_SECTION)
void dbGlobalCriticalSection::enter()
{
int inc = -1;
__asm__ __volatile__(
"lock; xadd %0,%1"
:"=d" (inc), "=m" (*count)
:"d" (inc), "m" (*count));
if (inc != 1) {
static struct sembuf sops[] = {{0, -1, 0}};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
assert(rc == 0);
}
#if GLOBAL_CS_DEBUG
owner = pthread_self();
#endif
}
void dbGlobalCriticalSection::leave()
{
int inc = 1;
#if GLOBAL_CS_DEBUG
owner = 0;
#endif
__asm__ __volatile__(
"lock; xadd %0,%1"
:"=d" (inc), "=m" (*count)
:"d" (inc), "m" (*count));
if (inc != 0) {
/* some other processes waiting to enter critical section */
static struct sembuf sops[] = {{0, 1, 0}};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
this->count = count;
*count = 1;
return sem_init(semid, name, 0) == 0;
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
this->count = count;
return sem_init(semid, name, 0) == 0;
}
#elif !defined(RECOVERABLE_CRITICAL_SECTION) && defined(__GNUC__) && defined (__GNUC_MINOR__) && ((4 < __GNUC__) || (4 == __GNUC__ && 1 <= __GNUC_MINOR__))
void dbGlobalCriticalSection::enter()
{
if (__sync_add_and_fetch(count, 1) != 1) {
static struct sembuf sops[] = {{0, -1, 0}};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
assert(rc == 0);
}
#if GLOBAL_CS_DEBUG
owner = pthread_self();
#endif
}
void dbGlobalCriticalSection::leave()
{
#if GLOBAL_CS_DEBUG
owner = 0;
#endif
if (__sync_add_and_fetch(count, -1) != 0) {
/* some other processes waiting to enter critical section */
static struct sembuf sops[] = {{0, 1, 0}};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
this->count = count;
*count = 0;
return sem_init(semid, name, 0) == 0;
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
this->count = count;
return sem_init(semid, name, 0) == 0;
}
#else // defined(__GNUC__) && defined(i386)
// "lowest" case, use a SysV semaphore for complete portability
void dbGlobalCriticalSection::enter()
{
static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
assert(rc == 0);
#if GLOBAL_CS_DEBUG
owner = pthread_self();
#endif
}
void dbGlobalCriticalSection::leave()
{
#if GLOBAL_CS_DEBUG
owner = 0;
#endif
static struct sembuf sops[] = {{0, 1, SEM_UNDO}};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
{
// XXX: sem_init is Posix, the rest of these calls are SysV.
return sem_init(semid, name, 1) == 0;
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
{
return sem_init(semid, name, 1) == 0;
}
#endif // defined(__GNUC__) && defined(i386)
void dbGlobalCriticalSection::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
#endif // USE_INTERNAL_CS_IMPL
dbInitializationMutex::initializationStatus
dbInitializationMutex::initialize(char const* name)
{
struct sembuf sops[4];
char* path = (char*)name;
if (strchr(name, '/') == NULL) {
path = new char[strlen(name)+strlen(keyFileDir)+1];
sprintf(path, "%s%s", keyFileDir, name);
}
int fd = open(path, O_WRONLY|O_CREAT, 0777);
if (fd < 0) {
if (path != name) {
delete[] path;
}
PRINT_ERROR("open");
return InitializationError;
}
::close(fd);
int key = getKeyFromFile(path);
if (path != name) {
delete[] path;
}
if (key < 0) {
PRINT_ERROR("getKeyFromFile");
return InitializationError;
}
while (true) {
semid = semget(key, 3, IPC_CREAT|0777);
if (semid < 0) {
PRINT_ERROR("semget");
return InitializationError;
}
// Semaphore 0 - number of active processes
// Semaphore 1 - intialization in progress (1 while initialization, 0 after it)
// Semaphore 2 - semaphore was destroyed
sops[0].sem_num = 0;
sops[0].sem_op = 0; /* check if semaphore was already initialized */
sops[0].sem_flg = IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1; /* increment number of active processes */
sops[1].sem_flg = SEM_UNDO;
sops[2].sem_num = 1;
sops[2].sem_op = 1; /* initialization in process */
sops[2].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) < 0) {
if (errno == EAGAIN) {
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* check if semaphore was already initialized */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 1;
sops[1].sem_op = 0; /* wait until inialization completed */
sops[1].sem_flg = 0;
sops[2].sem_num = 0;
sops[2].sem_op = 2; /* increment number of active processes */
sops[2].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) == 0) {
return AlreadyInitialized;
}
if (errno == EAGAIN) {
sleep(1);
continue;
}
}
if (errno == EIDRM) {
continue;
}
PRINT_ERROR("semop");
return InitializationError;
} else {
return NotYetInitialized;
}
}
}
void dbInitializationMutex::done()
{
struct sembuf sops[1];
sops[0].sem_num = 1;
sops[0].sem_op = -1; /* initialization done */
sops[0].sem_flg = SEM_UNDO;
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbInitializationMutex::close()
{
int rc;
struct sembuf sops[3];
while (true) {
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* decrement process couter */
sops[0].sem_flg = SEM_UNDO;
sops[1].sem_num = 0;
sops[1].sem_op = 0; /* check if there are no more active processes */
sops[1].sem_flg = IPC_NOWAIT;
sops[2].sem_num = 2;
sops[2].sem_op = 1; /* mark as destructed */
sops[2].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 3)) == 0) {
return true;
} else {
assert(errno == EAGAIN);
}
sops[0].sem_num = 0;
sops[0].sem_op = -2; /* decrement process couter and check for non-zero */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1;
sops[1].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 2)) == 0) {
return false;
} else {
assert(errno == EAGAIN);
}
}
}
void dbInitializationMutex::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
#endif // !USE_POSIX_SEMAPHORES
// Thread stuff
#ifndef NO_PTHREADS
#if defined(_SC_NPROCESSORS_ONLN)
int dbThread::numberOfProcessors() {
return sysconf(_SC_NPROCESSORS_ONLN);
}
#elif defined(__linux__)
END_FASTDB_NAMESPACE
#include <linux/smp.h>
BEGIN_FASTDB_NAMESPACE
int dbThread::numberOfProcessors() { return smp_num_cpus; }
#elif defined(__FreeBSD__)
END_FASTDB_NAMESPACE
#include <sys/sysctl.h>
BEGIN_FASTDB_NAMESPACE
int dbThread::numberOfProcessors() {
int mib[2],ncpus=0;
size_t len=sizeof(ncpus);
mib[0]= CTL_HW;
mib[1]= HW_NCPU;
sysctl(mib,2,&ncpus,&len,NULL,0);
return ncpus;
}
#else
int dbThread::numberOfProcessors() { return 1; }
#endif
#endif // NO_PTHREADS
#else // _WIN32
// Win32 specific code
unsigned dbSystem::getCurrentTimeMsec()
{
return GetTickCount();
}
#ifdef SET_NULL_DACL
dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
#endif
#endif // __WIN32
//////////////////////////////////////////////////////////////
// Common W32 and Unix platform code follows
void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
{
((dbPooledThread*)arg)->run();
}
dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
{
pool = threadPool;
startSem.open();
readySem.open();
next = NULL;
running = true;
thread.create(&pooledThreadFunc, this);
}
dbPooledThread::~dbPooledThread()
{
startSem.close();
readySem.close();
}
void dbPooledThread::stop()
{
running = false;
startSem.signal();
readySem.wait(pool->mutex);
}
void dbPooledThread::run()
{
dbCriticalSection cs(pool->mutex);
while (true) {
startSem.wait(pool->mutex);
if (!running) {
break;
}
(*f)(arg);
readySem.signal();
}
readySem.signal();
}
void dbThreadPool::join(dbPooledThread* thr)
{
dbCriticalSection cs(mutex);
thr->readySem.wait(mutex);
thr->next = freeThreads;
freeThreads = thr;
}
dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
{
dbCriticalSection cs(mutex);
dbPooledThread* t = freeThreads;
if (t == NULL) {
t = freeThreads = new dbPooledThread(this);
}
freeThreads = t->next;
t->f = f;
t->arg = arg;
t->startSem.signal();
return t;
}
dbThreadPool::dbThreadPool()
{
freeThreads = NULL;
}
dbThreadPool::~dbThreadPool()
{
dbCriticalSection cs(mutex);
dbPooledThread *t, *next;
for (t = freeThreads; t != NULL; t = next) {
next = t->next;
t->stop();
delete t;
}
}
END_FASTDB_NAMESPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -