⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 4 页
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *// (Main Memory Database Management System)                          *   /\|  *//                                                                   *  /  \  *//                          Created:     13-Jan-2000 K.A. Knizhnik   * / [] \ *//                          Last update: 13-Jan-2000 K.A. Knizhnik   * GARRET *//-------------------------------------------------------------------*--------*// CLI multithreaded server implementation//-------------------------------------------------------------------*--------*#include <ctype.h>#include "fastdb.h"#include "compiler.h"#include "wwwapi.h"#include "subsql.h"#include "symtab.h"#include "hashtab.h"#include "ttree.h"#include "rtree.h"#include "cli.h"#include "cliproto.h"#include "server.h"#include "localcli.h"BEGIN_FASTDB_NAMESPACE#if !THREADS_SUPPORTED#error Server requires multithreading support#endifint dbColumnBinding::unpackArray(char* dst, size_t offs){    int len = this->len;    int i;    if (cliType >= cli_array_of_oid) {         switch (sizeof_type[cliType - cli_array_of_oid]) {           case 1:            memcpy(dst + offs, ptr + 4, len);            break;          case 2:            for (i = 0; i < len; i++) {                unpack2(dst + offs + i*2, ptr + 4 + i*2);            }            break;          case 4:            for (i = 0; i < len; i++) {                unpack4(dst + offs + i*4, ptr + 4 + i*4);            }            break;          case 8:            for (i = 0; i < len; i++) {                unpack8(dst + offs + i*8, ptr + 4 + i*8);            }            break;          default:            assert(false);        }    } else { // string        memcpy(dst + offs, ptr + 4, len);    }    return len;}void dbColumnBinding::unpackScalar(char* dst){    if (cliType == cli_autoincrement) {         assert(fd->type == dbField::tpInt4);#ifdef AUTOINCREMENT_SUPPORT        *(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;#else        *(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;#endif        return;    }     switch (fd->type) {       case dbField::tpBool:      case dbField::tpInt1:        switch (sizeof_type[cliType]) {           case 1:            *(dst + fd->dbsOffs) = *ptr;            break;          case 2:            *(dst + fd->dbsOffs) = (char)unpack2(ptr);            break;          case 4:            *(dst + fd->dbsOffs) = (char)unpack4(ptr);            break;          case 8:            *(dst + fd->dbsOffs) = (char)unpack8(ptr);            break;          default:            assert(false);        }        break;                case dbField::tpInt2:        switch (sizeof_type[cliType]) {           case 1:            *(int2*)(dst+fd->dbsOffs) = *ptr;            break;          case 2:            unpack2(dst+fd->dbsOffs, ptr);            break;          case 4:            *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);            break;          case 8:            *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);            break;          default:            assert(false);        }        break;                case dbField::tpInt4:        switch (sizeof_type[cliType]) {           case 1:            *(int4*)(dst+fd->dbsOffs) = *ptr;            break;          case 2:            *(int4*)(dst+fd->dbsOffs) = unpack2(ptr);            break;          case 4:            unpack4(dst+fd->dbsOffs, ptr);            break;          case 8:            *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);            break;          default:            assert(false);        }        break;                case dbField::tpInt8:        switch (sizeof_type[cliType]) {           case 1:            *(db_int8*)(dst+fd->dbsOffs) = *ptr;            break;          case 2:            *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);            break;          case 4:            *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);            break;          case 8:            unpack8(dst+fd->dbsOffs, ptr);            break;          default:            assert(false);        }        break;                case dbField::tpReal4:        switch (cliType) {           case cli_real4:            unpack4(dst+fd->dbsOffs, ptr);            break;          case cli_real8:            {                real8 temp;                unpack8((char*)&temp, ptr);                *(real4*)(dst + fd->dbsOffs) = (real4)temp;            }            break;          default:            assert(false);        }        break;      case dbField::tpReal8:        switch (cliType) {           case cli_real4:            {                real4 temp;                unpack4((char*)&temp, ptr);                *(real8*)(dst + fd->dbsOffs) = temp;            }            break;          case cli_real8:            unpack8(dst+fd->dbsOffs, ptr);            break;          default:            assert(false);        }        break;     case dbField::tpReference:        *(oid_t*)(dst + fd->dbsOffs) = unpack_oid(ptr);         break;      case dbField::tpRectangle:        unpack_rectangle((cli_rectangle_t*)(dst + fd->dbsOffs), ptr);        break;      default:        assert(false);    }}void dbStatement::reset(){    dbColumnBinding *cb, *next;    for (cb = columns; cb != NULL; cb = next) {         next = cb->next;        delete cb;    }    columns = NULL;    delete[] params;    params = NULL;    delete cursor;    cursor = NULL;    query.reset();    table = NULL;}int dbQueryScanner::get(){    int i = 0, ch, digits;        do {         if ((ch = *p++) == '\0') {             return tkn_eof;        }    } while (isspace(ch));        if (ch == '*') {         return tkn_all;    } else if (isdigit(ch) || ch == '+' || ch == '-') {         do {             buf[i++] = ch;            if (i == dbQueryMaxIdLength) {                 // Numeric constant too long                return tkn_error;            }            ch = *p++;        } while (ch != '\0'                  && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' ||                      ch == 'E' || ch == '.'));        p -= 1;        buf[i] = '\0';        if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) {             // Bad integer constant            return tkn_error;        }        if (digits != i) {             if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {                // Bad float constant                return tkn_error;            }            return tkn_fconst;        }         return tkn_iconst;    } else if (isalpha(ch) || ch == '$' || ch == '_') {         do {             buf[i++] = ch;            if (i == dbQueryMaxIdLength) {                 // Identifier too long                return tkn_error;            }            ch = *p++;        } while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_'));        p -= 1;        buf[i] = '\0';        ident = buf;        return dbSymbolTable::add(ident, tkn_ident);    } else {         // Invalid symbol        return tkn_error;    }}dbServer* dbServer::chain;inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id){    for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next)    {        if (stmt->id == stmt_id) {             return stmt;        }    }    return NULL;}void thread_proc dbServer::serverThread(void* arg){    ((dbServer*)arg)->serveClient();}void thread_proc dbServer::acceptLocalThread(void* arg){    dbServer* server = (dbServer*)arg;    server->acceptConnection(server->localAcceptSock);}void thread_proc dbServer::acceptGlobalThread(void* arg){    dbServer* server = (dbServer*)arg;    server->acceptConnection(server->globalAcceptSock);}dbServer::dbServer(dbDatabase* db,                   char const* serverURL,                    int optimalNumberOfThreads,                    int connectionQueueLen){    char buf[256];    next = chain;    chain = this;    this->db = db;    this->optimalNumberOfThreads = optimalNumberOfThreads;    this->URL = new char[strlen(serverURL)+1];    strcpy(URL, serverURL);    globalAcceptSock =         socket_t::create_global(serverURL, connectionQueueLen);    if (!globalAcceptSock->is_ok()) {         globalAcceptSock->get_error_text(buf, sizeof buf);        dbTrace("Failed to create global socket: %s\n", buf);        delete globalAcceptSock;        globalAcceptSock = NULL;    }    localAcceptSock =         socket_t::create_local(serverURL, connectionQueueLen);    if (!localAcceptSock->is_ok()) {         localAcceptSock->get_error_text(buf, sizeof buf);        dbTrace("Failed to create local socket: %s\n", buf);        delete localAcceptSock;        localAcceptSock = NULL;    }    freeList = activeList = waitList = NULL;    waitListLength = 0;}   dbServer* dbServer::find(char const* URL){    for (dbServer* server = chain; server != NULL; server = server->next) {         if (strcmp(URL, server->URL) == 0) {             return server;        }    }    return NULL;}void dbServer::cleanup(){    dbServer *server, *next;    for (server = chain; server != NULL; server = next) {         next = server->next;        delete server;    } }void dbServer::start(){    nActiveThreads = nIdleThreads = 0;        cancelWait = cancelSession = cancelAccept = false;    go.open();    done.open();    if (globalAcceptSock != NULL) {         globalAcceptThread.create(acceptGlobalThread, this);    }    if (localAcceptSock != NULL) {         localAcceptThread.create(acceptLocalThread, this);    }}void dbServer::stop(){    cancelAccept = true;    if (globalAcceptSock != NULL) {        globalAcceptSock->cancel_accept();        globalAcceptThread.join();    }    delete globalAcceptSock;    globalAcceptSock = NULL;    if (localAcceptSock != NULL) {         localAcceptSock->cancel_accept();        localAcceptThread.join();    }    delete localAcceptSock;    localAcceptSock = NULL;    dbCriticalSection cs(mutex);    cancelSession = true;    while (activeList != NULL) {         activeList->sock->shutdown();        done.wait(mutex);    }    cancelWait = true;    while (nIdleThreads != 0) {         go.signal();        done.wait(mutex);    }    while (waitList != NULL) {        dbSession* next = waitList->next;        delete waitList->sock;        waitList->next = freeList;        freeList = waitList;        waitList = next;    }    waitListLength = 0;    assert(nActiveThreads == 0);    done.close();    go.close();}bool dbServer::freeze(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response = cli_ok;    if (stmt == NULL || stmt->cursor == NULL) {         response = cli_bad_descriptor;    } else {         stmt->cursor->freeze();    }    pack4(response);    return session->sock->write(&response, sizeof response);}    bool dbServer::unfreeze(dbSession* session, int stmt_id)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -