server.cpp

来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C++ 代码 · 共 1,520 行 · 第 1/3 页

CPP
1,520
字号
//-< SERVER.CPP >----------------------------------------------------*--------*// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *// (Main Memory Database Management System)                          *   /\|  *//                                                                   *  /  \  *//                          Created:     13-Jan-2000 K.A. Knizhnik   * / [] \ *//                          Last update: 13-Jan-2000 K.A. Knizhnik   * GARRET *//-------------------------------------------------------------------*--------*// CLI multithreaded server implementation//-------------------------------------------------------------------*--------*#include <ctype.h>#include "fastdb.h"#include "compiler.h"#include "subsql.h"#include "symtab.h"#include "hashtab.h"#include "ttree.h"#include "cli.h"#include "cliproto.h"#include "server.h"#include "localcli.h"#if !THREADS_SUPPORTED#error Server requires multithreading support#endifint dbColumnBinding::unpackArray(char* dst, size_t offs){    int len = this->len;    int i;    if (cliType >= cli_array_of_oid) { 	switch (sizeof_type[cliType - cli_array_of_oid]) { 	  case 1:	    memcpy(dst + offs, ptr + 4, len);	    break;	  case 2:	    for (i = 0; i < len; i++) {		unpack2(dst + offs + i*2, ptr + 4 + i*2);	    }	    break;	  case 4:	    for (i = 0; i < len; i++) {		unpack4(dst + offs + i*4, ptr + 4 + i*4);	    }	    break;	  case 8:	    for (i = 0; i < len; i++) {		unpack8(dst + offs + i*8, ptr + 4 + i*8);	    }	    break;	  default:	    assert(false);	}    } else { // string	memcpy(dst + offs, ptr + 4, len);    }    return len;}void dbColumnBinding::unpackScalar(char* dst){    if (cliType == cli_autoincrement) { 	assert(fd->type == dbField::tpInt4);#ifdef AUTOINCREMENT_SUPPORT	*(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;#else	*(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;#endif	return;    }     switch (fd->type) {       case dbField::tpBool:      case dbField::tpInt1:	switch (sizeof_type[cliType]) { 	  case 1:	    *(dst + fd->dbsOffs) = *ptr;	    break;	  case 2:	    *(dst + fd->dbsOffs) = (char)unpack2(ptr);	    break;	  case 4:	    *(dst + fd->dbsOffs) = (char)unpack4(ptr);	    break;	  case 8:	    *(dst + fd->dbsOffs) = (char)unpack8(ptr);	    break;	  default:	    assert(false);	}	break;		      case dbField::tpInt2:	switch (sizeof_type[cliType]) { 	  case 1:	    *(int2*)(dst+fd->dbsOffs) = *ptr;	    break;	  case 2:	    unpack2(dst+fd->dbsOffs, ptr);	    break;	  case 4:	    *(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);	    break;	  case 8:	    *(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);	    break;	  default:	    assert(false);	}	break;		      case dbField::tpInt4:	switch (sizeof_type[cliType]) { 	  case 1:	    *(int4*)(dst+fd->dbsOffs) = *ptr;	    break;	  case 2:	    *(int4*)(dst+fd->dbsOffs) = unpack2(ptr);	    break;	  case 4:	    unpack4(dst+fd->dbsOffs, ptr);	    break;	  case 8:	    *(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);	    break;	  default:	    assert(false);	}	break;		      case dbField::tpInt8:	switch (sizeof_type[cliType]) { 	  case 1:	    *(db_int8*)(dst+fd->dbsOffs) = *ptr;	    break;	  case 2:	    *(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);	    break;	  case 4:	    *(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);	    break;	  case 8:	    unpack8(dst+fd->dbsOffs, ptr);	    break;	  default:	    assert(false);	}	break;		      case dbField::tpReal4:	switch (cliType) { 	  case cli_real4:	    unpack4(dst+fd->dbsOffs, ptr);	    break;	  case cli_real8:	    {		real8 temp;		unpack8((char*)&temp, ptr);		*(real4*)(dst + fd->dbsOffs) = (real4)temp;	    }	    break;	  default:	    assert(false);	}	break;      case dbField::tpReal8:	switch (cliType) { 	  case cli_real4:	    {		real4 temp;		unpack4((char*)&temp, ptr);		*(real8*)(dst + fd->dbsOffs) = temp;	    }	    break;	  case cli_real8:	    unpack8(dst+fd->dbsOffs, ptr);	    break;	  default:	    assert(false);	}	break;      default:	assert(false);    }}void dbStatement::reset(){    dbColumnBinding *cb, *next;    for (cb = columns; cb != NULL; cb = next) { 	next = cb->next;	delete cb;    }    columns = NULL;    delete[] params;    params = NULL;    delete cursor;    cursor = NULL;    query.reset();    table = NULL;}int dbQueryScanner::get(){    int i = 0, ch, digits;        do { 	if ((ch = *p++) == '\0') { 	    return tkn_eof;	}    } while (isspace(ch));        if (ch == '*') { 	return tkn_all;    } else if (isdigit(ch) || ch == '+' || ch == '-') { 	do { 	    buf[i++] = ch;	    if (i == dbQueryMaxIdLength) { 	        // Numeric constant too long		return tkn_error;	    }	    ch = *p++;	} while (ch != '\0' 		 && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' || 		     ch == 'E' || ch == '.'));	p -= 1;	buf[i] = '\0';	if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1) { 	    // Bad integer constant	    return tkn_error;	}	if (digits != i) { 	    if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i) {		// Bad float constant		return tkn_error;	    }	    return tkn_fconst;	} 	return tkn_iconst;    } else if (isalpha(ch) || ch == '$' || ch == '_') { 	do { 	    buf[i++] = ch;	    if (i == dbQueryMaxIdLength) { 		// Identifier too long		return tkn_error;	    }	    ch = *p++;	} while (ch != EOF && (isalnum(ch) || ch == '$' || ch == '_'));	p -= 1;	buf[i] = '\0';	ident = buf;	return dbSymbolTable::add(ident, tkn_ident);    } else { 	// Invalid symbol	return tkn_error;    }}dbServer* dbServer::chain;inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id){    for (dbStatement* stmt = session->stmts; stmt != NULL; stmt = stmt->next)    {	if (stmt->id == stmt_id) { 	    return stmt;	}    }    return NULL;}void thread_proc dbServer::serverThread(void* arg){    ((dbServer*)arg)->serveClient();}void thread_proc dbServer::acceptLocalThread(void* arg){    dbServer* server = (dbServer*)arg;    server->acceptConnection(server->localAcceptSock);}void thread_proc dbServer::acceptGlobalThread(void* arg){    dbServer* server = (dbServer*)arg;    server->acceptConnection(server->globalAcceptSock);}dbServer::dbServer(dbDatabase* db,		   char const* serverURL, 		   int optimalNumberOfThreads, 		   int connectionQueueLen){    char buf[256];    next = chain;    chain = this;    this->db = db;    this->optimalNumberOfThreads = optimalNumberOfThreads;    this->URL = new char[strlen(serverURL)+1];    strcpy(URL, serverURL);    globalAcceptSock = 	socket_t::create_global(serverURL, connectionQueueLen);    if (!globalAcceptSock->is_ok()) { 	globalAcceptSock->get_error_text(buf, sizeof buf);	dbTrace("Failed to create global socket: %s\n", buf);	delete globalAcceptSock;	globalAcceptSock = NULL;    }    localAcceptSock = 	socket_t::create_local(serverURL, connectionQueueLen);    if (!localAcceptSock->is_ok()) { 	localAcceptSock->get_error_text(buf, sizeof buf);	dbTrace("Failed to create local socket: %s\n", buf);	delete localAcceptSock;	localAcceptSock = NULL;    }    freeList = activeList = waitList = NULL;}   dbServer* dbServer::find(char const* URL){    for (dbServer* server = chain; server != NULL; server = server->next) { 	if (strcmp(URL, server->URL) == 0) { 	    return server;	}    }    return NULL;}void dbServer::cleanup(){    dbServer *server, *next;    for (server = chain; server != NULL; server = next) { 	next = server->next;	delete server;    } }void dbServer::start(){    nActiveThreads = nIdleThreads = 0;        cancelWait = cancelSession = cancelAccept = false;    go.open();    done.open();    if (globalAcceptSock != NULL) { 	globalAcceptThread.create(acceptGlobalThread, this);    }    if (localAcceptSock != NULL) { 	localAcceptThread.create(acceptLocalThread, this);    }}void dbServer::stop(){    cancelAccept = true;    if (globalAcceptSock != NULL) {	globalAcceptSock->cancel_accept();	globalAcceptThread.join();    }    delete globalAcceptSock;    globalAcceptSock = NULL;    if (localAcceptSock != NULL) { 	localAcceptSock->cancel_accept();	localAcceptThread.join();    }    delete localAcceptSock;    localAcceptSock = NULL;    dbCriticalSection cs(mutex);    cancelSession = true;    while (activeList != NULL) { 	activeList->sock->shutdown();	done.wait(mutex);    }    cancelWait = true;    while (nIdleThreads != 0) { 	go.signal();	done.wait(mutex);    }    while (waitList != NULL) {	dbSession* next = waitList->next;	delete waitList->sock;	waitList->next = freeList;	freeList = waitList;	waitList = next;    }    assert(nActiveThreads == 0);    done.close();    go.close();}bool dbServer::freeze(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response = cli_ok;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    } else {         stmt->cursor->freeze();    }    pack4(response);    return session->sock->write(&response, sizeof response);}    bool dbServer::unfreeze(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response = cli_ok;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    } else {         stmt->cursor->unfreeze();    }    pack4(response);    return session->sock->write(&response, sizeof response);}    bool dbServer::get_first(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    } else if (!stmt->cursor->gotoFirst()) { 	response = cli_not_found;    } else { 	return fetch(session, stmt);    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::get_last(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    } else if (!stmt->cursor->gotoLast()) { 	response = cli_not_found;    } else { 	return fetch(session, stmt);    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::get_next(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    }     else if (!((stmt->firstFetch && stmt->cursor->gotoFirst()) ||	       (!stmt->firstFetch && stmt->cursor->gotoNext())))     { 	response = cli_not_found;    } else { 	return fetch(session, stmt);    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::get_prev(dbSession* session, int stmt_id){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response;    if (stmt == NULL || stmt->cursor == NULL) { 	response = cli_bad_descriptor;    }     else if (!((stmt->firstFetch && stmt->cursor->gotoLast()) ||	       (!stmt->firstFetch && stmt->cursor->gotoPrev())))     { 	response = cli_not_found;    } else { 	return fetch(session, stmt);    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::skip(dbSession* session, int stmt_id, char* buf){    dbStatement* stmt = findStatement(session, stmt_id);    int4 response;    if (stmt == NULL || stmt->cursor == NULL) {	response = cli_bad_descriptor;    } else { 	int n = unpack4(buf);	if ((n > 0 && !((stmt->firstFetch && stmt->cursor->gotoFirst() && stmt->cursor->skip(n-1)			 || (!stmt->firstFetch && stmt->cursor->skip(n)))))	    || (n < 0 && !((stmt->firstFetch && stmt->cursor->gotoLast() && stmt->cursor->skip(n+1)			    || (!stmt->firstFetch && stmt->cursor->skip(n))))))	{	    response = cli_not_found;	} else {	    return fetch(session, stmt);	}    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::fetch(dbSession* session, dbStatement* stmt){    int4 response;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?