📄 server.cpp
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 13-Jan-2000 K.A. Knizhnik * / [] \ *
// Last update: 13-Jan-2000 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*
#include "fastdb.h"
#include "compiler.h"
#include "wwwapi.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "rtree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"
#include "localcli.h"
#include <ctype.h>
BEGIN_FASTDB_NAMESPACE
#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif
// Copies the array (or string) payload of this binding into a record image.
//
//   dst   base address of the destination buffer
//   offs  byte offset inside `dst` at which the elements are stored
//
// The payload is read starting 4 bytes into `ptr` (presumably skipping a
// length prefix - confirm against the CLI protocol). For array types the
// element width is looked up in `sizeof_type`; one-byte elements and strings
// are copied verbatim, wider elements go through unpack2/unpack4/unpack8
// one element at a time.
//
// Returns the number of elements (`len`), not the number of bytes.
int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
    int const nElems = this->len;
    char* const out = dst + offs;

    if (cliType < cli_array_of_oid) {
        // string: raw byte copy
        memcpy(out, ptr + 4, nElems);
        return nElems;
    }

    int idx;
    switch (sizeof_type[cliType - cli_array_of_oid]) {
      case 1:
        memcpy(out, ptr + 4, nElems);
        break;
      case 2:
        for (idx = 0; idx < nElems; idx++) {
            unpack2(out + idx*2, ptr + 4 + idx*2);
        }
        break;
      case 4:
        for (idx = 0; idx < nElems; idx++) {
            unpack4(out + idx*4, ptr + 4 + idx*4);
        }
        break;
      case 8:
        for (idx = 0; idx < nElems; idx++) {
            unpack8(out + idx*8, ptr + 4 + idx*8);
        }
        break;
      default:
        // Unexpected element width
        assert(false);
    }
    return nElems;
}
void dbColumnBinding::unpackScalar(char* dst, bool insert)
{
if (cliType == cli_autoincrement) {
assert(fd->type == dbField::tpInt4);
if (insert) {
#ifdef AUTOINCREMENT_SUPPORT
*(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else
*(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif
}
return;
}
switch (fd->type) {
case dbField::tpBool:
case dbField::tpInt1:
switch (sizeof_type[cliType]) {
case 1:
*(dst + fd->dbsOffs) = *ptr;
break;
case 2:
*(dst + fd->dbsOffs) = (char)unpack2(ptr);
break;
case 4:
*(dst + fd->dbsOffs) = (char)unpack4(ptr);
break;
case 8:
*(dst + fd->dbsOffs) = (char)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt2:
switch (sizeof_type[cliType]) {
case 1:
*(int2*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
unpack2(dst+fd->dbsOffs, ptr);
break;
case 4:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
break;
case 8:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt4:
switch (sizeof_type[cliType]) {
case 1:
*(int4*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case 8:
*(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt8:
switch (sizeof_type[cliType]) {
case 1:
*(db_int8*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
*(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);
break;
case 8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
case dbField::tpReal4:
switch (cliType) {
case cli_real4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case cli_real8:
{
real8 temp;
unpack8((char*)&temp, ptr);
*(real4*)(dst + fd->dbsOffs) = (real4)temp;
}
break;
default:
assert(false);
}
break;
case dbField::tpReal8:
switch (cliType) {
case cli_real4:
{
real4 temp;
unpack4((char*)&temp, ptr);
*(real8*)(dst + fd->dbsOffs) = temp;
}
break;
case cli_real8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
case dbField::tpReference:
*(oid_t*)(dst + fd->dbsOffs) = unpack_oid(ptr);
break;
case dbField::tpRectangle:
unpack_rectangle((cli_rectangle_t*)(dst + fd->dbsOffs), ptr);
break;
default:
assert(false);
}
}
void dbStatement::reset()
{
dbColumnBinding *cb, *next;
for (cb = columns; cb != NULL; cb = next) {
next = cb->next;
delete cb;
}
columns = NULL;
delete[] params;
params = NULL;
delete cursor;
cursor = NULL;
query.reset();
table = NULL;
}
// Extracts the next token from the NUL-terminated text at `p`.
//
// Leading whitespace is skipped. Recognised tokens:
//   '*'                      -> tkn_all
//   numeric constant         -> tkn_iconst (value in `ival`) or
//                               tkn_fconst (value in `fval`)
//   identifier [A-Za-z$_]... -> result of dbSymbolTable::add(), with `ident`
//                               pointing at the text stored in `buf`
//   end of string            -> tkn_eof
//   anything else            -> tkn_error
//
// tkn_error is also returned when a number or identifier reaches
// dbQueryMaxIdLength characters, or when a numeric constant cannot be
// parsed completely as an integer or as a floating point value.
//
// After a number or identifier, `p` is left on the first character that is
// not part of the token. Note that on tkn_eof `p` has already been advanced
// past the terminating NUL.
//
// Characters are cast to unsigned char before being handed to the <ctype.h>
// classifiers: passing a negative value (a byte >= 0x80 where plain char is
// signed) to those functions is undefined behaviour.
int dbQueryScanner::get()
{
    int i = 0, ch, digits;

    // Skip whitespace
    do {
        if ((ch = *p++) == '\0') {
            return tkn_eof;
        }
    } while (isspace((unsigned char)ch));

    if (ch == '*') {
        return tkn_all;
    } else if (isdigit((unsigned char)ch) || ch == '+' || ch == '-') {
        // Collect every character that may belong to an integer or floating
        // point literal; sscanf decides below which one it actually is.
        do {
            buf[i++] = ch;
            if (i == dbQueryMaxIdLength) {
                // Numeric constant too long
                return tkn_error;
            }
            ch = *p++;
        } while (ch != '\0'
                 && (isdigit((unsigned char)ch) || ch == '+' || ch == '-'
                     || ch == 'e' || ch == 'E' || ch == '.'));
        p -= 1; // give back the character that ended the literal
        buf[i] = '\0';
        if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) {
            // Bad integer constant
            return tkn_error;
        }
        if (digits != i) {
            // The integer conversion did not consume the whole literal:
            // retry as floating point, which must consume all of it.
            if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {
                // Bad float constant
                return tkn_error;
            }
            return tkn_fconst;
        }
        return tkn_iconst;
    } else if (isalpha((unsigned char)ch) || ch == '$' || ch == '_') {
        do {
            buf[i++] = ch;
            if (i == dbQueryMaxIdLength) {
                // Identifier too long
                return tkn_error;
            }
            ch = *p++;
        } while (ch != '\0'
                 && (isalnum((unsigned char)ch) || ch == '$' || ch == '_'));
        p -= 1; // give back the character that ended the identifier
        buf[i] = '\0';
        ident = buf;
        return dbSymbolTable::add(ident, tkn_ident);
    } else {
        // Invalid symbol
        return tkn_error;
    }
}
// Head of the singly linked list of all dbServer instances. The constructor
// pushes each new server at the front (linked through `next`); find() and
// cleanup() walk the list starting here.
dbServer* dbServer::chain;
// Looks up the statement with identifier `stmt_id` in the session's
// statement list. Returns NULL when the session has no such statement.
inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
    dbStatement* candidate = session->stmts;
    while (candidate != NULL && candidate->id != stmt_id) {
        candidate = candidate->next;
    }
    // Either the matching statement or NULL if the list was exhausted.
    return candidate;
}
// Thread entry point: `arg` is the dbServer whose serveClient() is run on
// this thread.
void thread_proc dbServer::serverThread(void* arg)
{
    dbServer* const server = (dbServer*)arg;
    server->serveClient();
}
// Thread entry point: `arg` is the dbServer; runs its accept loop on the
// local accept socket.
void thread_proc dbServer::acceptLocalThread(void* arg)
{
    dbServer& self = *(dbServer*)arg;
    self.acceptConnection(self.localAcceptSock);
}
// Thread entry point: `arg` is the dbServer; runs its accept loop on the
// global accept socket.
void thread_proc dbServer::acceptGlobalThread(void* arg)
{
    dbServer& self = *(dbServer*)arg;
    self.acceptConnection(self.globalAcceptSock);
}
// Creates a server for database `db` listening at `serverURL`.
//
//   db                      database served to the clients
//   serverURL               address to listen on; a private copy is kept
//   optimalNumberOfThreads  stored as is for later use by the server
//   connectionQueueLen      passed to both socket factories
//
// The new server is linked at the head of the static `chain`. Both a global
// and a local accept socket are created; if one of them cannot be created
// the failure is traced and the corresponding member is left NULL, so the
// server can still run on the remaining socket. No thread is started here.
dbServer::dbServer(dbDatabase* db,
                   char const* serverURL,
                   int optimalNumberOfThreads,
                   int connectionQueueLen)
{
    char errText[256];

    // Register in the list of all servers.
    next = chain;
    chain = this;

    this->db = db;
    this->optimalNumberOfThreads = optimalNumberOfThreads;

    // Keep our own copy of the URL (terminating NUL included).
    size_t const urlSize = strlen(serverURL) + 1;
    this->URL = new char[urlSize];
    memcpy(URL, serverURL, urlSize);

    globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen);
    if (!globalAcceptSock->is_ok()) {
        globalAcceptSock->get_error_text(errText, sizeof errText);
        dbTrace("Failed to create global socket: %s\n", errText);
        delete globalAcceptSock;
        globalAcceptSock = NULL;
    }

    localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen);
    if (!localAcceptSock->is_ok()) {
        localAcceptSock->get_error_text(errText, sizeof errText);
        dbTrace("Failed to create local socket: %s\n", errText);
        delete localAcceptSock;
        localAcceptSock = NULL;
    }

    // All session lists start out empty.
    waitList = NULL;
    activeList = NULL;
    freeList = NULL;
    waitListLength = 0;
}
// Searches the list of all servers for the one whose URL equals `URL`
// (exact, case-sensitive comparison). Returns NULL if there is none.
dbServer* dbServer::find(char const* URL)
{
    dbServer* candidate = chain;
    while (candidate != NULL && strcmp(URL, candidate->URL) != 0) {
        candidate = candidate->next;
    }
    // Either the matching server or NULL if the list was exhausted.
    return candidate;
}
void dbServer::cleanup()
{
dbServer *server, *next;
for (server = chain; server != NULL; server = next) {
next = server->next;
delete server;
}
}
// Puts the server into the running state: resets the thread counters and
// cancellation flags, opens the `go` and `done` synchronization objects and
// launches one accept thread for each accept socket that exists.
void dbServer::start()
{
    nIdleThreads = 0;
    nActiveThreads = 0;

    cancelAccept = false;
    cancelSession = false;
    cancelWait = false;

    go.open();
    done.open();

    // A socket is NULL when the constructor failed to create it.
    if (globalAcceptSock != NULL) {
        globalAcceptThread.create(acceptGlobalThread, this);
    }
    if (localAcceptSock != NULL) {
        localAcceptThread.create(acceptLocalThread, this);
    }
}
void dbServer::stop()
{
cancelAccept = true;
if (globalAcceptSock != NULL) {
globalAcceptSock->cancel_accept();
globalAcceptThread.join();
}
delete globalAcceptSock;
globalAcceptSock = NULL;
if (localAcceptSock != NULL) {
localAcceptSock->cancel_accept();
localAcceptThread.join();
}
delete localAcceptSock;
localAcceptSock = NULL;
dbCriticalSection cs(mutex);
cancelSession = true;
while (activeList != NULL) {
activeList->sock->shutdown();
done.wait(mutex);
}
cancelWait = true;
while (nIdleThreads != 0) {
go.signal();
done.wait(mutex);
}
while (waitList != NULL) {
dbSession* next = waitList->next;
delete waitList->sock;
waitList->next = freeList;
freeList = waitList;
waitList = next;
}
waitListLength = 0;
assert(nActiveThreads == 0);
done.close();
go.close();
}
// Handles a client "freeze" request for statement `stmt_id` of `session`.
//
// If the statement exists and has a cursor, the cursor is frozen and cli_ok
// is reported; otherwise cli_bad_descriptor is reported. The response code
// is packed and written to the session socket.
//
// Returns the result of the socket write.
bool dbServer::freeze(dbSession* session, int stmt_id)
{
    int4 response = cli_bad_descriptor;
    dbStatement* const stmt = findStatement(session, stmt_id);
    if (stmt != NULL && stmt->cursor != NULL) {
        stmt->cursor->freeze();
        response = cli_ok;
    }
    pack4(response);
    return session->sock->write(&response, sizeof response);
}
bool dbServer::unfreeze(dbSession* session, int stmt_id)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -