⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 server.cpp

📁 实现内存数据库的源代码
💻 CPP
📖 第 1 页 / 共 3 页
字号:
//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     13-Jan-2000 K.A. Knizhnik   * / [] \ *
//                          Last update: 13-Jan-2000 K.A. Knizhnik   * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*

#include <ctype.h>
#include "fastdb.h"
#include "compiler.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"

#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif

bool dbQueryScanner::initialized; 

int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
    int len = this->len;
    int i;
    if (cliType >= cli_array_of_oid) { 
	switch (sizeof_type[cliType - cli_array_of_oid]) { 
	  case 1:
	    memcpy(dst + offs, ptr + 4, len);
	    break;
	  case 2:
	    for (i = 0; i < len; i++) {
		unpack2(dst + offs + i*2, ptr + 4 + i*2);
	    }
	    break;
	  case 4:
	    for (i = 0; i < len; i++) {
		unpack4(dst + offs + i*4, ptr + 4 + i*4);
	    }
	    break;
	  case 8:
	    for (i = 0; i < len; i++) {
		unpack8(dst + offs + i*8, ptr + 4 + i*8);
	    }
	    break;
	  default:
	    assert(false);
	}
    } else { // string
	memcpy(dst + offs, ptr + 4, len);
    }
    return len;
}

void dbColumnBinding::unpackScalar(char* dst)
{
    if (cliType == cli_autoincrement) { 
	assert(fd->type == dbField::tpInt4);
#ifdef AUTOINCREMENT_SUPPORT
	*(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else
	*(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif
	return;
    } 
    switch (fd->type) { 
      case dbField::tpBool:
      case dbField::tpInt1:
	switch (sizeof_type[cliType]) { 
	  case 1:
	    *(dst + fd->dbsOffs) = *ptr;
	    break;
	  case 2:
	    *(dst + fd->dbsOffs) = (char)unpack2(ptr);
	    break;
	  case 4:
	    *(dst + fd->dbsOffs) = (char)unpack4(ptr);
	    break;
	  case 8:
	    *(dst + fd->dbsOffs) = (char)unpack8(ptr);
	    break;
	  default:
	    assert(false);
	}
	break;		
      case dbField::tpInt2:
	switch (sizeof_type[cliType]) { 
	  case 1:
	    *(int2*)(dst+fd->dbsOffs) = *ptr;
	    break;
	  case 2:
	    unpack2(dst+fd->dbsOffs, ptr);
	    break;
	  case 4:
	    *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
	    break;
	  case 8:
	    *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
	    break;
	  default:
	    assert(false);
	}
	break;		
      case dbField::tpInt4:
	switch (sizeof_type[cliType]) { 
	  case 1:
	    *(int4*)(dst+fd->dbsOffs) = *ptr;
	    break;
	  case 2:
	    *(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
	    break;
	  case 4:
	    unpack4(dst+fd->dbsOffs, ptr);
	    break;
	  case 8:
	    *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
	    break;
	  default:
	    assert(false);
	}
	break;		
      case dbField::tpInt8:
	switch (sizeof_type[cliType]) { 
	  case 1:
	    *(int8*)(dst+fd->dbsOffs) = *ptr;
	    break;
	  case 2:
	    *(int8*)(dst+fd->dbsOffs) = unpack2(ptr);
	    break;
	  case 4:
	    *(int8*)(dst+fd->dbsOffs) = unpack4(ptr);
	    break;
	  case 8:
	    unpack8(dst+fd->dbsOffs, ptr);
	    break;
	  default:
	    assert(false);
	}
	break;		
      case dbField::tpReal4:
	switch (cliType) { 
	  case cli_real4:
	    unpack4(dst+fd->dbsOffs, ptr);
	    break;
	  case cli_real8:
	    {
		real8 temp;
		unpack8((char*)&temp, ptr);
		*(real4*)(dst + fd->dbsOffs) = (real4)temp;
	    }
	    break;
	  default:
	    assert(false);
	}
	break;
      case dbField::tpReal8:
	switch (cliType) { 
	  case cli_real4:
	    {
		real4 temp;
		unpack4((char*)&temp, ptr);
		*(real8*)(dst + fd->dbsOffs) = temp;
	    }
	    break;
	  case cli_real8:
	    unpack8(dst+fd->dbsOffs, ptr);
	    break;
	  default:
	    assert(false);
	}
	break;
      default:
	assert(false);
    }
}

void dbStatement::reset()
{
    dbColumnBinding *cb, *next;
    for (cb = columns; cb != NULL; cb = next) { 
	next = cb->next;
	delete cb;
    }
    columns = NULL;
    delete[] params;
    params = NULL;
    delete cursor;
    cursor = NULL;
    query.reset();
    table = NULL;
}

dbQueryScanner::dbQueryScanner() { 
    buf = new char[buf_size = 1024];
    if (!initialized) { 
	initialized = true;
	static struct { 
	    char* name;
	    int   tag;
	} keywords[] = { 
	    {"insert",  tkn_insert},
	    {"into",    tkn_into},
	    {"select",  tkn_select},
	    {"table",   tkn_table}
	};
	for (int i = items(keywords); --i >= 0;) { 
	    dbSymbolTable::add(keywords[i].name, keywords[i].tag, FASTDB_CLONE_ANY_IDENTIFIER);    
	}
    } 
}

int dbQueryScanner::get()
{
    int i = 0, ch, digits;
    
    do { 
	if ((ch = *p++) == '\0') { 
	    return tkn_eof;
	}
    } while (isspace(ch));
    
    if (ch == '*') { 
	return tkn_all;
    } else if (isdigit(ch) || ch == '+' || ch == '-') { 
	do { 
	    buf[i++] = ch;
	    if (i == buf_size) { 
	        // Numeric constant too long
		return tkn_error;
	    }
	    ch = *p++;
	} while (ch != '\0' 
		 && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' || 
		     ch == 'E' || ch == '.'));
	p -= 1;
	buf[i] = '\0';
	if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) { 
	    // Bad integer constant
	    return tkn_error;
	}
	if (digits != i) { 
	    if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {
		// Bad float constant
		return tkn_error;
	    }
	    return tkn_fconst;
	} 
	return tkn_iconst;
    } else if (isalpha(ch) || ch == '$' || ch == '_') { 
	do { 
	    buf[i++] = ch;
	    if (i == buf_size) { 
		// Identifier too long
		return tkn_error;
	    }
	    ch = *p++;
	} while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_'));
	p -= 1;
	buf[i] = '\0';
	ident = buf;
	return dbSymbolTable::add(ident, tkn_ident);
    } else { 
	// Invalid symbol
	return tkn_error;
    }
}

dbServer* dbServer::chain;

inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
    for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next)
    {
	if (stmt->id == stmt_id) { 
	    return stmt;
	}
    }
    return NULL;
}

void thread_proc dbServer::serverThread(void* arg)
{
    ((dbServer*)arg)->serveClient();
}

void thread_proc dbServer::acceptLocalThread(void* arg)
{
    dbServer* server = (dbServer*)arg;
    server->acceptConnection(server->localAcceptSock);
}

void thread_proc dbServer::acceptGlobalThread(void* arg)
{
    dbServer* server = (dbServer*)arg;
    server->acceptConnection(server->globalAcceptSock);
}

dbServer::dbServer(dbDatabase* db,
		   char const* serverURL, 
		   int optimalNumberOfThreads, 
		   int connectionQueueLen)
{
    char buf[256];
    next = chain;
    chain = this;
    this->db = db;
    this->optimalNumberOfThreads = optimalNumberOfThreads;
    this->URL = new char[strlen(serverURL)+1];
    strcpy(URL, serverURL);
    globalAcceptSock = 
	socket_t::create_global(serverURL, connectionQueueLen);
    if (!globalAcceptSock->is_ok()) { 
	globalAcceptSock->get_error_text(buf, sizeof buf);
	dbTrace("Failed to create global socket: %s\n", buf);
	delete globalAcceptSock;
	globalAcceptSock = NULL;
    }
    localAcceptSock = 
	socket_t::create_local(serverURL, connectionQueueLen);
    if (!localAcceptSock->is_ok()) { 
	localAcceptSock->get_error_text(buf, sizeof buf);
	dbTrace("Failed to create local socket: %s\n", buf);
	delete localAcceptSock;
	localAcceptSock = NULL;
    }
    freeList = activeList = waitList = NULL;
}
   
dbServer* dbServer::find(char const* URL)
{
    for (dbServer* server = chain; server != NULL; server = server->next) { 
	if (strcmp(URL, server->URL) == 0) { 
	    return server;
	}
    }
    return NULL;
}

void dbServer::cleanup()
{
    dbServer *server, *next;
    for (server = chain; server != NULL; server = next) { 
	next = server->next;
	delete server;
    } 
}

void dbServer::start()
{
    nActiveThreads = nIdleThreads = 0;    
    cancelWait = cancelSession = cancelAccept = false;
    go.open();
    done.open();
    if (globalAcceptSock != NULL) { 
	globalAcceptThread.create(acceptGlobalThread, this);
    }
    if (localAcceptSock != NULL) { 
	localAcceptThread.create(acceptLocalThread, this);
    }
}

void dbServer::stop()
{
    cancelAccept = true;
    if (globalAcceptSock != NULL) {
	globalAcceptSock->cancel_accept();
	globalAcceptThread.join();
    }
    delete globalAcceptSock;
    globalAcceptSock = NULL;

    if (localAcceptSock != NULL) { 
	localAcceptSock->cancel_accept();
	localAcceptThread.join();
    }
    delete localAcceptSock;
    localAcceptSock = NULL;

    dbCriticalSection cs(mutex);
    cancelSession = true;
    while (activeList != NULL) { 
	activeList->sock->shutdown();
	done.wait(mutex);
    }

    cancelWait = true;
    while (nIdleThreads != 0) { 
	go.signal();
	done.wait(mutex);
    }

    while (waitList != NULL) {
	dbSession* next = waitList->next;
	delete waitList->sock;
	waitList->next = freeList;
	freeList = waitList;
	waitList = next;
    }
    assert(nActiveThreads == 0);
    done.close();
    go.close();
}

bool dbServer::get_first(dbSession* session, int stmt_id)
{
    dbStatement* stmt = findStatement(session, stmt_id);
    int4 response;
    if (stmt == NULL || stmt->cursor == NULL) { 
	response = cli_bad_descriptor;
    } else if (!stmt->cursor->gotoFirst()) { 
	response = cli_not_found;
    } else { 
	return fetch(session, stmt);
    }
    pack4(response);
    return session->sock->write(&response, sizeof response);
}

bool dbServer::get_last(dbSession* session, int stmt_id)
{
    dbStatement* stmt = findStatement(session, stmt_id);
    int4 response;
    if (stmt == NULL || stmt->cursor == NULL) { 
	response = cli_bad_descriptor;
    } else if (!stmt->cursor->gotoLast()) { 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -