⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sync.cpp

📁 最新版本!fastdb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#if defined(USE_INTERNAL_CS_IMPL)

////////////////////////////////////////////////////////////////////
// dbGLobalCriticalSection internal implementation

#if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__)) && !defined(RECOVERABLE_CRITICAL_SECTION)

/**
 * Acquire the global critical section.
 *
 * The shared counter is atomically decremented with `lock xadd`, which leaves
 * the counter's previous value in `inc`. A previous value of 1 (the value
 * stored by create()) means the section was free and is now owned by the
 * caller without any system call. Any other previous value means the section
 * is already held, so the caller blocks on the SysV semaphore until leave()
 * posts it.
 */
void dbGlobalCriticalSection::enter()
{
    int inc = -1;
    // The "memory" clobber makes this statement a compiler barrier: loads and
    // stores of the data protected by the critical section must not be hoisted
    // above the point where the lock is taken. "cc" records that xadd
    // modifies the flags register.
    __asm__ __volatile__(
                        "lock; xadd %0,%1"
                        :"=d" (inc), "=m" (*count)
                        :"d" (inc), "m" (*count)
                        :"memory", "cc");
    if (inc != 1) {
        // Contended: wait for the holder to post the semaphore, restarting
        // the wait whenever it is interrupted by a signal.
        static struct sembuf sops[] = {{0, -1, 0}};
        int rc;
        while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
        assert(rc == 0);
    }
#if GLOBAL_CS_DEBUG
    owner = pthread_self();
#endif
}

/**
 * Release the global critical section.
 *
 * The shared counter is atomically incremented with `lock xadd`, which leaves
 * the counter's previous value in `inc`. A previous value of 0 means nobody
 * else tried to enter while the section was held, so nothing more is needed.
 * Any other previous value means at least one enter() call is blocked on the
 * SysV semaphore, so the semaphore is posted once to wake one waiter.
 */
void dbGlobalCriticalSection::leave()
{
    int inc = 1;
#if GLOBAL_CS_DEBUG
    owner = 0;
#endif
    // The "memory" clobber makes this statement a compiler barrier: stores
    // performed inside the critical section (including the owner reset above)
    // must not be sunk below the point where the lock is released. "cc"
    // records that xadd modifies the flags register.
    __asm__ __volatile__(
                        "lock; xadd %0,%1"
                        :"=d" (inc), "=m" (*count)
                        :"d" (inc), "m" (*count)
                        :"memory", "cc");
    if (inc != 0) {
        /* some other processes waiting to enter critical section */
        static struct sembuf sops[] = {{0, 1, 0}};
        int rc = semop(semid, sops, 1);
        assert(rc == 0);
    }
}

/**
 * Create the critical section on top of a caller-supplied shared counter.
 *
 * @param name   name handed to sem_init() for the backing semaphore
 * @param count  counter used by enter()/leave(); reset here to 1, the value
 *               enter() treats as "free"
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
    *count = 1;
    this->count = count;
    bool const ready = (sem_init(semid, name, 0) == 0);
    return ready;
}

/**
 * Attach to a critical section whose counter already exists.
 *
 * Unlike create(), the counter value is left untouched.
 *
 * @param name   name handed to sem_init() for the backing semaphore
 * @param count  counter used by enter()/leave()
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
    this->count = count;
    bool const ready = (sem_init(semid, name, 0) == 0);
    return ready;
}

#elif !defined(RECOVERABLE_CRITICAL_SECTION) && defined(__GNUC__) && defined (__GNUC_MINOR__) && ((4 < __GNUC__) || (4 == __GNUC__ && 1 <= __GNUC_MINOR__))

/**
 * Acquire the global critical section (GCC atomic-builtin variant).
 *
 * The shared counter is atomically incremented. A result of exactly 1 means
 * the section was free (create() stores 0) and is now owned by the caller.
 * Any other result means it is already held, so the caller blocks on the
 * SysV semaphore until leave() posts it.
 */
void dbGlobalCriticalSection::enter()
{
    bool const contended = (__sync_add_and_fetch(count, 1) != 1);
    if (contended) {
        static struct sembuf waitOp[] = {{0, -1, 0}};
        int rc;
        // Restart the wait whenever it is interrupted by a signal.
        do {
            rc = semop(semid, waitOp, 1);
        } while (rc < 0 && errno == EINTR);
        assert(rc == 0);
    }
#if GLOBAL_CS_DEBUG
    owner = pthread_self();
#endif
}

/**
 * Release the global critical section (GCC atomic-builtin variant).
 *
 * The shared counter is atomically decremented. A result of 0 means nobody
 * else is waiting. Any other result means at least one enter() call is
 * blocked, so the SysV semaphore is posted once.
 */
void dbGlobalCriticalSection::leave()
{
#if GLOBAL_CS_DEBUG
    owner = 0;
#endif
    bool const waitersPresent = (__sync_add_and_fetch(count, -1) != 0);
    if (waitersPresent) {
        // Wake one of the processes blocked in enter().
        static struct sembuf postOp[] = {{0, 1, 0}};
        int rc = semop(semid, postOp, 1);
        assert(rc == 0);
    }
}

/**
 * Create the critical section on top of a caller-supplied shared counter.
 *
 * @param name   name handed to sem_init() for the backing semaphore
 * @param count  counter used by enter()/leave(); reset here to 0, so that the
 *               first increment in enter() yields 1 ("acquired")
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
    *count = 0;
    this->count = count;
    bool const ready = (sem_init(semid, name, 0) == 0);
    return ready;
}

/**
 * Attach to a critical section whose counter already exists.
 *
 * Unlike create(), the counter value is left untouched.
 *
 * @param name   name handed to sem_init() for the backing semaphore
 * @param count  counter used by enter()/leave()
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
    this->count = count;
    bool const ready = (sem_init(semid, name, 0) == 0);
    return ready;
}

#else // defined(__GNUC__) && defined(i386)
// "lowest" case, use a SysV semaphore for complete portability
/**
 * Acquire the global critical section (portable SysV-only variant).
 *
 * Decrements semaphore 0 by one, blocking while it is zero. SEM_UNDO is set
 * on the operation so the kernel reverts it if the process exits.
 */
void dbGlobalCriticalSection::enter()
{
    static struct sembuf acquire[] = {{0, -1, SEM_UNDO}};
    int rc;
    // Restart the wait whenever it is interrupted by a signal.
    do {
        rc = semop(semid, acquire, 1);
    } while (rc < 0 && errno == EINTR);
    assert(rc == 0);
#if GLOBAL_CS_DEBUG
    owner = pthread_self();
#endif
}

/**
 * Release the global critical section (portable SysV-only variant).
 *
 * Increments semaphore 0 by one with SEM_UNDO, mirroring the decrement
 * performed by enter().
 */
void dbGlobalCriticalSection::leave()
{
    static struct sembuf release[] = {{0, 1, SEM_UNDO}};
#if GLOBAL_CS_DEBUG
    owner = 0;
#endif
    int rc = semop(semid, release, 1);
    assert(rc == 0);
}

/**
 * Attach to the semaphore backing the critical section.
 *
 * The shared-counter argument is unused in this variant.
 *
 * XXX: sem_init is Posix, the rest of these calls are SysV.
 * NOTE(review): the sem_init() called here takes (semid, name, initial value),
 * so it is presumably a helper defined elsewhere in this file; confirm.
 *
 * @param name  name handed to sem_init()
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
{
    bool const ready = (sem_init(semid, name, 1) == 0);
    return ready;
}

/**
 * Create the semaphore backing the critical section.
 *
 * The shared-counter argument is unused in this variant; the semaphore is
 * requested with an initial value of 1.
 *
 * @param name  name handed to sem_init()
 * @return true when sem_init() reports success (returns 0)
 */
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
{
    bool const ready = (sem_init(semid, name, 1) == 0);
    return ready;
}

#endif // defined(__GNUC__) && defined(i386)

/**
 * Remove the SysV semaphore set identified by semid from the system.
 * The result of semctl() is not examined.
 */
void dbGlobalCriticalSection::erase()
{
    (void)semctl(semid, 0, IPC_RMID, &u);
}

#endif // USE_INTERNAL_CS_IMPL

/**
 * Join (or start) the cross-process initialization protocol identified by
 * `name`, using a set of three SysV semaphores.
 *
 * @param name  key file name; when it contains no '/', keyFileDir is
 *              prepended to it
 * @return NotYetInitialized  when this call registered as the first active
 *                            process (initialization is now flagged as in
 *                            progress; see done()),
 *         AlreadyInitialized when another process had registered before and
 *                            its initialization has completed,
 *         InitializationError when the key file, the key or the semaphore
 *                            set cannot be obtained, or semop fails with an
 *                            unexpected error.
 */
dbInitializationMutex::initializationStatus 
dbInitializationMutex::initialize(char const* name)
{
    struct sembuf sops[4];
    // `path` either aliases `name` or owns a heap buffer; the two cases are
    // told apart below by comparing the pointers.
    char* path = (char*)name;
    if (strchr(name, '/') == NULL) { 
        path = new char[strlen(name)+strlen(keyFileDir)+1];
        sprintf(path, "%s%s", keyFileDir, name);
    }
    // Make sure the key file exists; the descriptor itself is not needed.
    int fd = open(path, O_WRONLY|O_CREAT, 0777);
    if (fd < 0) {
        if (path != name) { 
            delete[] path;
        }
        PRINT_ERROR("open");
        return InitializationError;
    }
    ::close(fd);
    // NOTE(review): getKeyFromFile is defined elsewhere; presumably it derives
    // the IPC key from the file - confirm.
    int key = getKeyFromFile(path);
    if (path != name) { 
        delete[] path;
    }
    if (key < 0) {
        PRINT_ERROR("getKeyFromFile");
        return InitializationError;
    }
    while (true) { 
        semid = semget(key, 3, IPC_CREAT|0777);
        if (semid < 0) { 
            PRINT_ERROR("semget");
            return InitializationError;
        }
        // Semaphore 0 - number of active processes
        // Semaphore 1 - initialization in progress (1 while initialization, 0 after it)
        // Semaphore 2 - semaphore was destroyed
        
        // First attempt: atomically become the initializer. All four
        // operations are applied together or not at all.
        sops[0].sem_num = 0;
        sops[0].sem_op  = 0; /* check if semaphore was already initialized */
        sops[0].sem_flg = IPC_NOWAIT;
        sops[1].sem_num = 0;
        sops[1].sem_op  = 1; /* increment number of active processes */
        sops[1].sem_flg = SEM_UNDO;
        sops[2].sem_num = 1;
        sops[2].sem_op  = 1; /* initialization in process */
        sops[2].sem_flg = SEM_UNDO;
        sops[3].sem_num = 2;
        sops[3].sem_op  = 0; /* check if semaphore was destroyed */
        sops[3].sem_flg = IPC_NOWAIT;
        if (semop(semid, sops, 4) < 0) { 
            if (errno == EAGAIN) { 
                // Second attempt: somebody else is (or was) the initializer.
                // Register as an additional active process once its
                // initialization has completed.
                sops[0].sem_num = 0;
                sops[0].sem_op  = -1; /* fails with EAGAIN if the process counter is still zero */
                sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
                sops[1].sem_num = 1;
                sops[1].sem_op  = 0; /* wait until initialization completed */
                sops[1].sem_flg = 0;
                sops[2].sem_num = 0;
                sops[2].sem_op  = 2; /* together with the -1 above: net +1 active process */
                sops[2].sem_flg = SEM_UNDO;
                sops[3].sem_num = 2;
                sops[3].sem_op  = 0; /* check if semaphore was destroyed */
                sops[3].sem_flg = IPC_NOWAIT;
                if (semop(semid, sops, 4) == 0) { 
                    return AlreadyInitialized;
                }
                if (errno == EAGAIN) { 
                    // Neither attempt applied; back off and start over.
                    sleep(1);
                    continue;
                }
            } 
            if (errno == EIDRM) {
                // The semaphore set was removed meanwhile; obtain it again.
                continue;
            }
            PRINT_ERROR("semop");
            return InitializationError;
        } else { 
            return NotYetInitialized;
        }
    }
}

/**
 * Signal that initialization has finished by decrementing semaphore 1
 * (the "initialization in progress" flag) by one, with SEM_UNDO set.
 */
void dbInitializationMutex::done()
{
    struct sembuf finished;
    finished.sem_num = 1;
    finished.sem_op  = -1;
    finished.sem_flg = SEM_UNDO;
    int rc = semop(semid, &finished, 1);
    assert(rc == 0);
}

/**
 * Unregister the calling process from the active-process counter
 * (semaphore 0).
 *
 * @return true  when the counter reached zero with this call; semaphore 2 is
 *               incremented in the same atomic operation to mark the set as
 *               destroyed,
 *         false when the counter was decremented but other processes remain.
 *
 * NOTE(review): a semop failure other than EAGAIN is only caught by assert();
 * with assertions compiled out the loop simply retries.
 */
bool dbInitializationMutex::close()
{
    int rc;
    struct sembuf sops[3];
    while (true) { 
        // Attempt 1: succeeds only if this is the last active process.
        sops[0].sem_num = 0;
        sops[0].sem_op  = -1; /* decrement process counter */
        sops[0].sem_flg = SEM_UNDO;
        sops[1].sem_num = 0;
        sops[1].sem_op  = 0;  /* check if there are no more active processes */
        sops[1].sem_flg = IPC_NOWAIT;
        sops[2].sem_num = 2;
        sops[2].sem_op  = 1;  /* mark as destructed */
        sops[2].sem_flg = SEM_UNDO;
        if ((rc = semop(semid, sops, 3)) == 0) { 
            return true;
        } else { 
            assert(errno == EAGAIN);
        }
        // Attempt 2: succeeds only if at least two processes are registered;
        // the -2/+1 pair amounts to a net decrement of one.
        sops[0].sem_num = 0;
        sops[0].sem_op  = -2; /* decrement process counter and check for non-zero */
        sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
        sops[1].sem_num = 0;
        sops[1].sem_op  = 1;  
        sops[1].sem_flg = SEM_UNDO;
        if ((rc = semop(semid, sops, 2)) == 0) { 
            return false;
        } else { 
            assert(errno == EAGAIN);
        }
        // Neither attempt applied (the counter changed in between); retry.
    }
}

/**
 * Remove the SysV semaphore set identified by semid from the system.
 * The result of semctl() is not examined.
 */
void dbInitializationMutex::erase()
{
    (void)semctl(semid, 0, IPC_RMID, &u);
}
#endif // !USE_POSIX_SEMAPHORES



//  Thread stuff
#ifndef NO_PTHREADS

#if defined(_SC_NPROCESSORS_ONLN) 
/**
 * Number of processors currently online, as reported by sysconf().
 *
 * @return the processor count, or 1 when sysconf() fails (it returns -1 in
 *         that case) or reports a non-positive value, so callers never
 *         receive a count below one.
 */
int dbThread::numberOfProcessors() {
    long const online = sysconf(_SC_NPROCESSORS_ONLN);
    return online > 0 ? static_cast<int>(online) : 1;
}
#elif defined(__linux__)
END_FASTDB_NAMESPACE
#include <linux/smp.h>
BEGIN_FASTDB_NAMESPACE
// Linux variant used when _SC_NPROCESSORS_ONLN is not defined: returns the
// value of smp_num_cpus.
int dbThread::numberOfProcessors()
{
    return smp_num_cpus;
}
#elif defined(__FreeBSD__)
END_FASTDB_NAMESPACE
#include <sys/sysctl.h>
BEGIN_FASTDB_NAMESPACE
/**
 * Number of processors, queried through sysctl(CTL_HW, HW_NCPU).
 *
 * @return the processor count, or 1 when the sysctl() call fails or reports
 *         a value below one, so callers never receive a zero or negative
 *         count.
 */
int dbThread::numberOfProcessors() {
    int mib[2];
    int ncpus = 0;
    size_t len = sizeof(ncpus);
    mib[0] = CTL_HW;
    mib[1] = HW_NCPU;
    if (sysctl(mib, 2, &ncpus, &len, NULL, 0) != 0 || ncpus < 1) {
        return 1;
    }
    return ncpus;
}
#else
// Fallback for platforms without a known way to query the processor count:
// report a single processor.
int dbThread::numberOfProcessors()
{
    return 1;
}
#endif
#endif // NO_PTHREADS


#else // _WIN32

// Win32 specific code
// Returns the value of GetTickCount() as the current time in milliseconds.
unsigned dbSystem::getCurrentTimeMsec()
{
    unsigned const ticks = GetTickCount();
    return ticks;
}

#ifdef SET_NULL_DACL
// Out-of-class definition of the static `instance` member of
// dbNullSecurityDesciptor.
dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
#endif

#endif // __WIN32

//////////////////////////////////////////////////////////////
// Common W32 and Unix platform code follows

// Thread entry point: `arg` is the dbPooledThread whose run() loop is to be
// executed on the new thread.
void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
{
    dbPooledThread* const self = static_cast<dbPooledThread*>(arg);
    self->run();
}

// Builds a pooled worker that belongs to `threadPool`: opens both semaphores,
// marks the worker as running and detached from any list, and finally starts
// the underlying thread on pooledThreadFunc with this object as its argument.
dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
{
    next = NULL;
    running = true;
    pool = threadPool;
    startSem.open();
    readySem.open();
    // Started last so that every field above is set before the thread runs.
    thread.create(&pooledThreadFunc, this);
}

// Closes the two semaphores opened by the constructor, in the same order.
dbPooledThread::~dbPooledThread()
{
    this->startSem.close();
    this->readySem.close();
}

void dbPooledThread::stop() 
{
    running = false;
    startSem.signal(); 
    readySem.wait(pool->mutex);
}

// Worker loop, executed with the pool mutex held through `guard`. Each round
// waits on startSem; while `running` is still set the assigned function is
// invoked with its argument and readySem is signalled. Once `running` has
// been cleared, readySem is signalled one final time and the function returns.
void dbPooledThread::run()
{
    dbCriticalSection guard(pool->mutex);
    startSem.wait(pool->mutex);
    while (running) {
        (*f)(arg);
        readySem.signal();
        startSem.wait(pool->mutex);
    }
    readySem.signal();
}
    
// Waits (under the pool mutex) for `thr` to signal readySem, then pushes it
// onto the front of the free-thread list.
void dbThreadPool::join(dbPooledThread* thr)
{
    dbCriticalSection guard(mutex);
    thr->readySem.wait(mutex);

    dbPooledThread* const previousHead = freeThreads;
    thr->next = previousHead;
    freeThreads = thr;
}


// Hands the job (f, arg) to a pooled worker and returns that worker. The head
// of the free list is reused when there is one; otherwise a new worker is
// allocated. The chosen worker is unlinked from the free list, given the job
// and released through its startSem. Runs under the pool mutex.
dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
{
    dbCriticalSection guard(mutex);
    dbPooledThread* worker = freeThreads;
    if (worker == NULL) {
        worker = new dbPooledThread(this);
        freeThreads = worker;
    }
    freeThreads = worker->next;
    worker->f = f;
    worker->arg = arg;
    worker->startSem.signal();
    return worker;
}


// Starts with an empty free-thread list.
dbThreadPool::dbThreadPool()
: freeThreads(NULL)
{
}
   
// Under the pool mutex, walks the free-thread list, stopping and deleting
// each worker. The successor is read before the worker is deleted.
dbThreadPool::~dbThreadPool()
{
    dbCriticalSection guard(mutex);
    dbPooledThread* current = freeThreads;
    while (current != NULL) {
        dbPooledThread* const following = current->next;
        current->stop();
        delete current;
        current = following;
    }
}

END_FASTDB_NAMESPACE

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -