sync.cpp
来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C++ 代码 · 共 618 行
CPP
618 行
//-< SYNC.CPP >------------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// Intertask synchonization primitives//-------------------------------------------------------------------*--------*#define INSIDE_FASTDB#include "stdtp.h"#include "sync.h"#ifndef _WIN32unsigned dbSystem::getCurrentTimeMsec(){ struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec*1000 + tv.tv_usec / 1000;}#ifndef USE_POSIX_API#include <errno.h>#include <signal.h>#define PRINT_ERROR(func) perror(func)char const* keyFileDir = "/tmp/";static void alarm_handler(int){}class moduleInitializer { public: moduleInitializer() { static struct sigaction sigact; sigact.sa_handler = alarm_handler; ::sigaction(SIGALRM, &sigact, NULL); }};static moduleInitializer initializer; // install SIGLARM handlerbool dbSharedMemory::open(char const* name, size_t size){ char* fileName = (char*)name; if (strchr(name, '/') == NULL) { fileName = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(fileName, "%s%s", keyFileDir, name); } int fd = ::open(fileName, O_RDWR|O_CREAT, 0777); if (fd < 0) { if (fileName != name) { delete[] fileName; } return false; } ::close(fd); int key = ftok(fileName, '0'); if (fileName != name) { delete[] fileName; } if (key < 0) { return false; } shm = shmget(key, DOALIGN(size, 4096), IPC_CREAT|0777); if (shm < 0) { return false; } ptr = (char*)shmat(shm, NULL, 0); return (ptr != (char*)-1);}void dbSharedMemory::close(){ shmdt((char*)ptr);}void dbSharedMemory::erase(){ close(); shmctl(shm, IPC_RMID, NULL);} int sem_init(int& sem, char const* name, unsigned init_value){ key_t key = IPC_PRIVATE; int semid; struct sembuf sops[3]; sops[0].sem_num = 1; sops[0].sem_op = 0; /* check if semaphore was already initialized */ sops[0].sem_flg = IPC_NOWAIT; sops[1].sem_num = 1; sops[1].sem_op = 1; /* mark semaphore as initialized */ sops[1].sem_flg = 0; sops[2].sem_num = 0; sops[2].sem_op = init_value; sops[2].sem_flg = 0; if (name != NULL) { int fd; char* path = (char*)name; if (strchr(name, '/') == NULL) { path = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(path, "%s%s", keyFileDir, name); } fd = open(path, O_WRONLY|O_CREAT, 0777); if (fd < 0) { if (path != name) { delete[] path; } PRINT_ERROR("open"); return -1; } close(fd); key = ftok(path, '0'); if (path != name) { delete[] path; } if (key < 0) { PRINT_ERROR("ftok"); return -1; } } semid = semget(key, 2, IPC_CREAT|0777); if (semid < 0) { PRINT_ERROR("semget"); return -1; } if (semop(semid, sops, items(sops)) && errno != EAGAIN) { PRINT_ERROR("semop"); return -1; } sem = semid; return 0;}enum wait_status { wait_ok, wait_timeout_expired, wait_error };static wait_status wait_semaphore(int& sem, unsigned msec, struct sembuf* sops, int n_sops){ if (msec != INFINITE) { struct timeval start; struct timeval stop; gettimeofday(&start, NULL); unsigned long usec = start.tv_usec + msec % 1000 * 1000; stop.tv_usec = usec % 1000000; stop.tv_sec = start.tv_sec + msec / 1000 + usec / 1000000; while (true) { struct itimerval it; it.it_interval.tv_sec = 0; it.it_interval.tv_usec = 0; it.it_value.tv_sec = stop.tv_sec - start.tv_sec; it.it_value.tv_usec = stop.tv_usec - start.tv_usec; if (stop.tv_usec < start.tv_usec) { it.it_value.tv_usec += 1000000; it.it_value.tv_sec -= 1; } if (setitimer(ITIMER_REAL, &it, NULL) < 0) { return wait_error; } if (semop(sem, sops, n_sops) == 0) { break; } if (errno != EINTR) { return wait_error; } gettimeofday(&start, NULL); if (stop.tv_sec < start.tv_sec || (stop.tv_sec == start.tv_sec && stop.tv_usec < start.tv_sec)) { return wait_timeout_expired; } } } else { while (semop(sem, sops, n_sops) < 0) { if (errno != EINTR) { return wait_error; } } } return wait_ok;}bool dbSemaphore::wait(unsigned msec){ static struct sembuf sops[] = {{0, -1, 0}}; wait_status ws = wait_semaphore(s, msec, sops, items(sops)); assert(ws != wait_error); return ws == wait_ok;}void dbSemaphore::signal(unsigned inc){ if (inc != 0) { struct sembuf sops[1]; sops[0].sem_num = 0; sops[0].sem_op = inc; sops[0].sem_flg = 0; int rc = semop(s, sops, 1); assert(rc == 0); }}#if (defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__) /* union semun is defined by including <sys/sem.h> */ #elseunion semun { int val; struct semid_ds* buf; unsigned short* array;};#endifstatic union semun u;void dbSemaphore::reset() { u.val = 0; int rc = semctl(s, 0, SETVAL, u); assert(rc >= 0);}bool dbSemaphore::open(char const* name, unsigned init_value){ return sem_init(s, name, init_value) == 0;}void dbSemaphore::close() {}void dbSemaphore::erase() { semctl(s, 0, IPC_RMID, &u);}bool dbEvent::wait(unsigned msec){ static struct sembuf sops[] = {{0, -1, 0}, {0, 1, 0}}; wait_status ws = wait_semaphore(e, msec, sops, items(sops)); assert(ws != wait_error); return ws == wait_ok;}void dbEvent::signal(){ static struct sembuf sops[] = {{0, 0, IPC_NOWAIT}, {0, 1, 0}}; int rc = semop(e, sops, items(sops)); assert(rc == 0 || errno == EAGAIN); }void dbEvent::reset(){ static struct sembuf sops[] = {{0, -1, IPC_NOWAIT}}; int rc = semop(e, sops, items(sops)); assert(rc == 0 || errno == EAGAIN); }bool dbEvent::open(char const* name, bool signaled){ return sem_init(e, name, signaled) == 0;}void dbEvent::close() {}void dbEvent::erase() { semctl(e, 0, IPC_RMID, &u);}#if !defined(__osf__) && !defined(__sun)#if defined(__GNUC__) && defined(i386)void dbGlobalCriticalSection::enter(){ int inc = -1; __asm__ __volatile__( "lock; xadd %0,%1" :"=d" (inc), "=m" (*count) :"d" (inc), "m" (*count)); if (inc != 1) { static struct sembuf sops[] = {{0, -1, SEM_UNDO}}; int rc; while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR); assert(rc == 0); } }void dbGlobalCriticalSection::leave(){ int inc = 1; __asm__ __volatile__( "lock; xadd %0,%1" :"=d" (inc), "=m" (*count) :"d" (inc), "m" (*count)); if (inc != 0) { /* some other processes waiting to enter critical section */ static struct sembuf sops[] = {{0, 1, SEM_UNDO}}; int rc = semop(semid, sops, 1); assert(rc == 0); }}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count){ this->count = count; *count = 1; return sem_init(semid, name, 0) == 0;}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count){ this->count = count; return sem_init(semid, name, 0) == 0;}#else // defined(__GNUC__) && defined(i386)void dbGlobalCriticalSection::enter(){ static struct sembuf sops[] = {{0, -1, SEM_UNDO}}; int rc; while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR); assert(rc == 0);}void dbGlobalCriticalSection::leave(){ static struct sembuf sops[] = {{0, 1, SEM_UNDO}}; int rc = semop(semid, sops, 1); assert(rc == 0);}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*){ return sem_init(semid, name, 1) == 0;}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*){ return sem_init(semid, name, 1) == 0;}#endif // defined(__GNUC__) && defined(i386)void dbGlobalCriticalSection::erase(){ semctl(semid, 0, IPC_RMID, &u);}#endif // !defined(__osf__) && !defined(__sun) dbInitializationMutex::initializationStatus dbInitializationMutex::initialize(char const* name){ struct sembuf sops[4]; char* path = (char*)name; if (strchr(name, '/') == NULL) { path = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(path, "%s%s", keyFileDir, name); } int fd = open(path, O_WRONLY|O_CREAT, 0777); if (fd < 0) { if (path != name) { delete[] path; } PRINT_ERROR("open"); return InitializationError; } ::close(fd); int key = ftok(path, '0'); if (path != name) { delete[] path; } if (key < 0) { PRINT_ERROR("ftok"); return InitializationError; } while (true) { semid = semget(key, 3, IPC_CREAT|0777); if (semid < 0) { PRINT_ERROR("semget"); return InitializationError; } // Semaphore 0 - number of active processes // Semaphore 1 - intialization in progress (1 while initialization, 0 after it) // Semaphore 2 - semaphore was destroyed sops[0].sem_num = 0; sops[0].sem_op = 0; /* check if semaphore was already initialized */ sops[0].sem_flg = IPC_NOWAIT; sops[1].sem_num = 0; sops[1].sem_op = 1; /* increment number of active processes */ sops[1].sem_flg = SEM_UNDO; sops[2].sem_num = 1; sops[2].sem_op = 1; /* initialization in process */ sops[2].sem_flg = SEM_UNDO; sops[3].sem_num = 2; sops[3].sem_op = 0; /* check if semaphore was destroyed */ sops[3].sem_flg = IPC_NOWAIT; if (semop(semid, sops, 4) < 0) { if (errno == EAGAIN) { sops[0].sem_num = 0; sops[0].sem_op = -1; /* check if semaphore was already initialized */ sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT; sops[1].sem_num = 1; sops[1].sem_op = 0; /* wait until inialization completed */ sops[1].sem_flg = 0; sops[2].sem_num = 0; sops[2].sem_op = 2; /* increment number of active processes */ sops[3].sem_flg = SEM_UNDO; sops[3].sem_num = 2; sops[3].sem_op = 0; /* check if semaphore was destroyed */ sops[3].sem_flg = IPC_NOWAIT; if (semop(semid, sops, 4) == 0) { return AlreadyInitialized; } if (errno == EAGAIN) { sleep(1); continue; } } if (errno == EIDRM) { continue; } PRINT_ERROR("semop"); return InitializationError; } else { return NotYetInitialized; } }}void dbInitializationMutex::done() { struct sembuf sops[1]; sops[0].sem_num = 1; sops[0].sem_op = -1; /* initialization done */ sops[0].sem_flg = SEM_UNDO; int rc = semop(semid, sops, 1); assert(rc == 0);} bool dbInitializationMutex::close(){ int rc; struct sembuf sops[3]; while (true) { sops[0].sem_num = 0; sops[0].sem_op = -1; /* decrement process couter */ sops[0].sem_flg = SEM_UNDO; sops[1].sem_num = 0; sops[1].sem_op = 0; /* check if there are no more active processes */ sops[1].sem_flg = IPC_NOWAIT; sops[2].sem_num = 2; sops[2].sem_op = 1; /* mark as destructed */ sops[2].sem_flg = SEM_UNDO; if ((rc = semop(semid, sops, 3)) == 0) { return true; } else { assert(errno == EAGAIN); } sops[0].sem_num = 0; sops[0].sem_op = -2; /* decrement process couter and check for non-zero */ sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT; sops[1].sem_num = 0; sops[1].sem_op = 1; sops[1].sem_flg = SEM_UNDO; if ((rc = semop(semid, sops, 2)) == 0) { return false; } else { assert(errno == EAGAIN); } }}void dbInitializationMutex::erase(){ semctl(semid, 0, IPC_RMID, &u);}#endif // USE_POSIX_API#ifndef NO_PTHREADS#if defined(_SC_NPROCESSORS_ONLN) int dbThread::numberOfProcessors() { return sysconf(_SC_NPROCESSORS_ONLN); }#elif defined(__linux__)#include <linux/smp.h>int dbThread::numberOfProcessors() { return smp_num_cpus; }#elif defined(__FreeBSD__)#include <sys/sysctl.h>int dbThread::numberOfProcessors() { int mib[2],ncpus=0; size_t len=sizeof(ncpus); mib[0]= CTL_HW; mib[1]= HW_NCPU; sysctl(mib,2,&ncpus,&len,NULL,0); return ncpus; }#elseint dbThread::numberOfProcessors() { return 1; }#endif#endif // NO_PTHREADS#else // _WIN32unsigned dbSystem::getCurrentTimeMsec(){ return GetTickCount();}#ifdef SET_NULL_DACLdbNullSecurityDesciptor dbNullSecurityDesciptor::instance;#endif#endifvoid thread_proc dbPooledThread::pooledThreadFunc(void* arg){ ((dbPooledThread*)arg)->run();}dbPooledThread::dbPooledThread(dbThreadPool* threadPool){ pool = threadPool; startSem.open(); readySem.open(); next = NULL; running = true; thread.create(&pooledThreadFunc, this);}dbPooledThread::~dbPooledThread(){ startSem.close(); readySem.close();}void dbPooledThread::stop() { running = false; startSem.signal(); readySem.wait(pool->mutex);}void dbPooledThread::run() { dbCriticalSection cs(pool->mutex); while (true) { startSem.wait(pool->mutex); if (!running) { break; } (*f)(arg); readySem.signal(); } readySem.signal();} void dbThreadPool::join(dbPooledThread* thr) { dbCriticalSection cs(mutex); thr->readySem.wait(mutex); thr->next = freeThreads; freeThreads = thr;}dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg){ dbCriticalSection cs(mutex); dbPooledThread* t = freeThreads; if (t == NULL) { t = freeThreads = new dbPooledThread(this); } freeThreads = t->next; t->f = f; t->arg = arg; t->startSem.signal(); return t;}dbThreadPool::dbThreadPool(){ freeThreads = NULL;} dbThreadPool::~dbThreadPool(){ dbCriticalSection cs(mutex); dbPooledThread *t, *next; for (t = freeThreads; t != NULL; t = next) { next = t->next; t->stop(); delete t; } }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?