⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sync_unix.h

📁 最新版本!fastdb是高效的内存数据库系统
💻 H
📖 第 1 页 / 共 2 页
字号:
//-< SYNC_UNIX.H >---------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 20-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchonization primitives for Unix platforms
//-------------------------------------------------------------------*--------*

#ifndef __SYNC_UNIX_H__
#define __SYNC_UNIX_H__

// Standard includes for all Unix platforms
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#ifdef VXWORKS
#include "fastdbShim.h"
#else
#include <sys/time.h>
#endif // VXWORKS
#include <sys/types.h>
#include <assert.h>
#include <errno.h>

#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
#ifndef VXWORKS
#include <sys/ipc.h> 
#endif // ndef VXWORKS
extern char const* keyFileDir; // default value: "/tmp/" 
#endif

#if defined(USE_POSIX_SEMAPHORES)
#include <semaphore.h>  // For POSIX style semaphores
#else
#include <sys/sem.h>    // For SysV style semaphores
#endif

#if defined(USE_POSIX_MMAP) && USE_POSIX_MMAP
#include <sys/mman.h>   // For mmap()
#else
#ifndef VXWORKS
#include <sys/shm.h>    
#endif // ndef VXWORKS
#include <sys/mman.h>
#endif

BEGIN_FASTDB_NAMESPACE

#define thread_proc

//////////////////////////////////////////////////////////
// If this system uses pthread based threads, then define
//   dbMutex(), dbThread(), dbLocalEvent(), etc as pthread-based implemenations

#ifndef NO_PTHREADS

// Use pthread based implementation
#include <pthread.h>

class dbMutex { 
    friend class dbLocalEvent;
    friend class dbLocalSemaphore;
    pthread_mutex_t cs;
    bool            initialized;
  public:
    dbMutex() {
#ifdef NDEBUG
        pthread_mutex_init(&cs, NULL);
#else
#ifdef VXWORKS
	memset(&cs, '\0', sizeof(cs));
#endif // VXWORKS
        int rc = pthread_mutex_init(&cs, NULL);
        assert(rc == 0);
#endif
        initialized = true;
    }
    ~dbMutex() {
#ifdef NDEBUG
        pthread_mutex_destroy(&cs);
#else
        int rc = pthread_mutex_destroy(&cs);
        assert(rc == 0);
#endif
        initialized = false;
    }
    bool isInitialized() { 
        return initialized;
    }
    void lock() {
        if (initialized) { 
#ifdef NDEBUG
            pthread_mutex_lock(&cs);
#else
            int rc = pthread_mutex_lock(&cs);
            assert(rc == 0);
#endif
        }
    }
    void unlock() {
        if (initialized) { 
#ifdef NDEBUG
            pthread_mutex_unlock(&cs);
#else
            int rc = pthread_mutex_unlock(&cs);
            assert(rc == 0);
#endif
        }
    }
};


const size_t dbThreadStackSize = 1024*1024;

class dbThread { 
    pthread_t thread;
  public:
    typedef void (thread_proc* thread_proc_t)(void*);
    
    static void sleep(time_t sec) { 
        ::sleep(sec);
    }

    void create(thread_proc_t f, void* arg) {
        pthread_attr_t attr;
        pthread_attr_init(&attr);
#if !defined(__linux__)
        pthread_attr_setstacksize(&attr, dbThreadStackSize);
#endif
#if defined(_AIX41)
        // At AIX 4.1, 4.2 threads are by default created detached
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);
#endif
        pthread_create(&thread, &attr, (void*(*)(void*))f, arg);
        pthread_attr_destroy(&attr);
    }

    void join() { 
        void* result;
        pthread_join(thread, &result);
    }
    void detach() { 
        pthread_detach(thread);
    }

    enum ThreadPriority { 
        THR_PRI_LOW, 
        THR_PRI_HIGH
    };
    void setPriority(ThreadPriority pri) { 
#if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
        struct sched_param sp;
        sp.sched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX;
        pthread_setschedparam(thread, SCHED_OTHER, &sp); 
#endif
    }

    static int numberOfProcessors();
};


class dbLocalEvent { 
    pthread_cond_t   cond;
    int              signaled;
  public:
    void wait(dbMutex& mutex) { 
        while (!signaled) { 
            pthread_cond_wait(&cond, &mutex.cs);
        }
    }
    bool wait(dbMutex& mutex, time_t timeout) {
        if (!signaled) { 
            struct timespec abs_ts; 
#ifdef PTHREAD_GET_EXPIRATION_NP
            struct timespec rel_ts; 
            rel_ts.tv_sec = timeout/1000; 
            rel_ts.tv_nsec = timeout%1000*1000000;
            pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
            struct timeval cur_tv;
            gettimeofday(&cur_tv, NULL);
            abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; 
            abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
            if (abs_ts.tv_nsec > 1000000000) { 
                abs_ts.tv_nsec -= 1000000000;
                abs_ts.tv_sec += 1;
            }
#endif
            do { 
                int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
                if (rc != 0) {
                    return false;
                }
            } while (!signaled);

        }
        return true;
    }
    void signal() {
        signaled = true;
        pthread_cond_broadcast(&cond);
    }
    void reset() {
        signaled = false;
    }
    void open(bool initValue = false) { 
        signaled = initValue;
        pthread_cond_init(&cond, NULL);
    }
    void close() {
        pthread_cond_destroy(&cond);
    }
};

class dbLocalSemaphore { 
    pthread_cond_t   cond;
    int              count;
  public:
    void wait(dbMutex& mutex) { 
        while (count == 0) { 
            pthread_cond_wait(&cond, &mutex.cs);
        }
        count -= 1;
    }
    bool wait(dbMutex& mutex, time_t timeout) {
        if (count == 0) { 
            struct timespec abs_ts; 
#ifdef PTHREAD_GET_EXPIRATION_NP
            struct timespec rel_ts; 
            rel_ts.tv_sec = timeout/1000; 
            rel_ts.tv_nsec = timeout%1000*1000000;
            pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
            struct timeval cur_tv;
            gettimeofday(&cur_tv, NULL);
            abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; 
            abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
            if (abs_ts.tv_nsec > 1000000000) { 
                abs_ts.tv_nsec -= 1000000000;
                abs_ts.tv_sec += 1;
            }
#endif
            do { 
                int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
                if (rc != 0) { 
                    return false;
                }
            } while (count == 0);
        }
        count -= 1;
        return true;
    }
    void signal(unsigned inc = 1) {
        count += inc;
        if (inc > 1) { 
            pthread_cond_broadcast(&cond);
        } else if (inc == 1) { 
            pthread_cond_signal(&cond);
        }
    }
    void open(unsigned initValue = 0) { 
        pthread_cond_init(&cond, NULL);
        count = initValue;
    }
    void close() {
        pthread_cond_destroy(&cond);
    }
};

template<class T> 
class dbThreadContext { 
    pthread_key_t key;
  public:
    T* get() { 
        T* p_val = (T*)pthread_getspecific(key);
#ifdef VXWORKS
	// HACK - on VXWORKS, setting to NULL releases the key for _all_ threads.
	// when windriver defect number WIND00119383 is fixed - this hack may be
	// removed.
	if (p_val == (T*)-1)
	    p_val = (T*)NULL;
#endif // VXWORKS
	return p_val;
    }
    void set(T* value) { 
#ifdef VXWORKS
	// HACK - on VXWORKS, setting to NULL releases the key for _all_ threads.
	// when windriver defect number WIND00119383 is fixed - this hack may be
	// removed.
	if (value == NULL)
	    value = (T*)-1;
#endif // VXWORKS
        pthread_setspecific(key, value);
    }
    dbThreadContext() { 
        pthread_key_create(&key, NULL);
    }
    ~dbThreadContext() { 
        pthread_key_delete(key);
    }
};

class dbProcessId { 
    int       pid;
    pthread_t tid;
  public:
    bool operator != (dbProcessId const& other) const { 
        return pid != other.pid || tid != other.tid;
    }

    void clear() { 
        pid = 0;
        tid = 0;
    }

    static dbProcessId getCurrent() {
        dbProcessId curr;
        curr.pid = getpid();
        curr.tid = pthread_self();
        return curr;
    }
};

#else // NO_PTHREAD

// Non pthread based threads, mutexes, etc.
// Maps to skeleton  functions, this implementation isn't using threads.

class dbMutex {
    bool initialized;

   public:
    dbMutex() {
        initialized = true;
    }

    ~dbMutex() { 
        initialized = false;
    }

    bool isInitialized() { 
        return initialized;
    }

    void lock() {}
    void unlock() {}
};

class dbThread { 
  public:
    typedef void (thread_proc* thread_proc_t)(void*);
    void create(thread_proc_t f, void* arg) { f(arg); }
    void join() {}
    void detach() {}
    enum ThreadPriority { 
        THR_PRI_LOW, 
        THR_PRI_HIGH
    };
    void setPriority(ThreadPriority pri) { }
    static int numberOfProcessors() { return 1; }
};

class dbLocalSemaphore { 
    int count;
  public:
    void wait(dbMutex&) { 
        assert (count > 0);
        count -= 1;
    }
    void signal(unsigned inc = 1) {
        count += inc;
    }
    void open(unsigned initValue = 0) {
        count = initValue;
    }
    void close() {}
};

class dbLocalEvent { 
    bool signaled;
  public:
    void wait(dbMutex&) { 
        assert(signaled);
    }
    bool wait(dbMutex& mutex, time_t timeout) {
        return true;
    }
    void signal() {
        signaled = true;
    }
    void reset() {
        signaled = false;
    }
    void open(bool initValue = false) {
        signaled = initValue;
    }
    void close() {}
};

template<class T>
class dbThreadContext { 
    T* value;
  public:
    T* get() { 
        return value;
    }
    void set(T* value) { 
        this->value = value;
    }
    dbThreadContext() { value = NULL; }
};


class dbProcessId { 
    int       pid;
  public:
    bool operator != (dbProcessId const& other) const { 
        return pid != other.pid;
    }
    
    void clear() { 
        pid = 0;
    }

    static dbProcessId getCurrent() {
        dbProcessId curr;
        curr.pid = getpid();
        return curr;
    }
};

#endif // NO_PTHREAD


#define INFINITE (~0U)


#ifdef USE_POSIX_SEMAPHORES

// Initialization Mutex using Posix based semaphores
class dbInitializationMutex { 
    sem_t* sem;
    char*  name;
  public: 
    enum initializationStatus { 
        InitializationError, 
        AlreadyInitialized,
        NotYetInitialized
    };
    initializationStatus initialize(char const* name) { 
        initializationStatus status;
        this->name = new char[strlen(name)+2];
        if (*name != '/') { 
            strcpy(this->name+1, name);
            *this->name = '/';
        } else { 
            strcpy(this->name, name);
        }
        while (true) {
            sem = sem_open(this->name, 0);
            if (sem == SEM_FAILED) { 
                if (errno == ENOENT) {
                    sem = sem_open(this->name, O_CREAT|O_EXCL, 0777, 0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -