⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 4 页
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     13-Jan-2000 K.A. Knizhnik   * / [] \ *
//                          Last update: 13-Jan-2000 K.A. Knizhnik   * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*

#include <ctype.h>
#include "fastdb.h"
#include "compiler.h"
#include "wwwapi.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"
#include "localcli.h"

#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif

// Copy the array (or string) payload bound to this column into the record
// buffer.
//
//   dst  - base address of the destination buffer
//   offs - byte offset inside dst at which the payload is stored
//
// Returns the value of this->len.  For array types this is used as the
// element count; for everything else as a byte count.
// The payload is read starting 4 bytes past ptr (presumably skipping a
// length prefix - confirm against the wire format).
int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
  const int count = this->len;
  char* const out = dst + offs;

  if (cliType < cli_array_of_oid)
  {
    // Not an array type: handled as a string and copied byte for byte.
    memcpy(out, ptr + 4, count);
    return count;
  }

  // Array types: dispatch on the element width of the base type and unpack
  // each element with the unpack routine of matching width.
  switch (sizeof_type[cliType - cli_array_of_oid])
  {
  case 1:
    memcpy(out, ptr + 4, count);
    break;

  case 2:
    for (int k = 0; k < count; k++)
    {
      unpack2(out + k*2, ptr + 4 + k*2);
    }
    break;

  case 4:
    for (int k = 0; k < count; k++)
    {
      unpack4(out + k*4, ptr + 4 + k*4);
    }
    break;

  case 8:
    for (int k = 0; k < count; k++)
    {
      unpack8(out + k*8, ptr + 4 + k*8);
    }
    break;

  default:
    assert(false);
  }

  return count;
}

void dbColumnBinding::unpackScalar(char* dst)
{
  if (cliType == cli_autoincrement)
  {
    assert(fd->type == dbField::tpInt4);
#ifdef AUTOINCREMENT_SUPPORT

    *(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else

    *(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif

    return;
  }

  switch (fd->type)
  {

  case dbField::tpBool:

  case dbField::tpInt1:

    switch (sizeof_type[cliType])
    {

    case 1:
      *(dst + fd->dbsOffs) = *ptr;
      break;

    case 2:
      *(dst + fd->dbsOffs) = (char)unpack2(ptr);
      break;

    case 4:
      *(dst + fd->dbsOffs) = (char)unpack4(ptr);
      break;

    case 8:
      *(dst + fd->dbsOffs) = (char)unpack8(ptr);
      break;

    default:
      assert(false);
    }

    break;

  case dbField::tpInt2:

    switch (sizeof_type[cliType])
    {

    case 1:
      *(int2*)(dst+fd->dbsOffs) = *ptr;
      break;

    case 2:
      unpack2(dst+fd->dbsOffs, ptr);
      break;

    case 4:
      *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
      break;

    case 8:
      *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
      break;

    default:
      assert(false);
    }

    break;

  case dbField::tpInt4:

    switch (sizeof_type[cliType])
    {

    case 1:
      *(int4*)(dst+fd->dbsOffs) = *ptr;
      break;

    case 2:
      *(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
      break;

    case 4:
      unpack4(dst+fd->dbsOffs, ptr);
      break;

    case 8:
      *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
      break;

    default:
      assert(false);
    }

    break;

  case dbField::tpInt8:

    switch (sizeof_type[cliType])
    {

    case 1:
      *(db_int8*)(dst+fd->dbsOffs) = *ptr;
      break;

    case 2:
      *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);
      break;

    case 4:
      *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);
      break;

    case 8:
      unpack8(dst+fd->dbsOffs, ptr);
      break;

    default:
      assert(false);
    }

    break;

  case dbField::tpReal4:

    switch (cliType)
    {

    case cli_real4:
      unpack4(dst+fd->dbsOffs, ptr);
      break;

    case cli_real8:
      {
        real8 temp;
        unpack8((char*)&temp, ptr);
        *(real4*)(dst + fd->dbsOffs) = (real4)temp;
      }

      break;

    default:
      assert(false);
    }

    break;

  case dbField::tpReal8:

    switch (cliType)
    {

    case cli_real4:
      {
        real4 temp;
        unpack4((char*)&temp, ptr);
        *(real8*)(dst + fd->dbsOffs) = temp;
      }

      break;

    case cli_real8:
      unpack8(dst+fd->dbsOffs, ptr);
      break;

    default:
      assert(false);
    }

    break;

  default:
    assert(false);
  }
}

void dbStatement::reset()
{
  dbColumnBinding *cb, *next;

  for (cb = columns; cb != NULL; cb = next)
  {
    next = cb->next;
    delete cb;
  }

  columns = NULL;
  delete[] params;
  params = NULL;
  delete cursor;
  cursor = NULL;
  query.reset();
  table = NULL;
}

// Scan the next token from the NUL-terminated input at p.
//
// Returns:
//   tkn_eof    - end of input reached (only white space was left)
//   tkn_all    - a '*' character
//   tkn_iconst - integer constant; its value is stored in ival
//   tkn_fconst - floating point constant; its value is stored in fval
//   tkn_error  - invalid character, malformed number, or a token that does
//                not fit into buf (dbQueryMaxIdLength characters)
//   otherwise  - the result of dbSymbolTable::add() for an identifier, whose
//                text is left in buf and referenced by ident
//
// Every character is read through unsigned char before it is handed to the
// <ctype.h> classifiers: passing a negative value other than EOF to them is
// undefined behaviour, and plain char may be signed.
int dbQueryScanner::get()
{
  int i = 0, ch, digits;

  // Skip leading white space.
  do
  {
    if ((ch = (unsigned char)*p++) == '\0')
    {
      return tkn_eof;
    }
  }
  while (isspace(ch));

  if (ch == '*')
  {
    return tkn_all;
  }
  else if (isdigit(ch) || ch == '+' || ch == '-')
  {
    // Collect every character that may belong to a numeric literal; the
    // sscanf calls below decide whether the text is actually well formed.
    do
    {
      buf[i++] = (char)ch;

      if (i == dbQueryMaxIdLength)
      {
        // Numeric constant too long
        return tkn_error;
      }

      ch = (unsigned char)*p++;
    }
    while (ch != '\0'
           && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' ||
               ch == 'E' || ch == '.'));

    // Give back the character that terminated the literal.
    p -= 1;

    buf[i] = '\0';

    if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1)
    {
      // Bad integer constant
      return tkn_error;
    }

    if (digits != i)
    {
      // The integer parse did not consume the whole text: retry as floating
      // point, which must consume all of it.
      if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i)
      {
        // Bad float constant
        return tkn_error;
      }

      return tkn_fconst;
    }

    return tkn_iconst;
  }
  else if (isalpha(ch) || ch == '$' || ch == '_')
  {
    do
    {
      buf[i++] = (char)ch;

      if (i == dbQueryMaxIdLength)
      {
        // Identifier too long
        return tkn_error;
      }

      ch = (unsigned char)*p++;
    }
    // The input is a NUL-terminated string, so the terminator is '\0'
    // (not EOF), matching the numeric branch above.
    while (ch != '\0' && (isalnum(ch) || ch == '$' || ch == '_'));

    // Give back the character that terminated the identifier.
    p -= 1;

    buf[i] = '\0';

    ident = buf;

    return dbSymbolTable::add(ident, tkn_ident);
  }
  else
  {
    // Invalid symbol
    return tkn_error;
  }
}

// Head of the linked list of dbServer objects.  It has static storage
// duration and no initializer, so it starts out as a null pointer; the
// dbServer constructor pushes each new instance onto the front of the list.
dbServer* dbServer::chain;

// Look up the statement with identifier stmt_id in the statement list of the
// given session.  Returns NULL when the session has no such statement.
inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
  dbStatement* candidate = session->stmts;

  // Advance until the id matches or the list is exhausted.
  while (candidate != NULL && candidate->id != stmt_id)
  {
    candidate = candidate->next;
  }

  return candidate;
}

// Thread entry point: arg is the dbServer whose serveClient() is run.
void thread_proc dbServer::serverThread(void* arg)
{
  dbServer* const server = static_cast<dbServer*>(arg);
  server->serveClient();
}

// Thread entry point: arg is the dbServer that accepts connections on its
// local accept socket.
void thread_proc dbServer::acceptLocalThread(void* arg)
{
  dbServer* const self = static_cast<dbServer*>(arg);
  self->acceptConnection(self->localAcceptSock);
}

// Thread entry point: arg is the dbServer that accepts connections on its
// global accept socket.
void thread_proc dbServer::acceptGlobalThread(void* arg)
{
  dbServer* const self = static_cast<dbServer*>(arg);
  self->acceptConnection(self->globalAcceptSock);
}

// Construct a server for the given database.
//
//   db                     - database served to clients
//   serverURL              - address handed to both socket factories; a
//                            private copy is kept in URL
//   optimalNumberOfThreads - stored as is for later use
//   connectionQueueLen     - queue length passed to both socket factories
//
// The new object is linked at the head of the static server chain.  A global
// and a local accept socket are created; if either one reports failure the
// error is traced, the socket is destroyed and its pointer is left NULL.
dbServer::dbServer(dbDatabase* db,
                   char const* serverURL,
                   int optimalNumberOfThreads,
                   int connectionQueueLen)
{
  char errText[256];

  // Push this server onto the front of the chain.
  next = chain;
  chain = this;

  this->db = db;
  this->optimalNumberOfThreads = optimalNumberOfThreads;

  // Keep our own copy of the URL (terminating NUL included).
  URL = new char[strlen(serverURL) + 1];
  strcpy(URL, serverURL);

  globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen);
  if (!globalAcceptSock->is_ok())
  {
    globalAcceptSock->get_error_text(errText, sizeof errText);
    dbTrace("Failed to create global socket: %s\n", errText);
    delete globalAcceptSock;
    globalAcceptSock = NULL;
  }

  localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen);
  if (!localAcceptSock->is_ok())
  {
    localAcceptSock->get_error_text(errText, sizeof errText);
    dbTrace("Failed to create local socket: %s\n", errText);
    delete localAcceptSock;
    localAcceptSock = NULL;
  }

  // No sessions exist yet.
  waitList = NULL;
  activeList = NULL;
  freeList = NULL;
  waitListLength = 0;
}

// Search the server chain for the server whose URL compares equal (strcmp)
// to the given one.  Returns NULL when there is no such server.
dbServer* dbServer::find(char const* URL)
{
  dbServer* server = chain;

  // Stop at the first server with a matching URL, or at the end of the chain.
  while (server != NULL && strcmp(URL, server->URL) != 0)
  {
    server = server->next;
  }

  return server;
}

void dbServer::cleanup()
{
  dbServer *server, *next;

  for (server = chain; server != NULL; server = next)
  {
    next = server->next;
    delete server;
  }
}

// Start the server: reset the thread counters and cancellation flags, open
// the go and done events, and launch one accept thread for each accept
// socket that was created successfully.
void dbServer::start()
{
  nIdleThreads = 0;
  nActiveThreads = 0;

  cancelAccept = false;
  cancelSession = false;
  cancelWait = false;

  go.open();
  done.open();

  if (globalAcceptSock != NULL)
  {
    globalAcceptThread.create(acceptGlobalThread, this);
  }
  if (localAcceptSock != NULL)
  {
    localAcceptThread.create(acceptLocalThread, this);
  }
}

// Shut the server down.  The steps run in this order:
//   1. stop both accept threads and destroy the accept sockets;
//   2. shut down the sockets of active sessions until activeList is empty;
//   3. wake idle threads until nIdleThreads drops to zero;
//   4. destroy the sockets of sessions still on waitList and move those
//      sessions to freeList;
//   5. close the done and go events.
// Steps 2-5 run with the mutex held via the dbCriticalSection object.
void dbServer::stop()
{
  // Set before cancelling the accepts (presumably checked by the accept
  // threads - confirm in acceptConnection).
  cancelAccept = true;

  if (globalAcceptSock != NULL)
  {
    globalAcceptSock->cancel_accept();
    globalAcceptThread.join();
  }

  delete globalAcceptSock;
  globalAcceptSock = NULL;

  if (localAcceptSock != NULL)
  {
    localAcceptSock->cancel_accept();
    localAcceptThread.join();
  }

  delete localAcceptSock;
  localAcceptSock = NULL;

  dbCriticalSection cs(mutex);
  cancelSession = true;

  // Shut down the socket of the session at the head of activeList and wait
  // on done; repeat until the list is empty.  Presumably the serving thread
  // removes its session from activeList and signals done - confirm.
  while (activeList != NULL)
  {
    activeList->sock->shutdown();
    done.wait(mutex);
  }

  cancelWait = true;

  // Signal go and wait on done until no idle threads are left.
  while (nIdleThreads != 0)
  {
    go.signal();
    done.wait(mutex);
  }

  // Sessions still waiting for a thread: destroy the socket and push the
  // session object onto freeList.
  while (waitList != NULL)
  {
    dbSession* next = waitList->next;
    delete waitList->sock;
    waitList->next = freeList;
    freeList = waitList;
    waitList = next;
  }

  waitListLength = 0;
  assert(nActiveThreads == 0);
  done.close();
  go.close();
}

// Handle a freeze request for statement stmt_id of the given session.
//
// If the statement exists and has a cursor, the cursor is frozen and cli_ok
// is reported; otherwise cli_bad_descriptor is reported.  The response code
// is passed through pack4() and written to the session socket.
// Returns the result of that socket write.
bool dbServer::freeze(dbSession* session, int stmt_id)
{
  int4 response;
  dbStatement* const stmt = findStatement(session, stmt_id);

  if (stmt != NULL && stmt->cursor != NULL)
  {
    stmt->cursor->freeze();
    response = cli_ok;
  }
  else
  {
    response = cli_bad_descriptor;
  }

  pack4(response);
  return session->sock->write(&response, sizeof response);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -