server.cpp

来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C++ 代码 · 共 1,520 行 · 第 1/3 页

CPP
1,520
字号
{    dbTableDescriptor* desc = db->findTableByName(table);    if (desc == NULL) {	char response[8];	pack4(response, 0);	pack4(response+4, -1);	return session->sock->write(response, sizeof response);    } else { 	int i, length = 0;	dbFieldDescriptor* fd = desc->columns; 	for (i = desc->nColumns; --i >= 0;) { 	    length += strlen(fd->name)+2;	    fd = fd->next;	}	dbSmallBuffer response(length+8);	char* p = (char*)response;	pack4(p, length);	pack4(p+4, desc->nColumns);	p += 8;	for (i = desc->nColumns, fd = desc->columns; --i >= 0;) { 	    *p = map_type(fd);	    p += 1;	    strcpy(p, fd->name);	    p += strlen(fd->name)+1;	    fd = fd->next;	}	return session->sock->write(response, length+8);    }}bool dbServer::show_tables(dbSession* session){    dbTableDescriptor* desc=db->tables;    if (desc == NULL) {        char response[8];        pack4(response, 0);        pack4(response+4, -1);        return session->sock->write(response, sizeof response);    } else {	int length = 0, n = 0;        for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) {            if (strcmp(desc->name, "Metatable")) {                length += strlen(desc->name)+1;                n++;            }        }        dbSmallBuffer response(length+8);        char* p = (char*)response;        pack4(p, length);        pack4(p+4, n);        p += 8;        for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) {            if (strcmp(desc->name, "Metatable")) {                strcpy(p, desc->name);                p += strlen(desc->name)+1;            }        }        return session->sock->write(response, length+8);    }}bool dbServer::create_table(dbSession* session, char* data){    db->beginTransaction(dbDatabase::dbExclusiveLock);    db->modified = true;    char* tableName = data;    data += strlen(data) + 1;    int nColumns = *data++ & 0xFF;    cli_field_descriptor* columns = new cli_field_descriptor[nColumns];        for (int i = 0; i < nColumns; i++) {        columns[i].type = (cli_var_type)*data++;        columns[i].flags = *data++ & 0xFF;        columns[i].name = data;        data += strlen(data) + 1;        if (*data != 0) {             columns[i].refTableName = data;            data += strlen(data) + 1;        } else {             columns[i].refTableName = NULL;            data += 1;        }        if (*data != 0) {             columns[i].inverseRefFieldName = data;            data += strlen(data) + 1;        } else {             columns[i].inverseRefFieldName = NULL;            data += 1;        }    }    if (session->existed_tables == NULL) {         session->existed_tables = db->tables;    }    int4 response = dbCLI::create_table(db, tableName, nColumns, columns);    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::drop_table(dbSession* session, char* tableName){    db->beginTransaction(dbDatabase::dbExclusiveLock);    dbTableDescriptor* desc = db->findTableByName(tableName);    int4 response = cli_ok;    if (desc != NULL) {        db->dropTable(desc);        if (desc == session->existed_tables) {             session->existed_tables = desc->nextDbTable;        }        db->unlinkTable(desc);        desc->nextDbTable = session->dropped_tables;        session->dropped_tables = desc;    } else {         response = cli_table_not_found;    }    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::alter_index(dbSession* session, char* data){    char* tableName = data;    data += strlen(data) + 1;    char* fieldName = data;    data += strlen(data) + 1;    int newFlags = *data++ & 0xFF;    int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags);    pack4(response);    return session->sock->write(&response, sizeof response);}bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare){    int4 response;    int i, n_params, tkn, n_columns;    dbStatement* stmt = findStatement(session, stmt_id);    dbCursorType cursorType;    dbTableDescriptor* desc;    if (prepare) { 	if (stmt == NULL) { 	    stmt = new dbStatement(stmt_id);	    stmt->next = session->stmts;	    session->stmts = stmt;	} else { 	    stmt->reset();	}	stmt->n_params = *msg++;	stmt->n_columns = n_columns = *msg++;	stmt->params = new dbParameterBinding[stmt->n_params];	stmt->firstFetch = true;	int len = unpack2(msg);	msg += 2;	session->scanner.reset(msg);	char *p, *end = msg + len;	if (session->scanner.get() != tkn_select) { 	    response = cli_bad_statement;	    goto return_response;	}	if ((tkn = session->scanner.get()) == tkn_all) { 	    tkn = session->scanner.get();	}	if (tkn == tkn_from && session->scanner.get() == tkn_ident) { 	    if ((desc = db->findTable(session->scanner.ident)) != NULL) { 		msg = checkColumns(stmt, n_columns, desc, end, response);		if (response != cli_ok) {		    goto return_response;		}		stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL);		stmt->cursor->setPrefetchMode(false);	    } else { 		response = cli_table_not_found;		goto return_response;	    }			} else { 	    response = cli_bad_statement;	    goto return_response;	}	p = session->scanner.p;	for (i = 0; p < end; i++) { 	    stmt->query.append(dbQueryElement::qExpression, p);	    p += strlen(p) + 1;	    if (p < end) { 		int cliType = *p++;		static const dbQueryElement::ElementType type_map[] = { 		    dbQueryElement::qVarReference, // cli_oid		    dbQueryElement::qVarBool,      // cli_bool		    dbQueryElement::qVarInt1,      // cli_int1 		    dbQueryElement::qVarInt2,      // cli_int2		    dbQueryElement::qVarInt4,      // cli_int4		    dbQueryElement::qVarInt8,      // cli_int8		    dbQueryElement::qVarReal4,     // cli_real4		    dbQueryElement::qVarReal8,     // cli_real8		    dbQueryElement::qVarStringPtr, // cli_asciiz		    dbQueryElement::qVarStringPtr, // cli_pasciiz		};		stmt->params[i].type = cliType;		stmt->query.append(type_map[cliType], &stmt->params[i].u);	    }	}    } else { 	if (stmt == NULL) { 	    response = cli_bad_descriptor;	    goto return_response;	}    }    cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly;    for (i = 0, n_params = stmt->n_params; i < n_params; i++) { 	switch (stmt->params[i].type) { 	  case cli_oid:	    stmt->params[i].u.oid = unpack_oid(msg);	    msg += sizeof(cli_oid_t);	    break;	  case cli_int1:	    stmt->params[i].u.i1 = *msg++;	    break;	  case cli_int2:	    msg = unpack2((char*)&stmt->params[i].u.i2, msg);	    break;	  case cli_int4:	    msg = unpack4((char*)&stmt->params[i].u.i4, msg);	    break;	  case cli_int8:	    msg = unpack8((char*)&stmt->params[i].u.i8, msg);	    break;	  case cli_real4:	    msg = unpack4((char*)&stmt->params[i].u.r4, msg);	    break;	  case cli_real8:	    msg = unpack8((char*)&stmt->params[i].u.r8, msg);	    break;	  case cli_bool:	    stmt->params[i].u.b = *msg++;	    break;	  case cli_asciiz:	  case cli_pasciiz:	    stmt->params[i].u.str = msg;	    msg += strlen(msg) + 1;	    break;	  default:	    response = cli_bad_statement;	    goto return_response;	    	}    } #ifdef THROW_EXCEPTION_ON_ERROR    try { 	response = stmt->cursor->select(stmt->query, cursorType);    } catch (dbException const& x) { 	response = (x.getErrCode() == dbDatabase::QueryError)	    ? cli_bad_statement : cli_runtime_error;    }#else    { 	dbDatabaseThreadContext* ctx = db->threadContext.get();	ctx->catched = true;	int errorCode = setjmp(ctx->unwind);	if (errorCode == 0) { 	    response = stmt->cursor->select(stmt->query, cursorType);	} else { 	    response = (errorCode == dbDatabase::QueryError)		? cli_bad_statement : cli_runtime_error;	}	ctx->catched = false;    }#endif	  return_response:    pack4(response);    return session->sock->write(&response, sizeof response);}void dbServer::serveClient(){    dbStatement *sp, **spp;    db->attach();    while (true) {	dbSession* session; 	{   	    dbCriticalSection cs(mutex);	    do { 		go.wait(mutex);		if (cancelWait) { 		    nIdleThreads -= 1;		    done.signal();		    db->detach();		    return;		}	    } while (waitList == NULL);	    session = waitList;	    waitList = waitList->next;	    session->next = activeList;	    activeList = session;	    nIdleThreads -= 1;	    nActiveThreads += 1;	}	cli_request req;	int4 response = cli_ok;	bool online = true;	while (online && session->sock->read(&req, sizeof req)) { 	    req.unpack();	    int length = req.length - sizeof(req);	    dbSmallBuffer msg(length);	    if (length > 0) { 		if (!session->sock->read(msg, length)) {		    break;		}	    }	    switch(req.cmd) { 	      case cli_cmd_close_session:                while (session->dropped_tables != NULL) {                    dbTableDescriptor* next = session->dropped_tables->nextDbTable;                    delete session->dropped_tables;                    session->dropped_tables = next;                } 		db->commit();		session->in_transaction = false;		online = false;		break;	      case cli_cmd_prepare_and_execute:		online = select(session, req.stmt_id, msg, true); 		session->in_transaction = true;		break;	      case cli_cmd_execute:		online = select(session, req.stmt_id, msg, false); 		break;	      case cli_cmd_get_first:		online = get_first(session, req.stmt_id);		break;	      case cli_cmd_get_last:		online = get_last(session, req.stmt_id);		break;	      case cli_cmd_get_next:		online = get_next(session, req.stmt_id);		break;	      case cli_cmd_get_prev:		online = get_prev(session, req.stmt_id);		break;              case cli_cmd_skip:                online = skip(session, req.stmt_id, msg);                break;              case cli_cmd_freeze:                online = freeze(session, req.stmt_id);                break;              case cli_cmd_unfreeze:                online = unfreeze(session, req.stmt_id);                break;  	      case cli_cmd_free_statement:		for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next)		{		    if (sp->id == req.stmt_id) { 			*spp = sp->next;			delete sp;			break;		    }		}		break;	      case cli_cmd_abort:                while (session->dropped_tables != NULL) {                    dbTableDescriptor* next = session->dropped_tables->nextDbTable;                    db->linkTable(session->dropped_tables, session->dropped_tables->tableId);                    session->dropped_tables = next;                }                if (session->existed_tables != NULL) {                     while (db->tables != session->existed_tables) {                         dbTableDescriptor* table = db->tables;                        db->unlinkTable(table);                        delete table;                    }                    session->existed_tables = NULL;                }		db->rollback();		session->in_transaction = false;		online = session->sock->write(&response, sizeof response);		break;	      case cli_cmd_commit:		                while (session->dropped_tables != NULL) {                    dbTableDescriptor* next = session->dropped_tables->nextDbTable;                    delete session->dropped_tables;                    session->dropped_tables = next;                }                session->existed_tables = NULL;		db->commit();		session->in_transaction = false;		online = session->sock->write(&response, sizeof response);		break;	      case cli_cmd_precommit:		db->precommit();		online = session->sock->write(&response, sizeof response);		break;	      case cli_cmd_update:		update(session, req.stmt_id, msg);		break;			      case cli_cmd_remove:		remove(session, req.stmt_id);		break;			      case cli_cmd_prepare_and_insert:		insert(session, req.stmt_id, msg, true);		session->in_transaction = true;		break;			      case cli_cmd_insert:		insert(session, req.stmt_id, msg, false);		break;			      case cli_cmd_describe_table:		describe_table(session, (char*)msg);		break;              case cli_cmd_show_tables:                show_tables(session);                break;              case cli_cmd_create_table:                online = create_table(session, msg);                break;              case cli_cmd_drop_table:                online = drop_table(session, msg);                break;              case cli_cmd_alter_index:                online = alter_index(session, msg);                break;	    }	}		if (session->in_transaction) {             while (session->dropped_tables != NULL) {                dbTableDescriptor* next = session->dropped_tables->nextDbTable;                db->linkTable(session->dropped_tables, session->dropped_tables->tableId);                session->dropped_tables = next;            }            if (session->existed_tables != NULL) {                 while (db->tables != session->existed_tables) {                     dbTableDescriptor* table = db->tables;                    db->unlinkTable(table);                    delete table;                }                session->existed_tables = NULL;            }	    db->rollback();	}	// Finish session	{   	    dbCriticalSection cs(mutex);	    dbSession** spp;	    delete session->sock;	    for (spp = &activeList; *spp != session; spp = &(*spp)->next); 	    *spp = session->next;	    session->next = freeList;	    freeList = session;	    nActiveThreads -= 1;	    if (cancelSession) { 		done.signal();		break;	    }	    if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) {		break;	    }	    nIdleThreads += 1;	}     }    db->detach();}void dbServer::acceptConnection(socket_t* acceptSock){    while (true) { 	socket_t* sock = acceptSock->accept();	dbCriticalSection cs(mutex);	if (cancelAccept) { 	    return;	}	if (sock != NULL) { 	    if (freeList == NULL) { 		freeList = new dbSession;		freeList->next = NULL;	    }	    dbSession* session = freeList;	    freeList = session->next;	    session->sock = sock;	    session->stmts = NULL;	    session->next = waitList;	    session->in_transaction = false;            session->existed_tables = NULL;            session->dropped_tables = NULL; 	    waitList = session;	    if (nIdleThreads == 0) { 		dbThread thread;		nIdleThreads = 1;		thread.create(serverThread, this);		thread.detach();	    }	    go.signal();	}    }}dbServer::~dbServer(){    dbServer** spp;    for (spp = &chain; *spp != this; spp = &(*spp)->next);    *spp = next;    delete globalAcceptSock;    delete localAcceptSock;    delete[] URL;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?