📄 sync.cpp
字号:
#if defined(USE_LOCAL_CS_IMPL)////////////////////////////////////////////////////////////////////// dbGLobalCriticalSection local implementation// If we are on an i386 based platform, we can use the processor primitives// (XXX: although do these work with multiprocessor? shouldn't this be// under the non-pthread based implemenation?)#if defined(__GNUC__) && defined(i386) && !defined(RECOVERABLE_CRITICAL_SECTION)void dbGlobalCriticalSection::enter(){ int inc = -1; __asm__ __volatile__( "lock; xadd %0,%1" :"=d" (inc), "=m" (*count) :"d" (inc), "m" (*count)); if (inc != 1) { static struct sembuf sops[] = {{0, -1, 0}}; int rc; while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR); assert(rc == 0); } }void dbGlobalCriticalSection::leave(){ int inc = 1; __asm__ __volatile__( "lock; xadd %0,%1" :"=d" (inc), "=m" (*count) :"d" (inc), "m" (*count)); if (inc != 0) { /* some other processes waiting to enter critical section */ static struct sembuf sops[] = {{0, 1, 0}}; int rc = semop(semid, sops, 1); assert(rc == 0); }}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count){ this->count = count; *count = 1; return sem_init(semid, name, 0) == 0;}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count){ this->count = count; return sem_init(semid, name, 0) == 0;}#else // defined(__GNUC__) && defined(i386)// "lowest" case, use a SysV semaphore for complete portabilityvoid dbGlobalCriticalSection::enter(){ static struct sembuf sops[] = {{0, -1, SEM_UNDO}}; int rc; while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR); assert(rc == 0);}void dbGlobalCriticalSection::leave(){ static struct sembuf sops[] = {{0, 1, SEM_UNDO}}; int rc = semop(semid, sops, 1); assert(rc == 0);}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*){// XXX: sem_init is Posix, the rest of these calls are SysV. return sem_init(semid, name, 1) == 0;}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*){ return sem_init(semid, name, 1) == 0;}#endif // defined(__GNUC__) && defined(i386)void dbGlobalCriticalSection::erase(){ semctl(semid, 0, IPC_RMID, &u);}#endif // USE_LOCAL_CS_IMPLdbInitializationMutex::initializationStatus dbInitializationMutex::initialize(char const* name){ struct sembuf sops[4]; char* path = (char*)name; if (strchr(name, '/') == NULL) { path = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(path, "%s%s", keyFileDir, name); } int fd = open(path, O_WRONLY|O_CREAT, 0777); if (fd < 0) { if (path != name) { delete[] path; } PRINT_ERROR("open"); return InitializationError; } ::close(fd); int key = ftok(path, '0'); if (path != name) { delete[] path; } if (key < 0) { PRINT_ERROR("ftok"); return InitializationError; } while (true) { semid = semget(key, 3, IPC_CREAT|0777); if (semid < 0) { PRINT_ERROR("semget"); return InitializationError; } // Semaphore 0 - number of active processes // Semaphore 1 - intialization in progress (1 while initialization, 0 after it) // Semaphore 2 - semaphore was destroyed sops[0].sem_num = 0; sops[0].sem_op = 0; /* check if semaphore was already initialized */ sops[0].sem_flg = IPC_NOWAIT; sops[1].sem_num = 0; sops[1].sem_op = 1; /* increment number of active processes */ sops[1].sem_flg = SEM_UNDO; sops[2].sem_num = 1; sops[2].sem_op = 1; /* initialization in process */ sops[2].sem_flg = SEM_UNDO; sops[3].sem_num = 2; sops[3].sem_op = 0; /* check if semaphore was destroyed */ sops[3].sem_flg = IPC_NOWAIT; if (semop(semid, sops, 4) < 0) { if (errno == EAGAIN) { sops[0].sem_num = 0; sops[0].sem_op = -1; /* check if semaphore was already initialized */ sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT; sops[1].sem_num = 1; sops[1].sem_op = 0; /* wait until inialization completed */ sops[1].sem_flg = 0; sops[2].sem_num = 0; sops[2].sem_op = 2; /* increment number of active processes */ sops[3].sem_flg = SEM_UNDO; sops[3].sem_num = 2; sops[3].sem_op = 0; /* check if semaphore was destroyed */ sops[3].sem_flg = IPC_NOWAIT; if (semop(semid, sops, 4) == 0) { return AlreadyInitialized; } if (errno == EAGAIN) { sleep(1); continue; } } if (errno == EIDRM) { continue; } PRINT_ERROR("semop"); return InitializationError; } else { return NotYetInitialized; } }}void dbInitializationMutex::done() { struct sembuf sops[1]; sops[0].sem_num = 1; sops[0].sem_op = -1; /* initialization done */ sops[0].sem_flg = SEM_UNDO; int rc = semop(semid, sops, 1); assert(rc == 0);} bool dbInitializationMutex::close(){ int rc; struct sembuf sops[3]; while (true) { sops[0].sem_num = 0; sops[0].sem_op = -1; /* decrement process couter */ sops[0].sem_flg = SEM_UNDO; sops[1].sem_num = 0; sops[1].sem_op = 0; /* check if there are no more active processes */ sops[1].sem_flg = IPC_NOWAIT; sops[2].sem_num = 2; sops[2].sem_op = 1; /* mark as destructed */ sops[2].sem_flg = SEM_UNDO; if ((rc = semop(semid, sops, 3)) == 0) { return true; } else { assert(errno == EAGAIN); } sops[0].sem_num = 0; sops[0].sem_op = -2; /* decrement process couter and check for non-zero */ sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT; sops[1].sem_num = 0; sops[1].sem_op = 1; sops[1].sem_flg = SEM_UNDO; if ((rc = semop(semid, sops, 2)) == 0) { return false; } else { assert(errno == EAGAIN); } }}void dbInitializationMutex::erase(){ semctl(semid, 0, IPC_RMID, &u);}#endif // !USE_POSIX_SEMAPHORES// Thread stuff#ifndef NO_PTHREADS#if defined(_SC_NPROCESSORS_ONLN) int dbThread::numberOfProcessors() { return sysconf(_SC_NPROCESSORS_ONLN); }#elif defined(__linux__)END_FASTDB_NAMESPACE#include <linux/smp.h>BEGIN_FASTDB_NAMESPACEint dbThread::numberOfProcessors() { return smp_num_cpus; }#elif defined(__FreeBSD__)END_FASTDB_NAMESPACE#include <sys/sysctl.h>BEGIN_FASTDB_NAMESPACEint dbThread::numberOfProcessors() { int mib[2],ncpus=0; size_t len=sizeof(ncpus); mib[0]= CTL_HW; mib[1]= HW_NCPU; sysctl(mib,2,&ncpus,&len,NULL,0); return ncpus; }#elseint dbThread::numberOfProcessors() { return 1; }#endif#endif // NO_PTHREADS#else // _WIN32// Win32 specific codeunsigned dbSystem::getCurrentTimeMsec(){ return GetTickCount();}#ifdef SET_NULL_DACLdbNullSecurityDesciptor dbNullSecurityDesciptor::instance;#endif#endif // __WIN32//////////////////////////////////////////////////////////////// Common W32 and Unix platform code followsvoid thread_proc dbPooledThread::pooledThreadFunc(void* arg){ ((dbPooledThread*)arg)->run();}dbPooledThread::dbPooledThread(dbThreadPool* threadPool){ pool = threadPool; startSem.open(); readySem.open(); next = NULL; running = true; thread.create(&pooledThreadFunc, this);}dbPooledThread::~dbPooledThread(){ startSem.close(); readySem.close();}void dbPooledThread::stop() { running = false; startSem.signal(); readySem.wait(pool->mutex);}void dbPooledThread::run() { dbCriticalSection cs(pool->mutex); while (true) { startSem.wait(pool->mutex); if (!running) { break; } (*f)(arg); readySem.signal(); } readySem.signal();} void dbThreadPool::join(dbPooledThread* thr) { dbCriticalSection cs(mutex); thr->readySem.wait(mutex); thr->next = freeThreads; freeThreads = thr;}dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg){ dbCriticalSection cs(mutex); dbPooledThread* t = freeThreads; if (t == NULL) { t = freeThreads = new dbPooledThread(this); } freeThreads = t->next; t->f = f; t->arg = arg; t->startSem.signal(); return t;}dbThreadPool::dbThreadPool(){ freeThreads = NULL;} dbThreadPool::~dbThreadPool(){ dbCriticalSection cs(mutex); dbPooledThread *t, *next; for (t = freeThreads; t != NULL; t = next) { next = t->next; t->stop(); delete t; } }END_FASTDB_NAMESPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -