⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.cpp

📁 最新版本!fastdb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 4 页
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     13-Jan-2000 K.A. Knizhnik   * / [] \ *
//                          Last update: 13-Jan-2000 K.A. Knizhnik   * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*

#include "fastdb.h"
#include "compiler.h"
#include "wwwapi.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "rtree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"
#include "localcli.h"
#include <ctype.h>

BEGIN_FASTDB_NAMESPACE

#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif

// Copy the variable-length payload of this binding (array elements or string
// bytes) into dst, starting at byte offset offs.
// The payload is read beginning 4 bytes past ptr. One-byte data is copied
// verbatim; 2-, 4- and 8-byte array elements are converted one at a time with
// unpack2/unpack4/unpack8. Any other element size trips assert(false).
// Returns the value of the len member.
int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
    int const count = this->len;
    char* const target = dst + offs;

    if (cliType < cli_array_of_oid) {
        // string
        memcpy(target, ptr + 4, count);
        return count;
    }

    switch (sizeof_type[cliType - cli_array_of_oid]) {
      case 1:
        memcpy(target, ptr + 4, count);
        break;
      case 2:
        for (int k = 0; k < count; k++) {
            unpack2(target + k*2, ptr + 4 + k*2);
        }
        break;
      case 4:
        for (int k = 0; k < count; k++) {
            unpack4(target + k*4, ptr + 4 + k*4);
        }
        break;
      case 8:
        for (int k = 0; k < count; k++) {
            unpack8(target + k*8, ptr + 4 + k*8);
        }
        break;
      default:
        assert(false);
    }
    return count;
}

void dbColumnBinding::unpackScalar(char* dst, bool insert)
{
    if (cliType == cli_autoincrement) { 
        assert(fd->type == dbField::tpInt4);
        if (insert) { 
#ifdef AUTOINCREMENT_SUPPORT
            *(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else
            *(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif
        }
        return;
    } 
    switch (fd->type) { 
      case dbField::tpBool:
      case dbField::tpInt1:
        switch (sizeof_type[cliType]) { 
          case 1:
            *(dst + fd->dbsOffs) = *ptr;
            break;
          case 2:
            *(dst + fd->dbsOffs) = (char)unpack2(ptr);
            break;
          case 4:
            *(dst + fd->dbsOffs) = (char)unpack4(ptr);
            break;
          case 8:
            *(dst + fd->dbsOffs) = (char)unpack8(ptr);
            break;
          default:
            assert(false);
        }
        break;          
      case dbField::tpInt2:
        switch (sizeof_type[cliType]) { 
          case 1:
            *(int2*)(dst+fd->dbsOffs) = *ptr;
            break;
          case 2:
            unpack2(dst+fd->dbsOffs, ptr);
            break;
          case 4:
            *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
            break;
          case 8:
            *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
            break;
          default:
            assert(false);
        }
        break;          
      case dbField::tpInt4:
        switch (sizeof_type[cliType]) { 
          case 1:
            *(int4*)(dst+fd->dbsOffs) = *ptr;
            break;
          case 2:
            *(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
            break;
          case 4:
            unpack4(dst+fd->dbsOffs, ptr);
            break;
          case 8:
            *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
            break;
          default:
            assert(false);
        }
        break;          
      case dbField::tpInt8:
        switch (sizeof_type[cliType]) { 
          case 1:
            *(db_int8*)(dst+fd->dbsOffs) = *ptr;
            break;
          case 2:
            *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);
            break;
          case 4:
            *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);
            break;
          case 8:
            unpack8(dst+fd->dbsOffs, ptr);
            break;
          default:
            assert(false);
        }
        break;          
      case dbField::tpReal4:
        switch (cliType) { 
          case cli_real4:
            unpack4(dst+fd->dbsOffs, ptr);
            break;
          case cli_real8:
            {
                real8 temp;
                unpack8((char*)&temp, ptr);
                *(real4*)(dst + fd->dbsOffs) = (real4)temp;
            }
            break;
          default:
            assert(false);
        }
        break;
      case dbField::tpReal8:
        switch (cliType) { 
          case cli_real4:
            {
                real4 temp;
                unpack4((char*)&temp, ptr);
                *(real8*)(dst + fd->dbsOffs) = temp;
            }
            break;
          case cli_real8:
            unpack8(dst+fd->dbsOffs, ptr);
            break;
          default:
            assert(false);
        }
        break;
     case dbField::tpReference:
        *(oid_t*)(dst + fd->dbsOffs) = unpack_oid(ptr);
         break;
      case dbField::tpRectangle:
        unpack_rectangle((cli_rectangle_t*)(dst + fd->dbsOffs), ptr);
        break;
      default:
        assert(false);
    }
}

void dbStatement::reset()
{
    dbColumnBinding *cb, *next;
    for (cb = columns; cb != NULL; cb = next) { 
        next = cb->next;
        delete cb;
    }
    columns = NULL;
    delete[] params;
    params = NULL;
    delete cursor;
    cursor = NULL;
    query.reset();
    table = NULL;
}

// Extract the next token from the query text at the current scan position p.
//
// Returns:
//   tkn_eof    - the terminating '\0' was reached
//   tkn_all    - a '*' character
//   tkn_iconst - integer constant, value stored in ival
//   tkn_fconst - floating point constant, value stored in fval
//   tkn_error  - invalid character, malformed number, or a token that
//                reaches dbQueryMaxIdLength characters
//   otherwise  - the result of dbSymbolTable::add() for an identifier, with
//                ident pointing at the identifier text held in buf
//
// Characters are fetched through unsigned char before being handed to the
// <ctype.h> classifiers: passing a negative plain char (any byte >= 0x80 on
// platforms where char is signed) to isspace/isdigit/isalpha/isalnum is
// undefined behaviour, and the query text comes from the client.
//
// NOTE: when tkn_eof is returned, p has already been advanced past the
// terminator, so get() must not be called again on the same text.
int dbQueryScanner::get()
{
    int i = 0, ch, digits;

    // Skip leading white space.
    do {
        if ((ch = (unsigned char)*p++) == '\0') {
            return tkn_eof;
        }
    } while (isspace(ch));

    if (ch == '*') {
        return tkn_all;
    } else if (isdigit(ch) || ch == '+' || ch == '-') {
        // Collect every character that may belong to a numeric literal.
        do {
            buf[i++] = (char)ch;
            if (i == dbQueryMaxIdLength) {
                // Numeric constant too long
                return tkn_error;
            }
            ch = (unsigned char)*p++;
        } while (ch != '\0'
                 && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' ||
                     ch == 'E' || ch == '.'));
        p -= 1; // give back the character that ended the literal
        buf[i] = '\0';
        if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) {
            // Bad integer constant
            return tkn_error;
        }
        if (digits != i) {
            // The integer conversion did not consume the whole literal,
            // so retry it as a floating point value.
            if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {
                // Bad float constant
                return tkn_error;
            }
            return tkn_fconst;
        }
        return tkn_iconst;
    } else if (isalpha(ch) || ch == '$' || ch == '_') {
        do {
            buf[i++] = (char)ch;
            if (i == dbQueryMaxIdLength) {
                // Identifier too long
                return tkn_error;
            }
            ch = (unsigned char)*p++;
        } while (ch != '\0' && (isalnum(ch) || ch == '$' || ch == '_'));
        p -= 1; // give back the character that ended the identifier
        buf[i] = '\0';
        ident = buf;
        return dbSymbolTable::add(ident, tkn_ident);
    } else {
        // Invalid symbol
        return tkn_error;
    }
}

// Head of the linked list of dbServer objects. The constructor pushes each
// new server onto the front; find() and cleanup() walk it via the next field.
dbServer* dbServer::chain;

// Look up the statement with identifier stmt_id in the session's statement
// list. Returns the first match, or NULL when no statement has that id.
inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
    dbStatement* candidate = session->stmts;
    while (candidate != NULL && candidate->id != stmt_id) {
        candidate = candidate->next;
    }
    return candidate;
}

// Thread entry point: arg is the dbServer whose serveClient() is to be run.
void thread_proc dbServer::serverThread(void* arg)
{
    dbServer* const server = static_cast<dbServer*>(arg);
    server->serveClient();
}

// Thread entry point: arg is the dbServer that should accept connections
// on its local accept socket.
void thread_proc dbServer::acceptLocalThread(void* arg)
{
    dbServer* const self = static_cast<dbServer*>(arg);
    self->acceptConnection(self->localAcceptSock);
}

// Thread entry point: arg is the dbServer that should accept connections
// on its global accept socket.
void thread_proc dbServer::acceptGlobalThread(void* arg)
{
    dbServer* const self = static_cast<dbServer*>(arg);
    self->acceptConnection(self->globalAcceptSock);
}

// Construct a server for database db listening at serverURL.
//
//   db                     - database stored in the db member
//   serverURL              - address handed to both socket factories; a
//                            private copy is kept in URL
//   optimalNumberOfThreads - stored in the member of the same name
//   connectionQueueLen     - forwarded to the socket factories
//
// The new object is linked at the head of the static chain. A global and a
// local accept socket are created; if either one reports !is_ok(), its error
// text is traced, the socket is deleted and the member is left NULL.
dbServer::dbServer(dbDatabase* db,
                   char const* serverURL,
                   int optimalNumberOfThreads,
                   int connectionQueueLen)
{
    char errorText[256];

    // Push this server onto the front of the list of all servers.
    next = chain;
    chain = this;

    this->db = db;
    this->optimalNumberOfThreads = optimalNumberOfThreads;

    // Keep a private copy of the URL, terminator included.
    size_t const urlSize = strlen(serverURL) + 1;
    this->URL = new char[urlSize];
    memcpy(URL, serverURL, urlSize);

    globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen);
    if (!globalAcceptSock->is_ok()) {
        globalAcceptSock->get_error_text(errorText, sizeof errorText);
        dbTrace("Failed to create global socket: %s\n", errorText);
        delete globalAcceptSock;
        globalAcceptSock = NULL;
    }

    localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen);
    if (!localAcceptSock->is_ok()) {
        localAcceptSock->get_error_text(errorText, sizeof errorText);
        dbTrace("Failed to create local socket: %s\n", errorText);
        delete localAcceptSock;
        localAcceptSock = NULL;
    }

    // All session lists start out empty.
    waitList = NULL;
    activeList = NULL;
    freeList = NULL;
    waitListLength = 0;
}
   
// Search the chain of servers for one whose URL compares equal (strcmp) to
// the given URL. Returns the first match, or NULL if there is none.
dbServer* dbServer::find(char const* URL)
{
    dbServer* candidate = chain;
    while (candidate != NULL && strcmp(URL, candidate->URL) != 0) {
        candidate = candidate->next;
    }
    return candidate;
}

void dbServer::cleanup()
{
    dbServer *server, *next;
    for (server = chain; server != NULL; server = next) { 
        next = server->next;
        delete server;
    } 
}

// Put the server into its running state: zero the thread counters, clear the
// three cancel flags, open the go and done events, and create one accept
// thread for each accept socket that exists.
void dbServer::start()
{
    nIdleThreads = 0;
    nActiveThreads = 0;

    cancelAccept = false;
    cancelSession = false;
    cancelWait = false;

    go.open();
    done.open();

    if (globalAcceptSock != NULL) {
        globalAcceptThread.create(acceptGlobalThread, this);
    }
    if (localAcceptSock != NULL) {
        localAcceptThread.create(acceptLocalThread, this);
    }
}

// Shut the server down. The steps run in this order:
//   1. set cancelAccept, cancel the pending accept on each existing accept
//      socket, join its accept thread and delete the socket;
//   2. set cancelSession and shut down the sockets of active sessions until
//      activeList is empty;
//   3. set cancelWait and signal go until nIdleThreads drops to zero;
//   4. delete the sockets of sessions still in waitList and move those
//      sessions to freeList;
//   5. close the done and go events.
void dbServer::stop()
{
    cancelAccept = true;
    if (globalAcceptSock != NULL) {
        // Interrupt the pending accept, then wait for the accept thread to end.
        globalAcceptSock->cancel_accept();
        globalAcceptThread.join();
    }
    delete globalAcceptSock;
    globalAcceptSock = NULL;

    if (localAcceptSock != NULL) { 
        localAcceptSock->cancel_accept();
        localAcceptThread.join();
    }
    delete localAcceptSock;
    localAcceptSock = NULL;

    // NOTE(review): presumably cs keeps mutex locked until this function
    // returns - confirm against dbCriticalSection.
    dbCriticalSection cs(mutex);
    cancelSession = true;
    while (activeList != NULL) { 
        // Shut down the socket of the session at the head of the list, then
        // wait on done. NOTE(review): the code that removes sessions from
        // activeList and signals done is not in this function - presumably
        // the session threads do it; confirm.
        activeList->sock->shutdown();
        done.wait(mutex);
    }

    cancelWait = true;
    while (nIdleThreads != 0) { 
        // Signal go and wait on done until no idle threads remain.
        go.signal();
        done.wait(mutex);
    }

    // Sessions still queued in waitList: delete their sockets and move the
    // session objects onto freeList.
    while (waitList != NULL) {
        dbSession* next = waitList->next;
        delete waitList->sock;
        waitList->next = freeList;
        freeList = waitList;
        waitList = next;
    }
    waitListLength = 0;
    assert(nActiveThreads == 0);
    done.close();
    go.close();
}

// Freeze the cursor of statement stmt_id in the given session and send a
// 4-byte status back over the session socket: cli_ok on success,
// cli_bad_descriptor when the statement is unknown or has no cursor.
// Returns the result of the socket write.
bool dbServer::freeze(dbSession* session, int stmt_id)
{
    int4 response;
    dbStatement* const stmt = findStatement(session, stmt_id);

    if (stmt != NULL && stmt->cursor != NULL) {
        stmt->cursor->freeze();
        response = cli_ok;
    } else {
        response = cli_bad_descriptor;
    }
    pack4(response);
    return session->sock->write(&response, sizeof response);
}
    
bool dbServer::unfreeze(dbSession* session, int stmt_id)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -