sync.h

来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C头文件 代码 · 共 1,164 行 · 第 1/2 页

H
1,164
字号
	curr.pid = getpid();	curr.tid = pthread_self();	return curr;    }};#elseclass dbMutex {    public:    void lock() {}    void unlock() {}};class dbThread {   public:    typedef void (thread_proc* thread_proc_t)(void*);    void create(thread_proc_t f, void* arg) { f(arg); }    void join() {}    void detach() {}    static int numberOfProcessors() { return 1; }};class dbLocalSemaphore {     int count;  public:    void wait(dbMutex&) { 	assert (count > 0);	count -= 1;    }    void signal(unsigned inc = 1) {	count += inc;    }    void open(unsigned initValue = 0) {	count = initValue;    }    void close() {}};class dbLocalEvent {     bool signaled;  public:    void wait(dbMutex&) { 	assert(signaled);    }    void signal() {	signaled = true;    }    void reset() {	signaled = false;    }    void open(bool initValue = false) {	signaled = initValue;    }    void close() {}};template<class T>class dbThreadContext {     T* value;  public:    T* get() { 	return value;    }    void set(T* value) { 	this->value = value;    }    dbThreadContext() { value = NULL; }};class dbProcessId {     int       pid;  public:    bool operator != (dbProcessId const& other) const { 	return pid != other.pid;    }        void clear() { 	pid = 0;    }    static dbProcessId getCurrent() {	dbProcessId curr;	curr.pid = getpid();	return curr;    }};#endif#define INFINITE (~0U)#ifdef USE_POSIX_APIclass dbInitializationMutex {     sem_t* sem;  public:     enum initializationStatus { 	InitializationError, 	AlreadyInitialized,	NotYetInitialized    };    initializationStatus initialize(char const* name) { 	initializationStatus status;	char* tmp = NULL;	if (*name != '/') { 	    tmp = new char[strlen(name)+2];	    strcpy(tmp+1, name);	    *tmp = '/';	    name = tmp;	}	while (true) {	    sem = sem_open(name, 0);	    if (sem == NULL) { 		if (errno == ENOENT) {		    sem = sem_open(name, O_CREAT|O_EXCL, 0777, 0);		    if (sem != NULL) { 			status = NotYetInitialized;			break;		    } else if (errno != EEXIST) { 			status = InitializationError;			break;		    }		} else { 		    status = InitializationError;		    break;		}	    } else { 		status = (sem_wait(sem) == 0 && sem_post(sem) == 0) 		    ? AlreadyInitialized : InitializationError;		break;	    }	}	delete[] tmp;	return status;    }    void done() { 	sem_post(sem);    }    bool close() {	sem_close(sem);	return false;    }    void erase() { 	close();    }};class dbSemaphore {   protected:    sem_t* sem;  public:    void wait() { 	int rc = sem_wait(sem);	assert(rc == 0);    }    bool wait(unsigned msec) { #ifdef POSIX_1003_1d	struct timespec abs_ts;	struct timeval  cur_tv;	clock_gettime(CLOCK_REALTIME, &cur_tv);	abs_ts.tv_sec = cur_tv.tv_sec + (msec + tv.tv_usec / 1000) / 1000000; 	abs_ts.tv_nsec = (msec + tv.tv_usec / 1000) % 1000000 * 1000;	int rc = sem_timedwait(sem, &abs_ts);	if (rc < 0) { 	    assert(errno == ETIMEDOUT);	    return false;	}	return true;#else 	int rc = sem_wait(sem);	assert(rc == 0);	return true;#endif	    }    void signal(unsigned inc = 1) {	while (--inc > 0) { 	    sem_post(sem);	}    }    void reset() { 	while (sem_trywait(sem) == 0);    }        bool open(char const* name, unsigned initValue = 0) {	char* tmp = NULL;	if (*name != '/') { 	    tmp = new char[strlen(name)+2];	    strcpy(tmp+1, name);	    *tmp = '/';	    name = tmp;	}	sem = sem_open(name, O_CREAT, 0777, initValue);	delete[] tmp;	return sem != NULL;     }    void close() {	sem_close(sem);    }    void erase() { 	close();    }};class dbEvent : public dbSemaphore {   public:    void wait() { 	dbSemaphore::wait();	sem_post(sem);    }    bool wait(unsigned msec) { 	if (dbSemaphore::wait(msec)) { 	    sem_post(sem);	    return true;	}	return false;    }    void signal() {	while (sem_trywait(sem) == 0);	sem_post(sem);    }    void reset() {	while (sem_trywait(sem) == 0);    }    bool open(char const* name, bool signaled = false) {	return dbSemaphore::open(name, (int)signaled);    }};template<class T>class dbSharedObject {     char* name;    T*  ptr;    int fd;  public:    dbSharedObject() { 	name = NULL;	ptr = NULL;	fd = -1;    }    bool open(char* fileName) { 	delete[] name;	name = new char[strlen(fileName) + 1];	strcpy(name, fileName);	fd = ::open(fileName, O_RDWR|O_CREAT, 0777);	if (fd < 0) { 	    return false;	}	ptr = (T*)mmap(NULL,		       DOALIGN(sizeof(T), 4096),		       PROT_READ|PROT_WRITE,		       MAP_SHARED,		       fd,		       0);	if (ptr == MAP_FAILED) { 	    ptr = NULL;	    ::close(fd);	    return false;	}	return true;    }    T* get() { return ptr; }    void close() { 	if (ptr != NULL) { 	    munmap(ptr, DOALIGN(sizeof(T), 4096));	}	if (fd > 0) { 	    ::close(fd);	}    }    void erase() {	close();	unlink(name);	    }      ~dbSharedObject() { 	delete[] name;    }};#else // USE_POSIX_APIextern char const* keyFileDir; // default value: "/tmp/" class dbInitializationMutex {     int semid;  public:     enum initializationStatus { 	InitializationError, 	AlreadyInitialized,	NotYetInitialized    };    initializationStatus initialize(char const* name);    void done();     bool close();    void erase();};class dbSemaphore {     int s;  public:    bool wait(unsigned msec = INFINITE);    void signal(unsigned inc = 1);    bool open(char const* name, unsigned initValue = 0);    void reset();    void close();    void erase();};class dbEvent {     int e;  public:    bool wait(unsigned msec = INFINITE);    void signal();    void reset();    bool open(char const* name, bool signaled = false);    void close();    void erase();};class dbSharedMemory {   protected:    char*  ptr;    int    shm;  public:    bool  open(char const* name, size_t size);     void  close();    void  erase();     char* get_base() { 	return ptr;    }};template<class T>class dbSharedObject : public dbSharedMemory {   public:    bool open(char* name) { 	return dbSharedMemory::open(name, sizeof(T));    }    T* get() { return (T*)ptr; }};#endif#if defined(__QNX__)typedef pthread_mutext_t sharedsem_t;class dbGlobalCriticalSection {     pthread_mutexattr_t attr;    sharedsem_t* sem;  public:    void enter() {	int rc = pthread_mutex_lock(sem);	assert(rc == 0);    }    void leave() { 	int rc = pthread_mutex_unlock(sem);	assert(rc == 0);    }    bool open(char const*, sharedsem_t* shr) { 	sem = shr;	return true;    }    bool create(char const*, sharedsem_t* shr) { 	sem = shr;	pthread_mutexattr_init(&attr);	pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);	pthread_mutexattr_setrecursive(&attr, PTHREAD_RECURSIVE_ENABLE);	pthread_mutex_init(sem, &attr);	return true;    }    void close() {}    void erase() {	pthread_mutex_destroy(sem);    }};#elif defined(__osf__)#include <errno.h>typedef msemaphore sharedsem_t;class dbGlobalCriticalSection {     sharedsem_t* sem;  public:    void enter() { 	int rc;	while ((rc = msem_lock(sem, 0)) < 0 && errno == EINTR);	assert(rc == 0);    }    void leave() { 	int rc = msem_unlock(sem, 0);	assert(rc == 0);	    }    bool open(char const*, sharedsem_t* shr) { 	sem = shr;	return true;    }    bool create(char const*, sharedsem_t* shr) { 	sem = shr;	msem_init(shr, MSEM_UNLOCKED);	return true;    }    void close() {}    void erase() {	msem_remove(sem);    }};	#elif defined(__sun)#include <synch.h>#include <errno.h>typedef sema_t sharedsem_t;class dbGlobalCriticalSection {     sharedsem_t* sem;  public:    void enter() { 	int rc;	while ((rc = sema_wait(sem)) < 0 && errno == EINTR);	assert(rc == 0);    }    void leave() { 	int rc = sema_post(sem);	assert(rc == 0);    }    bool open(char const*, sharedsem_t* shr) { 	sem = shr;	return true;    }    bool create(char const*, sharedsem_t* shr) { 	sem = shr;	return sema_init(shr, 1, USYNC_PROCESS, NULL) == 0;    }    void close() {}    void erase() {	sema_destroy(sem);    }};#elif defined(USE_POSIX_API)typedef sem_t sharedsem_t;class dbGlobalCriticalSection {     sharedsem_t* sem;  public:    void enter() { 	int rc = sem_wait(sem);	assert(rc == 0);    }    void leave() { 	int rc = sem_post(sem);	assert(rc == 0);    }    bool open(char const* name, sharedsem_t* shr) { 	sem = shr;	return true;    }    bool create(char const* name, sharedsem_t* shr) { 		sem = shr;	return sem_init(sem, 1, 1) == 0;    }    void close() {}    void erase() { 	sem_destroy(sem);    }};#elsetypedef long sharedsem_t;class dbGlobalCriticalSection {     int          semid;    sharedsem_t* count;  public:    void enter();     void leave();    bool open(char const* name, sharedsem_t* shr);    bool create(char const* name, sharedsem_t* shr);    void close() {}    void erase();};#endif#endifclass FASTDB_DLL_ENTRY dbCriticalSection {   private:    dbMutex& mutex;  public:    dbCriticalSection(dbMutex& guard) : mutex(guard) {	mutex.lock();    }    ~dbCriticalSection() { 	mutex.unlock();    }};	#define SMALL_BUF_SIZE 1024class FASTDB_DLL_ENTRY dbSmallBuffer {   protected:    char* buf;    char  smallBuf[SMALL_BUF_SIZE];  public:    dbSmallBuffer(size_t size) { 	if (size > SMALL_BUF_SIZE) { 	    buf = new char[size];	} else { 	    buf = smallBuf;	}    }    operator char*() { return buf; }    char* base() { return buf; }    ~dbSmallBuffer() { 	if (buf != smallBuf) { 	    delete[] buf;	}    }};class FASTDB_DLL_ENTRY dbPooledThread {   private:    friend class dbThreadPool;    dbThread                thread;    dbThreadPool*           pool;    dbPooledThread*         next;    dbThread::thread_proc_t f;    void*                   arg;    bool                    running;    dbLocalSemaphore        startSem;    dbLocalSemaphore        readySem;        static void thread_proc  pooledThreadFunc(void* arg);    void run();    void stop();    dbPooledThread(dbThreadPool* threadPool);     ~dbPooledThread(); };class FASTDB_DLL_ENTRY dbThreadPool {     friend class dbPooledThread;    dbPooledThread* freeThreads;    dbMutex         mutex;  public:    dbPooledThread* create(dbThread::thread_proc_t f, void* arg);    void join(dbPooledThread* thr);    dbThreadPool();    ~dbThreadPool();};        #endif

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?