⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sync.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#if defined(USE_LOCAL_CS_IMPL)////////////////////////////////////////////////////////////////////// dbGLobalCriticalSection local implementation// If we are on an i386 based platform, we can use the processor primitives//   (XXX: although do these work with multiprocessor?   shouldn't this be//    under the non-pthread based implemenation?)#if defined(__GNUC__) && defined(i386) && !defined(RECOVERABLE_CRITICAL_SECTION)void dbGlobalCriticalSection::enter(){    int inc = -1;    __asm__ __volatile__(                        "lock; xadd %0,%1"                        :"=d" (inc), "=m" (*count)                        :"d" (inc), "m" (*count));    if (inc != 1) {         static struct sembuf sops[] = {{0, -1, 0}};        int rc;        while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);        assert(rc == 0);    }                                  }void dbGlobalCriticalSection::leave(){    int inc = 1;    __asm__ __volatile__(                        "lock; xadd %0,%1"                        :"=d" (inc), "=m" (*count)                        :"d" (inc), "m" (*count));    if (inc != 0) {         /* some other processes waiting to enter critical section */        static struct sembuf sops[] = {{0, 1, 0}};        int rc = semop(semid, sops, 1);        assert(rc == 0);    }}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count){    this->count = count;    *count = 1;    return sem_init(semid, name, 0) == 0;}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count){    this->count = count;    return sem_init(semid, name, 0) == 0;}#else // defined(__GNUC__) && defined(i386)// "lowest" case, use a SysV semaphore for complete portabilityvoid dbGlobalCriticalSection::enter(){    static struct sembuf sops[] = {{0, -1, SEM_UNDO}};    int rc;    while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);    assert(rc == 0);}void dbGlobalCriticalSection::leave(){    static struct sembuf sops[] = {{0, 1, SEM_UNDO}};    int rc = semop(semid, sops, 1);    assert(rc == 0);}bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*){// XXX: sem_init is Posix, the rest of these calls are SysV.    return sem_init(semid, name, 1) == 0;}bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*){    return sem_init(semid, name, 1) == 0;}#endif // defined(__GNUC__) && defined(i386)void dbGlobalCriticalSection::erase(){    semctl(semid, 0, IPC_RMID, &u);}#endif // USE_LOCAL_CS_IMPLdbInitializationMutex::initializationStatus dbInitializationMutex::initialize(char const* name){    struct sembuf sops[4];    char* path = (char*)name;    if (strchr(name, '/') == NULL) {         path = new char[strlen(name)+strlen(keyFileDir)+1];        sprintf(path, "%s%s", keyFileDir, name);    }    int fd = open(path, O_WRONLY|O_CREAT, 0777);    if (fd < 0) {        if (path != name) {             delete[] path;        }        PRINT_ERROR("open");        return InitializationError;    }    ::close(fd);    int key = ftok(path, '0');    if (path != name) {         delete[] path;    }    if (key < 0) {        PRINT_ERROR("ftok");        return InitializationError;    }    while (true) {         semid = semget(key, 3, IPC_CREAT|0777);        if (semid < 0) {             PRINT_ERROR("semget");            return InitializationError;        }        // Semaphore 0 - number of active processes        // Semaphore 1 - intialization in progress (1 while initialization, 0 after it)        // Semaphore 2 - semaphore was destroyed                sops[0].sem_num = 0;        sops[0].sem_op  = 0; /* check if semaphore was already initialized */        sops[0].sem_flg = IPC_NOWAIT;        sops[1].sem_num = 0;        sops[1].sem_op  = 1; /* increment number of active processes */        sops[1].sem_flg = SEM_UNDO;        sops[2].sem_num = 1;        sops[2].sem_op  = 1; /* initialization in process */        sops[2].sem_flg = SEM_UNDO;        sops[3].sem_num = 2;        sops[3].sem_op  = 0; /* check if semaphore was destroyed */        sops[3].sem_flg = IPC_NOWAIT;        if (semop(semid, sops, 4) < 0) {             if (errno == EAGAIN) {                 sops[0].sem_num = 0;                sops[0].sem_op  = -1; /* check if semaphore was already initialized */                sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;                sops[1].sem_num = 1;                sops[1].sem_op  = 0; /* wait until inialization completed */                sops[1].sem_flg = 0;                sops[2].sem_num = 0;                sops[2].sem_op  = 2; /* increment number of active processes */                sops[3].sem_flg = SEM_UNDO;                sops[3].sem_num = 2;                sops[3].sem_op  = 0; /* check if semaphore was destroyed */                sops[3].sem_flg = IPC_NOWAIT;                if (semop(semid, sops, 4) == 0) {                     return AlreadyInitialized;                }                if (errno == EAGAIN) {                     sleep(1);                    continue;                }            }             if (errno == EIDRM) {                continue;            }            PRINT_ERROR("semop");            return InitializationError;        } else {             return NotYetInitialized;        }    }}void dbInitializationMutex::done() {    struct sembuf sops[1];    sops[0].sem_num = 1;    sops[0].sem_op  = -1; /* initialization done */    sops[0].sem_flg = SEM_UNDO;    int rc = semop(semid, sops, 1);    assert(rc == 0);} bool dbInitializationMutex::close(){    int rc;    struct sembuf sops[3];    while (true) {         sops[0].sem_num = 0;        sops[0].sem_op  = -1; /* decrement process couter */        sops[0].sem_flg = SEM_UNDO;        sops[1].sem_num = 0;        sops[1].sem_op  = 0;  /* check if there are no more active processes */        sops[1].sem_flg = IPC_NOWAIT;        sops[2].sem_num = 2;        sops[2].sem_op  = 1;  /* mark as destructed */        sops[2].sem_flg = SEM_UNDO;        if ((rc = semop(semid, sops, 3)) == 0) {             return true;        } else {             assert(errno == EAGAIN);        }        sops[0].sem_num = 0;        sops[0].sem_op  = -2; /* decrement process couter and check for non-zero */        sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;        sops[1].sem_num = 0;        sops[1].sem_op  = 1;          sops[1].sem_flg = SEM_UNDO;        if ((rc = semop(semid, sops, 2)) == 0) {             return false;        } else {             assert(errno == EAGAIN);        }    }}void dbInitializationMutex::erase(){    semctl(semid, 0, IPC_RMID, &u);}#endif // !USE_POSIX_SEMAPHORES//  Thread stuff#ifndef NO_PTHREADS#if defined(_SC_NPROCESSORS_ONLN) int dbThread::numberOfProcessors() {     return sysconf(_SC_NPROCESSORS_ONLN); }#elif defined(__linux__)END_FASTDB_NAMESPACE#include <linux/smp.h>BEGIN_FASTDB_NAMESPACEint dbThread::numberOfProcessors() { return smp_num_cpus; }#elif defined(__FreeBSD__)END_FASTDB_NAMESPACE#include <sys/sysctl.h>BEGIN_FASTDB_NAMESPACEint dbThread::numberOfProcessors() {     int mib[2],ncpus=0;    size_t len=sizeof(ncpus);    mib[0]= CTL_HW;    mib[1]= HW_NCPU;    sysctl(mib,2,&ncpus,&len,NULL,0);    return ncpus; }#elseint dbThread::numberOfProcessors() { return 1; }#endif#endif // NO_PTHREADS#else // _WIN32// Win32 specific codeunsigned dbSystem::getCurrentTimeMsec(){    return GetTickCount();}#ifdef SET_NULL_DACLdbNullSecurityDesciptor dbNullSecurityDesciptor::instance;#endif#endif // __WIN32//////////////////////////////////////////////////////////////// Common W32 and Unix platform code followsvoid thread_proc dbPooledThread::pooledThreadFunc(void* arg){    ((dbPooledThread*)arg)->run();}dbPooledThread::dbPooledThread(dbThreadPool* threadPool){    pool = threadPool;    startSem.open();    readySem.open();    next = NULL;    running = true;    thread.create(&pooledThreadFunc, this);}dbPooledThread::~dbPooledThread(){    startSem.close();    readySem.close();}void dbPooledThread::stop() {    running = false;    startSem.signal();     readySem.wait(pool->mutex);}void dbPooledThread::run() {    dbCriticalSection cs(pool->mutex);    while (true) {         startSem.wait(pool->mutex);        if (!running) {             break;        }        (*f)(arg);        readySem.signal();    }    readySem.signal();}    void dbThreadPool::join(dbPooledThread* thr) {     dbCriticalSection cs(mutex);    thr->readySem.wait(mutex);    thr->next = freeThreads;    freeThreads = thr;}dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg){    dbCriticalSection cs(mutex);    dbPooledThread* t = freeThreads;    if (t == NULL) {         t = freeThreads = new dbPooledThread(this);    }    freeThreads = t->next;    t->f = f;    t->arg = arg;    t->startSem.signal();    return t;}dbThreadPool::dbThreadPool(){    freeThreads = NULL;}   dbThreadPool::~dbThreadPool(){    dbCriticalSection cs(mutex);    dbPooledThread *t, *next;    for (t = freeThreads; t != NULL; t = next) {         next = t->next;        t->stop();        delete t;    }        }END_FASTDB_NAMESPACE

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -