📄 server.cpp
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 13-Jan-2000 K.A. Knizhnik * / [] \ *// Last update: 13-Jan-2000 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// CLI multithreaded server implementation//-------------------------------------------------------------------*--------*#include <ctype.h>#include "fastdb.h"#include "compiler.h"#include "wwwapi.h"#include "subsql.h"#include "symtab.h"#include "hashtab.h"#include "ttree.h"#include "rtree.h"#include "cli.h"#include "cliproto.h"#include "server.h"#include "localcli.h"BEGIN_FASTDB_NAMESPACE#if !THREADS_SUPPORTED#error Server requires multithreading support#endifint dbColumnBinding::unpackArray(char* dst, size_t offs){ int len = this->len; int i; if (cliType >= cli_array_of_oid) { switch (sizeof_type[cliType - cli_array_of_oid]) { case 1: memcpy(dst + offs, ptr + 4, len); break; case 2: for (i = 0; i < len; i++) { unpack2(dst + offs + i*2, ptr + 4 + i*2); } break; case 4: for (i = 0; i < len; i++) { unpack4(dst + offs + i*4, ptr + 4 + i*4); } break; case 8: for (i = 0; i < len; i++) { unpack8(dst + offs + i*8, ptr + 4 + i*8); } break; default: assert(false); } } else { // string memcpy(dst + offs, ptr + 4, len); } return len;}void dbColumnBinding::unpackScalar(char* dst){ if (cliType == cli_autoincrement) { assert(fd->type == dbField::tpInt4);#ifdef AUTOINCREMENT_SUPPORT *(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;#else *(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;#endif return; } switch (fd->type) { case dbField::tpBool: case dbField::tpInt1: switch (sizeof_type[cliType]) { case 1: *(dst + fd->dbsOffs) = *ptr; break; case 2: *(dst + fd->dbsOffs) = (char)unpack2(ptr); break; case 4: *(dst + fd->dbsOffs) = (char)unpack4(ptr); break; case 8: *(dst + fd->dbsOffs) = (char)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt2: switch (sizeof_type[cliType]) { case 1: *(int2*)(dst+fd->dbsOffs) = *ptr; break; case 2: unpack2(dst+fd->dbsOffs, ptr); break; case 4: *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr); break; case 8: *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt4: switch (sizeof_type[cliType]) { case 1: *(int4*)(dst+fd->dbsOffs) = *ptr; break; case 2: *(int4*)(dst+fd->dbsOffs) = unpack2(ptr); break; case 4: unpack4(dst+fd->dbsOffs, ptr); break; case 8: *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr); break; default: assert(false); } break; case dbField::tpInt8: switch (sizeof_type[cliType]) { case 1: *(db_int8*)(dst+fd->dbsOffs) = *ptr; break; case 2: *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr); break; case 4: *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr); break; case 8: unpack8(dst+fd->dbsOffs, ptr); break; default: assert(false); } break; case dbField::tpReal4: switch (cliType) { case cli_real4: unpack4(dst+fd->dbsOffs, ptr); break; case cli_real8: { real8 temp; unpack8((char*)&temp, ptr); *(real4*)(dst + fd->dbsOffs) = (real4)temp; } break; default: assert(false); } break; case dbField::tpReal8: switch (cliType) { case cli_real4: { real4 temp; unpack4((char*)&temp, ptr); *(real8*)(dst + fd->dbsOffs) = temp; } break; case cli_real8: unpack8(dst+fd->dbsOffs, ptr); break; default: assert(false); } break; case dbField::tpReference: *(oid_t*)(dst + fd->dbsOffs) = unpack_oid(ptr); break; case dbField::tpRectangle: unpack_rectangle((cli_rectangle_t*)(dst + fd->dbsOffs), ptr); break; default: assert(false); }}void dbStatement::reset(){ dbColumnBinding *cb, *next; for (cb = columns; cb != NULL; cb = next) { next = cb->next; delete cb; } columns = NULL; delete[] params; params = NULL; delete cursor; cursor = NULL; query.reset(); table = NULL;}int dbQueryScanner::get(){ int i = 0, ch, digits; do { if ((ch = *p++) == '\0') { return tkn_eof; } } while (isspace(ch)); if (ch == '*') { return tkn_all; } else if (isdigit(ch) || ch == '+' || ch == '-') { do { buf[i++] = ch; if (i == dbQueryMaxIdLength) { // Numeric constant too long return tkn_error; } ch = *p++; } while (ch != '\0' && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' || ch == 'E' || ch == '.')); p -= 1; buf[i] = '\0'; if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) { // Bad integer constant return tkn_error; } if (digits != i) { if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) { // Bad float constant return tkn_error; } return tkn_fconst; } return tkn_iconst; } else if (isalpha(ch) || ch == '$' || ch == '_') { do { buf[i++] = ch; if (i == dbQueryMaxIdLength) { // Identifier too long return tkn_error; } ch = *p++; } while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_')); p -= 1; buf[i] = '\0'; ident = buf; return dbSymbolTable::add(ident, tkn_ident); } else { // Invalid symbol return tkn_error; }}dbServer* dbServer::chain;inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id){ for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next) { if (stmt->id == stmt_id) { return stmt; } } return NULL;}void thread_proc dbServer::serverThread(void* arg){ ((dbServer*)arg)->serveClient();}void thread_proc dbServer::acceptLocalThread(void* arg){ dbServer* server = (dbServer*)arg; server->acceptConnection(server->localAcceptSock);}void thread_proc dbServer::acceptGlobalThread(void* arg){ dbServer* server = (dbServer*)arg; server->acceptConnection(server->globalAcceptSock);}dbServer::dbServer(dbDatabase* db, char const* serverURL, int optimalNumberOfThreads, int connectionQueueLen){ char buf[256]; next = chain; chain = this; this->db = db; this->optimalNumberOfThreads = optimalNumberOfThreads; this->URL = new char[strlen(serverURL)+1]; strcpy(URL, serverURL); globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen); if (!globalAcceptSock->is_ok()) { globalAcceptSock->get_error_text(buf, sizeof buf); dbTrace("Failed to create global socket: %s\n", buf); delete globalAcceptSock; globalAcceptSock = NULL; } localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen); if (!localAcceptSock->is_ok()) { localAcceptSock->get_error_text(buf, sizeof buf); dbTrace("Failed to create local socket: %s\n", buf); delete localAcceptSock; localAcceptSock = NULL; } freeList = activeList = waitList = NULL; waitListLength = 0;} dbServer* dbServer::find(char const* URL){ for (dbServer* server = chain; server != NULL; server = server->next) { if (strcmp(URL, server->URL) == 0) { return server; } } return NULL;}void dbServer::cleanup(){ dbServer *server, *next; for (server = chain; server != NULL; server = next) { next = server->next; delete server; } }void dbServer::start(){ nActiveThreads = nIdleThreads = 0; cancelWait = cancelSession = cancelAccept = false; go.open(); done.open(); if (globalAcceptSock != NULL) { globalAcceptThread.create(acceptGlobalThread, this); } if (localAcceptSock != NULL) { localAcceptThread.create(acceptLocalThread, this); }}void dbServer::stop(){ cancelAccept = true; if (globalAcceptSock != NULL) { globalAcceptSock->cancel_accept(); globalAcceptThread.join(); } delete globalAcceptSock; globalAcceptSock = NULL; if (localAcceptSock != NULL) { localAcceptSock->cancel_accept(); localAcceptThread.join(); } delete localAcceptSock; localAcceptSock = NULL; dbCriticalSection cs(mutex); cancelSession = true; while (activeList != NULL) { activeList->sock->shutdown(); done.wait(mutex); } cancelWait = true; while (nIdleThreads != 0) { go.signal(); done.wait(mutex); } while (waitList != NULL) { dbSession* next = waitList->next; delete waitList->sock; waitList->next = freeList; freeList = waitList; waitList = next; } waitListLength = 0; assert(nActiveThreads == 0); done.close(); go.close();}bool dbServer::freeze(dbSession* session, int stmt_id){ dbStatement* stmt = findStatement(session, stmt_id); int4 response = cli_ok; if (stmt == NULL || stmt->cursor == NULL) { response = cli_bad_descriptor; } else { stmt->cursor->freeze(); } pack4(response); return session->sock->write(&response, sizeof response);} bool dbServer::unfreeze(dbSession* session, int stmt_id)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -