⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sync.h

📁 实现内存数据库的源代码
💻 H
📖 第 1 页 / 共 2 页
字号:
//-< SYNC.H >--------------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 20-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchronization primitives
//-------------------------------------------------------------------*--------*

#ifndef __SYNC_H__
#define __SYNC_H__

// Holder for system-level helper functions; it declares only static members.
class FASTDB_DLL_ENTRY dbSystem { 
  public:
    // Presumably returns the current time in milliseconds (going by the name);
    // the definition and the time origin are not visible in this header.
    static unsigned getCurrentTimeMsec();
};

#ifdef _WIN32
class FASTDB_DLL_ENTRY dbMutex { 
    CRITICAL_SECTION cs;
  public:
    dbMutex() { 
	InitializeCriticalSection(&cs);
    }
    ~dbMutex() { 
	DeleteCriticalSection(&cs);
    }
    void lock() { 
	EnterCriticalSection(&cs);
    }
    void unlock() { 
	LeaveCriticalSection(&cs);
    }
};

// Marker placed in thread entry-point declarations; expands to WINAPI in the Win32 branch.
#define thread_proc WINAPI

class FASTDB_DLL_ENTRY dbThread { 
    HANDLE h;
  public:
    void create(void (thread_proc* f)(void*), void* arg) { 
	DWORD threadid;
	h = CreateThread(NULL, NULL, LPTHREAD_START_ROUTINE(f), arg,
			 0, &threadid);
    }
    enum ThreadPriority { 
	THR_PRI_LOW, 
	THR_PRI_HIGH
    };

    void setPriority(ThreadPriority pri) { 
	SetThreadPriority(h, pri == THR_PRI_LOW ? THREAD_PRIORITY_IDLE : THREAD_PRIORITY_HIGHEST);
    }
	
    void join() { 
	WaitForSingleObject(h, INFINITE);
	CloseHandle(h);
	h = NULL;
    }
    void detach() { 
	if (h != NULL) { 
	    CloseHandle(h);
	    h = NULL;
	}
    }	
    dbThread() { 
	h = NULL; 
    }
    ~dbThread() { 
	if (h != NULL) { 
	    CloseHandle(h);
	}
    }
    static int numberOfProcessors() { 
#ifdef PHAR_LAP
	return 1;
#else
	SYSTEM_INFO sysinfo;
	GetSystemInfo(&sysinfo);
	return sysinfo.dwNumberOfProcessors;
#endif
    }
};
    
class FASTDB_DLL_ENTRY dbProcessId { 
    DWORD tid;
  public:
    bool operator != (dbProcessId const& other) const { 
	return tid != other.tid;
    }

    void clear() { 
	tid = 0;
    }

    static dbProcessId getCurrent() {
	dbProcessId curr;
	curr.tid = GetCurrentThreadId();
	return curr;
    }
};

// Named Win32 mutex used to find out whether the calling process is the
// first one to create the mutex with the given name.
class FASTDB_DLL_ENTRY dbInitializationMutex {
    HANDLE m;
  public:
    enum initializationStatus {
        InitializationError,
        AlreadyInitialized,
        NotYetInitialized
    };

    // Creates (or opens) the mutex called `name`, requesting initial ownership.
    //  - The mutex already existed: wait until it can be acquired, release
    //    it again and report AlreadyInitialized (InitializationError when
    //    the wait does not return WAIT_OBJECT_0).
    //  - The mutex was newly created: report NotYetInitialized; the mutex
    //    is left owned until done() is called.
    //  - Creation failed: report InitializationError.
    initializationStatus initialize(char const* name) {
        m = CreateMutex(NULL, true, name);
        if (GetLastError() == ERROR_ALREADY_EXISTS) {
            const bool acquired = WaitForSingleObject(m, INFINITE) == WAIT_OBJECT_0;
            ReleaseMutex(m);
            return acquired ? AlreadyInitialized : InitializationError;
        }
        if (m == NULL) {
            return InitializationError;
        }
        return NotYetInitialized;
    }

    // Releases the mutex.
    void done() {
        ReleaseMutex(m);
    }

    // Closes the mutex handle; always returns false.
    bool close() {
        CloseHandle(m);
        return false;
    }

    // In this branch erasing is the same as closing.
    void erase() {
        close();
    }
};


// Maximum count passed to CreateSemaphore() by dbSemaphore::open().
const int dbMaxSemValue = 1000000;


class FASTDB_DLL_ENTRY dbSemaphore { 
  protected:
    HANDLE s;
  public:
    bool wait(unsigned msec = INFINITE) { 
	int rc = WaitForSingleObject(s, msec);
	assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
	return rc == WAIT_OBJECT_0;
    }
    void signal(unsigned inc = 1) {
	if (inc != 0) { 
	    ReleaseSemaphore(s, inc, NULL);
	}
    }
    void reset() { 
	while (WaitForSingleObject(s, 0) == WAIT_OBJECT_0);
    }    
    bool open(char const* name, unsigned initValue = 0) {
	s = CreateSemaphore(NULL, initValue, dbMaxSemValue, name);
	return s != NULL; 
    }
    void close() {
	CloseHandle(s);
    }
    void erase() { 
	close();
    }
};

class FASTDB_DLL_ENTRY dbEvent { 
  protected:
    HANDLE e;
  public:
    bool wait(unsigned msec = INFINITE) { 
	int rc = WaitForSingleObject(e, msec);
	assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
	return rc == WAIT_OBJECT_0;
    }
    void signal() {
	SetEvent(e);
    }
    void reset() {
	ResetEvent(e);
    }
    bool open(char const* name, bool signaled = false) {
	e = CreateEvent(NULL, true, signaled, name);
	return e != NULL; 
    }
    void close() {
	CloseHandle(e);
    }
    void erase() { 
	close();
    }
};

// Unnamed semaphore that is waited on while temporarily giving up a dbMutex.
// Both wait() overloads unlock the mutex, wait, and lock the mutex again
// before returning; neither reports whether the wait timed out.
class FASTDB_DLL_ENTRY dbLocalSemaphore : public dbSemaphore {
  public:
    // Creates the semaphore without a name.
    bool open(unsigned initValue = 0) {
        return dbSemaphore::open(NULL, initValue);
    }

    // Waits without a time limit.
    void wait(dbMutex& mutex) {
        mutex.unlock();
        const int outcome = WaitForSingleObject(s, INFINITE);
        assert(outcome == WAIT_OBJECT_0);
        mutex.lock();
    }

    // Waits for at most `timeoutSec` seconds (converted to milliseconds).
    void wait(dbMutex& mutex, time_t timeoutSec) {
        mutex.unlock();
        const int outcome = WaitForSingleObject(s, timeoutSec*1000);
        assert(outcome == WAIT_OBJECT_0 || outcome == WAIT_TIMEOUT);
        mutex.lock();
    }
};

// Unnamed event that is waited on while temporarily giving up a dbMutex.
// Both wait() overloads unlock the mutex, wait, and lock the mutex again
// before returning; neither reports whether the wait timed out.
class FASTDB_DLL_ENTRY dbLocalEvent : public dbEvent {
  public:
    // Creates the event without a name; `signaled` is its initial state.
    bool open(bool signaled = false) {
        return dbEvent::open(NULL, signaled);
    }

    // Waits without a time limit.
    void wait(dbMutex& mutex) {
        mutex.unlock();
        const int outcome = WaitForSingleObject(e, INFINITE);
        assert(outcome == WAIT_OBJECT_0);
        mutex.lock();
    }

    // Waits for at most `timeoutSec` seconds (converted to milliseconds).
    void wait(dbMutex& mutex, time_t timeoutSec) {
        mutex.unlock();
        const int outcome = WaitForSingleObject(e, timeoutSec*1000);
        assert(outcome == WAIT_OBJECT_0 || outcome == WAIT_TIMEOUT);
        mutex.lock();
    }
};

template<class T>
class dbThreadContext { 
    int index;
  public:
    T* get() { 
	return (T*)TlsGetValue(index);
    }
    void set(T* value) { 
	TlsSetValue(index, value);
    }
    dbThreadContext() { 
	index = TlsAlloc();
	assert(index != TLS_OUT_OF_INDEXES);
    }
    ~dbThreadContext() { 
	TlsFree(index);
    }
};

// Object of type T placed in a named file mapping backed by the system
// paging file (CreateFileMapping on INVALID_HANDLE_VALUE), or simply
// allocated on the heap when NO_MMAP is defined.
template<class T>
class dbSharedObject {
    T*     ptr;
    HANDLE h;
  public:

    // Creates/opens the mapping called `name` and maps a view of it.
    // Returns false when the mapping or the view could not be created; in
    // that case no handle is left open and get() returns NULL.
    // With NO_MMAP the object is default-constructed on the heap instead.
    bool open(char* name) {
#ifdef NO_MMAP
        ptr = new T();
#else
        h = CreateFileMapping(INVALID_HANDLE_VALUE,
                              NULL, PAGE_READWRITE, 0,
                              sizeof(T), name);
        if (h == NULL) {
            ptr = NULL;
            return false;
        }
        ptr = (T*)MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
        if (ptr == NULL) {
            CloseHandle(h);
            // Forget the closed handle so a later close() does not close it twice.
            h = NULL;
            return false;
        }
#endif
        return true;
    }

    // Returns the address of the object.
    T* get() { return ptr; }

    // Releases the object: unmaps the view and closes the mapping handle,
    // or deletes the heap object when NO_MMAP is defined.
    void close() {
#ifdef NO_MMAP
        // The object was allocated with scalar new, so it must be released
        // with scalar delete (delete[] here is undefined behaviour).
        delete ptr;
        ptr = NULL;
#else
        if (ptr != NULL) {
            UnmapViewOfFile(ptr);
            ptr = NULL;
        }
        if (h != NULL) {
            CloseHandle(h);
            h = NULL;
        }
#endif
    }

    // In this branch erasing is the same as closing.
    void erase() {
        close();
    }
};

// Counter type that dbGlobalCriticalSection updates with
// InterlockedIncrement/InterlockedDecrement.
typedef long sharedsem_t;

class FASTDB_DLL_ENTRY dbGlobalCriticalSection { 
    HANDLE       event;
    sharedsem_t* count;

  public:
    void enter() { 
	if (InterlockedDecrement(count) != 0) { 
	    // another process is in critical section
	    int rc = WaitForSingleObject(event, INFINITE);
	    assert (rc == WAIT_OBJECT_0);
	}
    }

    void leave() { 
	if (InterlockedIncrement(count) <= 0) { 
	    // some other processes try to enter critical section
	    SetEvent(event);
	}
    }

    bool open(char const* name, long* count) { 
	this->count = count;
	event = OpenEvent(EVENT_ALL_ACCESS, FALSE, name);
	return event != NULL;
    }
    bool create(char const* name, long* count) { 
	this->count = count;
	*count = 1;
	event = CreateEvent(NULL, false, false, name);
	return event != NULL;
    }
    void close() { 
	CloseHandle(event);
    }
    void erase() { 
	close();
    }
};
	
    
#else // Unix

#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>

#ifdef USE_POSIX_API
#include <semaphore.h>
#include <sys/mman.h>
#include <errno.h>
#else
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/mman.h>
#endif

// Marker placed in thread entry-point declarations; expands to nothing in the Unix branch.
#define thread_proc

#ifndef NO_PTHREADS

#include <pthread.h>

// Mutual-exclusion lock built on a pthread mutex with default attributes.
// dbLocalEvent and dbLocalSemaphore are friends because they pass the
// underlying pthread mutex to the condition-variable wait functions.
class dbMutex {
    friend class dbLocalEvent;
    friend class dbLocalSemaphore;

  private:
    pthread_mutex_t cs;

  public:
    dbMutex()
    {
        pthread_mutex_init(&cs, NULL);
    }

    ~dbMutex()
    {
        pthread_mutex_destroy(&cs);
    }

    // Acquires the mutex.
    void lock()
    {
        pthread_mutex_lock(&cs);
    }

    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&cs);
    }
};

// Stack size in bytes (1 MB) that dbThread::create() requests for new
// threads on platforms other than Linux.
const size_t dbThreadStackSize = 1024*1024;

// Wrapper around a pthread thread identifier.
class dbThread {
    pthread_t thread;
  public:
    enum ThreadPriority {
        THR_PRI_LOW,
        THR_PRI_HIGH
    };

    // Defined outside of this header.
    static int numberOfProcessors();

    // Starts a new thread executing f(arg). Return codes of the pthread
    // calls are not checked.
    void create(void (thread_proc* f)(void*), void* arg) {
        typedef void* (*pthread_entry_t)(void*);
        pthread_attr_t attributes;
        pthread_attr_init(&attributes);
#if !defined(__linux__)
        pthread_attr_setstacksize(&attributes, dbThreadStackSize);
#endif
#if defined(_AIX41)
        // At AIX 4.1, 4.2 threads are by default created detached
        pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_UNDETACHED);
#endif
        // pthreads expects an entry point returning void*, hence the cast.
        pthread_create(&thread, &attributes, (pthread_entry_t)f, arg);
        pthread_attr_destroy(&attributes);
    }

    // Waits for the thread to finish; its return value is discarded.
    void join() {
        pthread_join(thread, NULL);
    }

    // Detaches the thread.
    void detach() {
        pthread_detach(thread);
    }

    // Sets the SCHED_OTHER priority to PRI_OTHER_MIN or PRI_OTHER_MAX.
    // Does nothing on platforms that do not define both macros.
    void setPriority(ThreadPriority pri) {
#if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
        struct sched_param param;
        if (pri == THR_PRI_LOW) {
            param.sched_priority = PRI_OTHER_MIN;
        } else {
            param.sched_priority = PRI_OTHER_MAX;
        }
        pthread_setschedparam(thread, SCHED_OTHER, &param);
#endif
    }
};


// Event built from a pthread condition variable and a flag.
// The wait functions pass the caller's dbMutex to the condition variable,
// so callers are expected to hold that mutex while calling them.
class dbLocalEvent {
    pthread_cond_t   cond;
    int              signaled;
  public:
    // Blocks until the event is signaled.
    void wait(dbMutex& mutex) {
        while (!signaled) {
            pthread_cond_wait(&cond, &mutex.cs);
        }
    }

    // Blocks until the event is signaled or `timeout` seconds have elapsed,
    // whichever comes first. The outcome is not reported to the caller.
    void wait(dbMutex& mutex, time_t timeout) {
        if (signaled) {
            return;
        }
        // The absolute deadline is computed once, before waiting. Computing
        // it inside the loop would push the deadline forward after every
        // wakeup and the wait would never time out.
        struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
        struct timespec rel_ts;
        rel_ts.tv_sec = timeout;
        rel_ts.tv_nsec = 0;
        pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
        struct timeval cur_tv;
        gettimeofday(&cur_tv, NULL);
        abs_ts.tv_sec = cur_tv.tv_sec + timeout;
        abs_ts.tv_nsec = cur_tv.tv_usec * 1000;
#endif
        while (!signaled) {
            // A non-zero result means the deadline passed (ETIMEDOUT) or
            // the wait failed; stop waiting in either case.
            if (pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts) != 0) {
                break;
            }
        }
    }

    // Marks the event as signaled and wakes up all waiting threads.
    void signal() {
        signaled = true;
        pthread_cond_broadcast(&cond);
    }

    // Returns the event to the non-signaled state.
    void reset() {
        signaled = false;
    }

    // Initializes the event; `initValue` is its initial state.
    void open(bool initValue = false) {
        signaled = initValue;
        pthread_cond_init(&cond, NULL);
    }

    // Destroys the condition variable.
    void close() {
        pthread_cond_destroy(&cond);
    }
};

// Counting semaphore built from a pthread condition variable and a counter.
// The wait functions pass the caller's dbMutex to the condition variable,
// so callers are expected to hold that mutex while calling them.
class dbLocalSemaphore {
    pthread_cond_t   cond;
    int              count;
  public:
    // Blocks until the counter is non-zero, then decrements it.
    void wait(dbMutex& mutex) {
        while (count == 0) {
            pthread_cond_wait(&cond, &mutex.cs);
        }
        count -= 1;
    }

    // Blocks until the counter is non-zero or `timeout` seconds have
    // elapsed. The counter is decremented only when the semaphore was
    // actually acquired; on timeout it is left untouched. The outcome is
    // not reported to the caller.
    void wait(dbMutex& mutex, time_t timeout) {
        if (count == 0) {
            // The absolute deadline is computed once, before waiting.
            // Computing it inside the loop would push the deadline forward
            // after every wakeup and the wait would never time out.
            struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
            struct timespec rel_ts;
            rel_ts.tv_sec = timeout;
            rel_ts.tv_nsec = 0;
            pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
            struct timeval cur_tv;
            gettimeofday(&cur_tv, NULL);
            abs_ts.tv_sec = cur_tv.tv_sec + timeout;
            // timeval carries microseconds, timespec expects nanoseconds.
            abs_ts.tv_nsec = cur_tv.tv_usec * 1000;
#endif
            do {
                // A non-zero result means the deadline passed (ETIMEDOUT)
                // or the wait failed: give up without taking a unit.
                if (pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts) != 0) {
                    return;
                }
            } while (count == 0);
        }
        count -= 1;
    }

    // Adds `inc` units. One waiter is woken for a single unit, all waiters
    // for more than one; a zero increment wakes nobody.
    void signal(unsigned inc = 1) {
        count += inc;
        if (inc > 1) {
            pthread_cond_broadcast(&cond);
        } else if (inc == 1) {
            pthread_cond_signal(&cond);
        }
    }

    // Initializes the semaphore with `initValue` units.
    void open(unsigned initValue = 0) {
        pthread_cond_init(&cond, NULL);
        count = initValue;
    }

    // Destroys the condition variable.
    void close() {
        pthread_cond_destroy(&cond);
    }
};

// Per-thread pointer slot implemented with a pthread key.
// The key is created (without a destructor function) in the constructor
// and deleted in the destructor.
template<class T>
class dbThreadContext {
  private:
    pthread_key_t key;

  public:
    dbThreadContext()
    {
        pthread_key_create(&key, NULL);
    }

    ~dbThreadContext()
    {
        pthread_key_delete(key);
    }

    // Stores `value` for the calling thread.
    void set(T* value)
    {
        pthread_setspecific(key, value);
    }

    // Returns the pointer stored by the calling thread.
    T* get()
    {
        void* const stored = pthread_getspecific(key);
        return (T*)stored;
    }
};

// Identifier of a thread of execution: process id plus pthread id.
// There is no constructor: both fields are unset until clear() is called
// or the object is obtained from getCurrent().
class dbProcessId {
    int       pid;
    pthread_t tid;
  public:
    // Builds the identifier of the calling thread in the calling process.
    static dbProcessId getCurrent() {
        dbProcessId self;
        self.pid = getpid();
        self.tid = pthread_self();
        return self;
    }

    // Resets both fields to zero.
    void clear() {
        pid = 0;
        tid = 0;
    }

    // True when the process ids or the thread ids differ.
    bool operator != (dbProcessId const& other) const {
        if (pid != other.pid) {
            return true;
        }
        return tid != other.tid;
    }
};

#else

// Stand-in mutex for builds with NO_PTHREADS defined: locking and
// unlocking do nothing.
class dbMutex {
  public:
    void lock()
    {
    }

    void unlock()
    {
    }
};

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -