sync.cpp

来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 856 行 · 第 1/2 页

CPP
856
字号

// Leave the critical section (variant built when __GNUC__ and i386 are
// defined, per the matching #else further down).
//
// Atomically adds 1 to the shared counter `*count`. If the counter's previous
// value was non-zero, semaphore 0 of the set `semid` is incremented once.
void dbGlobalCriticalSection::leave()
{
  int inc = 1;
  // `lock; xadd` adds `inc` to `*count` as one atomic step and leaves the
  // value `*count` held before the addition in `inc`.
  __asm__ __volatile__(

    "lock; xadd %0,%1"
  :"=d" (inc), "=m" (*count)
        :"d" (inc), "m" (*count));

  if (inc != 0)
  {
    /* some other processes waiting to enter critical section */

    // Positional initializer: semaphore index 0, operation +1, flags 0
    // (assumes the usual sem_num, sem_op, sem_flg member order).
    static struct sembuf sops[] =
      {
        {
          0, 1, 0
        }
      };

    int rc = semop(semid, sops, 1);
    // Failure is only checked in builds where assert() is active.
    assert(rc == 0);
  }
}

// Create the critical section (GCC/i386 variant).
//
// name:  passed through to sem_init() to identify the semaphore.
// count: location of the shared counter; it is reset to 1 and remembered in
//        this object.
//
// Returns true when sem_init() reports success (0).
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
  // Reset the shared counter, then remember where it lives.
  *count = 1;
  this->count = count;

  // The semaphore itself is initialized with the value 0.
  if (sem_init(semid, name, 0) != 0)
  {
    return false;
  }

  return true;
}

// Open the critical section (GCC/i386 variant).
//
// name:  passed through to sem_init() to identify the semaphore.
// count: location of the shared counter; remembered but, unlike create(),
//        not modified here.
//
// Returns true when sem_init() reports success (0).
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
  this->count = count;

  if (sem_init(semid, name, 0) != 0)
  {
    return false;
  }

  return true;
}

#else // defined(__GNUC__) && defined(i386)
// "lowest" case, use a SysV semaphore for complete portability
// Enter the critical section (portable SysV variant).
//
// Decrements semaphore 0 of `semid` by one with SEM_UNDO, restarting the call
// whenever it is interrupted by a signal (EINTR).
void dbGlobalCriticalSection::enter()
{
  // Positional initializer: semaphore index 0, operation -1, SEM_UNDO.
  static struct sembuf acquireOp[] = { { 0, -1, SEM_UNDO } };

  int status;

  do
  {
    status = semop(semid, acquireOp, 1);
  }
  while (status < 0 && errno == EINTR);

  // Any failure other than EINTR is only caught when assert() is active.
  assert(status == 0);
}

// Leave the critical section (portable SysV variant).
//
// Increments semaphore 0 of `semid` by one with SEM_UNDO, mirroring the
// decrement performed by enter().
void dbGlobalCriticalSection::leave()
{
  // Positional initializer: semaphore index 0, operation +1, SEM_UNDO.
  static struct sembuf releaseOp[] = { { 0, 1, SEM_UNDO } };

  const int status = semop(semid, releaseOp, 1);

  // Failure is only checked in builds where assert() is active.
  assert(status == 0);
}

// Open the critical section (portable SysV variant).
//
// name: passed through to sem_init() to identify the semaphore.
// The shared-counter argument is unused in this variant.
//
// Returns true when sem_init() reports success (0).
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
{
  // XXX (original author's caveat): sem_init is Posix, the rest of these
  // calls are SysV.
  // The semaphore is initialized with the value 1.
  if (sem_init(semid, name, 1) != 0)
  {
    return false;
  }

  return true;
}

// Create the critical section (portable SysV variant).
//
// name: passed through to sem_init() to identify the semaphore.
// The shared-counter argument is unused in this variant.
//
// Returns true when sem_init() reports success (0).
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
{
  // The semaphore is initialized with the value 1.
  if (sem_init(semid, name, 1) != 0)
  {
    return false;
  }

  return true;
}

#endif // defined(__GNUC__) && defined(i386)

// Remove the semaphore set `semid` from the system.
//
// The result of semctl() is not checked.
void dbGlobalCriticalSection::erase()
{
  // semctl() requires a semaphore index; IPC_RMID acts on the whole set.
  const int semIndex = 0;

  // `u` is declared outside this block (presumably the file's semun object).
  semctl(semid, semIndex, IPC_RMID, &u);
}

// Attach this process to the three-semaphore SysV set that guards one-time
// initialization, creating the key file and the semaphore set when they do
// not exist yet.
//
// name: key file name. When it contains no '/', keyFileDir is prepended.
//
// Returns:
//   NotYetInitialized   - this process was the first to register; semaphore 1
//                         ("initialization in progress") is left raised until
//                         done() lowers it.
//   AlreadyInitialized  - another process had registered before; this call
//                         waited until semaphore 1 dropped to zero.
//   InitializationError - open/ftok/semget/semop failed; the failing call is
//                         reported through PRINT_ERROR.
dbInitializationMutex::initializationStatus
dbInitializationMutex::initialize(char const* name)
{
  struct sembuf sops[4];

  // `ownedPath` is non-NULL only when a prefixed copy had to be allocated;
  // delete[] on NULL is a no-op, so it can be released unconditionally.
  char* ownedPath = NULL;
  char const* path = name;

  if (strchr(name, '/') == NULL)
  {
    ownedPath = new char[strlen(name)+strlen(keyFileDir)+1];
    sprintf(ownedPath, "%s%s", keyFileDir, name);
    path = ownedPath;
  }

  // Make sure the key file exists so that ftok() can derive a key from it.
  int fd = open(path, O_WRONLY|O_CREAT, 0777);

  if (fd < 0)
  {
    delete[] ownedPath;
    PRINT_ERROR("open");
    return InitializationError;
  }

  ::close(fd);
  int key = ftok(path, '0');
  delete[] ownedPath;

  if (key < 0)
  {
    PRINT_ERROR("ftok");
    return InitializationError;
  }

  while (true)
  {
    semid = semget(key, 3, IPC_CREAT|0777);

    if (semid < 0)
    {
      PRINT_ERROR("semget");
      return InitializationError;
    }

    // Semaphore 0 - number of active processes
    // Semaphore 1 - initialization in progress (1 while initialization, 0 after it)
    // Semaphore 2 - semaphore was destroyed

    // First attempt: succeeds only for the very first process.
    sops[0].sem_num = 0;
    sops[0].sem_op  = 0; /* check if semaphore was already initialized */
    sops[0].sem_flg = IPC_NOWAIT;
    sops[1].sem_num = 0;
    sops[1].sem_op  = 1; /* increment number of active processes */
    sops[1].sem_flg = SEM_UNDO;
    sops[2].sem_num = 1;
    sops[2].sem_op  = 1; /* initialization in process */
    sops[2].sem_flg = SEM_UNDO;
    sops[3].sem_num = 2;
    sops[3].sem_op  = 0; /* check if semaphore was destroyed */
    sops[3].sem_flg = IPC_NOWAIT;

    if (semop(semid, sops, 4) >= 0)
    {
      return NotYetInitialized;
    }

    if (errno == EAGAIN)
    {
      // Second attempt: somebody else is (or was) first. The -1/+2 pair on
      // semaphore 0 nets +1, and the non-blocking -1 fails with EAGAIN when
      // the process count is zero.
      sops[0].sem_num = 0;
      sops[0].sem_op  = -1; /* require at least one active process */
      sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
      sops[1].sem_num = 1;
      sops[1].sem_op  = 0; /* wait until initialization completed */
      sops[1].sem_flg = 0;
      sops[2].sem_num = 0;
      sops[2].sem_op  = 2; /* increment number of active processes */
      // Fix: the flag was previously stored into sops[3] (and immediately
      // overwritten), leaving sops[2] to rely on the value set above.
      sops[2].sem_flg = SEM_UNDO;
      sops[3].sem_num = 2;
      sops[3].sem_op  = 0; /* check if semaphore was destroyed */
      sops[3].sem_flg = IPC_NOWAIT;

      if (semop(semid, sops, 4) == 0)
      {
        return AlreadyInitialized;
      }

      if (errno == EAGAIN)
      {
        // The set is in transition (no active process, or marked destroyed);
        // back off and start over.
        sleep(1);
        continue;
      }
    }

    // EIDRM: the set was removed under us - obtain a fresh one.
    // EINTR: the blocking wait above was interrupted by a signal - retry, as
    //        dbGlobalCriticalSection::enter() does.
    if (errno == EIDRM || errno == EINTR)
    {
      continue;
    }

    PRINT_ERROR("semop");
    return InitializationError;
  }
}

// Signal that initialization has finished by lowering semaphore 1 of `semid`
// by one (with SEM_UNDO).
void dbInitializationMutex::done()
{
  struct sembuf finishOp;
  finishOp.sem_num = 1;        /* the "initialization in progress" semaphore */
  finishOp.sem_op  = -1;       /* initialization done */
  finishOp.sem_flg = SEM_UNDO;

  const int status = semop(semid, &finishOp, 1);

  // Failure is only checked in builds where assert() is active.
  assert(status == 0);
}

// Unregister this process from the semaphore set.
//
// Returns true when this was the last registered process (semaphore 2 is then
// raised to mark the set as destructed), false when other processes remain.
// The two semop() attempts are alternated until one of them succeeds; each
// failure is expected to be EAGAIN (checked only when assert() is active).
bool dbInitializationMutex::close()
{
  // Attempt for "I am the last one": the process counter must reach zero.
  struct sembuf lastOneOut[3];
  lastOneOut[0].sem_num = 0;
  lastOneOut[0].sem_op  = -1; /* decrement process counter */
  lastOneOut[0].sem_flg = SEM_UNDO;
  lastOneOut[1].sem_num = 0;
  lastOneOut[1].sem_op  = 0;  /* check if there are no more active processes */
  lastOneOut[1].sem_flg = IPC_NOWAIT;
  lastOneOut[2].sem_num = 2;
  lastOneOut[2].sem_op  = 1;  /* mark as destructed */
  lastOneOut[2].sem_flg = SEM_UNDO;

  // Attempt for "others remain": -2 then +1 nets -1 and only succeeds when
  // the counter was at least 2.
  struct sembuf othersRemain[2];
  othersRemain[0].sem_num = 0;
  othersRemain[0].sem_op  = -2; /* decrement process counter and check for non-zero */
  othersRemain[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
  othersRemain[1].sem_num = 0;
  othersRemain[1].sem_op  = 1;
  othersRemain[1].sem_flg = SEM_UNDO;

  for (;;)
  {
    if (semop(semid, lastOneOut, 3) == 0)
    {
      return true;
    }

    assert(errno == EAGAIN);

    if (semop(semid, othersRemain, 2) == 0)
    {
      return false;
    }

    assert(errno == EAGAIN);
  }
}

// Remove the semaphore set `semid` from the system.
//
// The result of semctl() is not checked.
void dbInitializationMutex::erase()
{
  // semctl() requires a semaphore index; IPC_RMID acts on the whole set.
  const int semIndex = 0;

  // `u` is declared outside this block (presumably the file's semun object).
  semctl(semid, semIndex, IPC_RMID, &u);
}

#endif // !USE_POSIX_SEMAPHORES



//  Thread stuff
#ifndef NO_PTHREADS

#if defined(_SC_NPROCESSORS_ONLN)
// Number of processors currently online, as reported by
// sysconf(_SC_NPROCESSORS_ONLN).
//
// sysconf() returns -1 when the value cannot be determined; in that case (or
// for any other value below 1) this reports 1, matching the generic fallback
// implementation, so callers never see a non-positive processor count.
int dbThread::numberOfProcessors()
{
  const long online = sysconf(_SC_NPROCESSORS_ONLN);

  if (online < 1)
  {
    return 1;
  }

  return (int)online;
}

#elif defined(__linux__)
#include <linux/smp.h>
// Number of processors, taken from `smp_num_cpus` (provided by
// <linux/smp.h>, included just above this definition).
int dbThread::numberOfProcessors()
{
  const int cpuCount = smp_num_cpus;
  return cpuCount;
}

#elif defined(__FreeBSD__)
#include <sys/sysctl.h>
// Number of processors, queried through sysctl(CTL_HW, HW_NCPU).
//
// When the query fails or yields a value below 1, this reports 1, matching
// the generic fallback implementation. Previously the sysctl() result was
// ignored and a failure reported 0 processors.
int dbThread::numberOfProcessors()
{
  int mib[2];
  mib[0] = CTL_HW;
  mib[1] = HW_NCPU;

  int ncpus = 0;
  size_t len = sizeof(ncpus);

  if (sysctl(mib, 2, &ncpus, &len, NULL, 0) != 0 || ncpus < 1)
  {
    return 1;
  }

  return ncpus;
}

#else
// Fallback used when none of the platform-specific branches above applies:
// report a single processor.
int dbThread::numberOfProcessors()
{
  const int singleProcessor = 1;
  return singleProcessor;
}

#endif
#endif // NO_PTHREADS


#else // _WIN32

// Win32 specific code
// Win32 implementation: returns the value of GetTickCount() converted to
// unsigned.
unsigned dbSystem::getCurrentTimeMsec()
{
  const unsigned ticks = GetTickCount();
  return ticks;
}

#ifdef SET_NULL_DACL
// Out-of-class definition of the static member `instance`; compiled only
// when SET_NULL_DACL is defined.
dbNullSecurityDesciptor dbNullSecurityDesciptor::instance;
#endif

#endif // __WIN32

//////////////////////////////////////////////////////////////
// Common W32 and Unix platform code follows

// Thread entry point handed to thread.create() by the constructor.
//
// arg: the dbPooledThread object whose run() loop should execute on this
//      thread.
void thread_proc dbPooledThread::pooledThreadFunc(void* arg)
{
  dbPooledThread* const self = static_cast<dbPooledThread*>(arg);
  self->run();
}

// Construct a pooled worker that belongs to `threadPool` and start its
// thread.
//
// `f` and `arg` are not set here; dbThreadPool::create() assigns them before
// signalling startSem.
dbPooledThread::dbPooledThread(dbThreadPool* threadPool)
{
  // Plain state first.
  running = true;
  next = NULL;
  pool = threadPool;

  // Both semaphores are opened before the thread exists, since run() uses
  // them.
  startSem.open();
  readySem.open();

  // Start the worker last, once everything it reads has been set up.
  thread.create(&pooledThreadFunc, this);
}

// Close the two semaphores opened by the constructor, in the order they were
// opened. The worker thread itself is not stopped here; see stop().
dbPooledThread::~dbPooledThread()
{
  this->startSem.close();
  this->readySem.close();
}

void dbPooledThread::stop()
{
  running = false;
  startSem.signal();
  readySem.wait(pool->mutex);
}

// Worker loop: wait on startSem, run the assigned job `f(arg)`, signal
// readySem, and repeat until `running` has been cleared by stop(). A final
// readySem signal acknowledges the stop request.
//
// NOTE(review): the job runs inside the dbCriticalSection on pool->mutex;
// whether the mutex is held during the job depends on what
// startSem.wait(mutex) does with it - confirm.
void dbPooledThread::run()
{
  dbCriticalSection guard(pool->mutex);

  startSem.wait(pool->mutex);

  while (running)
  {
    (*f)(arg);
    readySem.signal();

    // Block until the next job (or a stop request) arrives.
    startSem.wait(pool->mutex);
  }

  // Acknowledge stop().
  readySem.signal();
}

// Wait until `thr` signals readySem (which run() does after finishing its
// job), then put it back at the head of the free list.
void dbThreadPool::join(dbPooledThread* thr)
{
  dbCriticalSection guard(mutex);

  thr->readySem.wait(mutex);

  // Push onto the head of the singly linked free list.
  dbPooledThread* const previousHead = freeThreads;
  freeThreads = thr;
  thr->next = previousHead;
}


// Hand the job `f(arg)` to a pooled worker and return that worker.
//
// A worker is taken from the head of the free list when one is available;
// otherwise a new dbPooledThread is constructed. The worker is woken through
// its startSem. Pass the returned pointer to join() to wait for completion.
dbPooledThread* dbThreadPool::create(dbThread::thread_proc_t f, void* arg)
{
  dbCriticalSection guard(mutex);

  dbPooledThread* worker = freeThreads;

  if (worker == NULL)
  {
    // A freshly constructed worker has next == NULL, so the unlink below
    // leaves the free list empty.
    worker = new dbPooledThread(this);
  }

  // Unlink the chosen worker from the free list.
  freeThreads = worker->next;

  // Publish the job before waking the worker.
  worker->arg = arg;
  worker->f = f;
  worker->startSem.signal();

  return worker;
}


// Construct a pool with an empty free list; workers are created on demand by
// create().
dbThreadPool::dbThreadPool()
  : freeThreads(NULL)
{
}

// Stop and delete every worker currently on the free list.
//
// Only workers reachable from freeThreads are visited.
dbThreadPool::~dbThreadPool()
{
  dbCriticalSection guard(mutex);

  dbPooledThread* current = freeThreads;

  while (current != NULL)
  {
    // Read the link before the node is destroyed.
    dbPooledThread* const following = current->next;

    current->stop();
    delete current;

    current = following;
  }
}


⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?