server.cpp
来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C++ 代码 · 共 1,520 行 · 第 1/3 页
CPP
1,520 行
{ dbTableDescriptor* desc = db->findTableByName(table); if (desc == NULL) { char response[8]; pack4(response, 0); pack4(response+4, -1); return session->sock->write(response, sizeof response); } else { int i, length = 0; dbFieldDescriptor* fd = desc->columns; for (i = desc->nColumns; --i >= 0;) { length += strlen(fd->name)+2; fd = fd->next; } dbSmallBuffer response(length+8); char* p = (char*)response; pack4(p, length); pack4(p+4, desc->nColumns); p += 8; for (i = desc->nColumns, fd = desc->columns; --i >= 0;) { *p = map_type(fd); p += 1; strcpy(p, fd->name); p += strlen(fd->name)+1; fd = fd->next; } return session->sock->write(response, length+8); }}bool dbServer::show_tables(dbSession* session){ dbTableDescriptor* desc=db->tables; if (desc == NULL) { char response[8]; pack4(response, 0); pack4(response+4, -1); return session->sock->write(response, sizeof response); } else { int length = 0, n = 0; for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) { if (strcmp(desc->name, "Metatable")) { length += strlen(desc->name)+1; n++; } } dbSmallBuffer response(length+8); char* p = (char*)response; pack4(p, length); pack4(p+4, n); p += 8; for (desc=db->tables; desc != NULL; desc=desc->nextDbTable) { if (strcmp(desc->name, "Metatable")) { strcpy(p, desc->name); p += strlen(desc->name)+1; } } return session->sock->write(response, length+8); }}bool dbServer::create_table(dbSession* session, char* data){ db->beginTransaction(dbDatabase::dbExclusiveLock); db->modified = true; char* tableName = data; data += strlen(data) + 1; int nColumns = *data++ & 0xFF; cli_field_descriptor* columns = new cli_field_descriptor[nColumns]; for (int i = 0; i < nColumns; i++) { columns[i].type = (cli_var_type)*data++; columns[i].flags = *data++ & 0xFF; columns[i].name = data; data += strlen(data) + 1; if (*data != 0) { columns[i].refTableName = data; data += strlen(data) + 1; } else { columns[i].refTableName = NULL; data += 1; } if (*data != 0) { columns[i].inverseRefFieldName = data; data += strlen(data) + 1; } else { columns[i].inverseRefFieldName = NULL; data += 1; } } if (session->existed_tables == NULL) { session->existed_tables = db->tables; } int4 response = dbCLI::create_table(db, tableName, nColumns, columns); pack4(response); return session->sock->write(&response, sizeof response);}bool dbServer::drop_table(dbSession* session, char* tableName){ db->beginTransaction(dbDatabase::dbExclusiveLock); dbTableDescriptor* desc = db->findTableByName(tableName); int4 response = cli_ok; if (desc != NULL) { db->dropTable(desc); if (desc == session->existed_tables) { session->existed_tables = desc->nextDbTable; } db->unlinkTable(desc); desc->nextDbTable = session->dropped_tables; session->dropped_tables = desc; } else { response = cli_table_not_found; } pack4(response); return session->sock->write(&response, sizeof response);}bool dbServer::alter_index(dbSession* session, char* data){ char* tableName = data; data += strlen(data) + 1; char* fieldName = data; data += strlen(data) + 1; int newFlags = *data++ & 0xFF; int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags); pack4(response); return session->sock->write(&response, sizeof response);}bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare){ int4 response; int i, n_params, tkn, n_columns; dbStatement* stmt = findStatement(session, stmt_id); dbCursorType cursorType; dbTableDescriptor* desc; if (prepare) { if (stmt == NULL) { stmt = new dbStatement(stmt_id); stmt->next = session->stmts; session->stmts = stmt; } else { stmt->reset(); } stmt->n_params = *msg++; stmt->n_columns = n_columns = *msg++; stmt->params = new dbParameterBinding[stmt->n_params]; stmt->firstFetch = true; int len = unpack2(msg); msg += 2; session->scanner.reset(msg); char *p, *end = msg + len; if (session->scanner.get() != tkn_select) { response = cli_bad_statement; goto return_response; } if ((tkn = session->scanner.get()) == tkn_all) { tkn = session->scanner.get(); } if (tkn == tkn_from && session->scanner.get() == tkn_ident) { if ((desc = db->findTable(session->scanner.ident)) != NULL) { msg = checkColumns(stmt, n_columns, desc, end, response); if (response != cli_ok) { goto return_response; } stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL); stmt->cursor->setPrefetchMode(false); } else { response = cli_table_not_found; goto return_response; } } else { response = cli_bad_statement; goto return_response; } p = session->scanner.p; for (i = 0; p < end; i++) { stmt->query.append(dbQueryElement::qExpression, p); p += strlen(p) + 1; if (p < end) { int cliType = *p++; static const dbQueryElement::ElementType type_map[] = { dbQueryElement::qVarReference, // cli_oid dbQueryElement::qVarBool, // cli_bool dbQueryElement::qVarInt1, // cli_int1 dbQueryElement::qVarInt2, // cli_int2 dbQueryElement::qVarInt4, // cli_int4 dbQueryElement::qVarInt8, // cli_int8 dbQueryElement::qVarReal4, // cli_real4 dbQueryElement::qVarReal8, // cli_real8 dbQueryElement::qVarStringPtr, // cli_asciiz dbQueryElement::qVarStringPtr, // cli_pasciiz }; stmt->params[i].type = cliType; stmt->query.append(type_map[cliType], &stmt->params[i].u); } } } else { if (stmt == NULL) { response = cli_bad_descriptor; goto return_response; } } cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly; for (i = 0, n_params = stmt->n_params; i < n_params; i++) { switch (stmt->params[i].type) { case cli_oid: stmt->params[i].u.oid = unpack_oid(msg); msg += sizeof(cli_oid_t); break; case cli_int1: stmt->params[i].u.i1 = *msg++; break; case cli_int2: msg = unpack2((char*)&stmt->params[i].u.i2, msg); break; case cli_int4: msg = unpack4((char*)&stmt->params[i].u.i4, msg); break; case cli_int8: msg = unpack8((char*)&stmt->params[i].u.i8, msg); break; case cli_real4: msg = unpack4((char*)&stmt->params[i].u.r4, msg); break; case cli_real8: msg = unpack8((char*)&stmt->params[i].u.r8, msg); break; case cli_bool: stmt->params[i].u.b = *msg++; break; case cli_asciiz: case cli_pasciiz: stmt->params[i].u.str = msg; msg += strlen(msg) + 1; break; default: response = cli_bad_statement; goto return_response; } } #ifdef THROW_EXCEPTION_ON_ERROR try { response = stmt->cursor->select(stmt->query, cursorType); } catch (dbException const& x) { response = (x.getErrCode() == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; }#else { dbDatabaseThreadContext* ctx = db->threadContext.get(); ctx->catched = true; int errorCode = setjmp(ctx->unwind); if (errorCode == 0) { response = stmt->cursor->select(stmt->query, cursorType); } else { response = (errorCode == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; } ctx->catched = false; }#endif return_response: pack4(response); return session->sock->write(&response, sizeof response);}void dbServer::serveClient(){ dbStatement *sp, **spp; db->attach(); while (true) { dbSession* session; { dbCriticalSection cs(mutex); do { go.wait(mutex); if (cancelWait) { nIdleThreads -= 1; done.signal(); db->detach(); return; } } while (waitList == NULL); session = waitList; waitList = waitList->next; session->next = activeList; activeList = session; nIdleThreads -= 1; nActiveThreads += 1; } cli_request req; int4 response = cli_ok; bool online = true; while (online && session->sock->read(&req, sizeof req)) { req.unpack(); int length = req.length - sizeof(req); dbSmallBuffer msg(length); if (length > 0) { if (!session->sock->read(msg, length)) { break; } } switch(req.cmd) { case cli_cmd_close_session: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } db->commit(); session->in_transaction = false; online = false; break; case cli_cmd_prepare_and_execute: online = select(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_execute: online = select(session, req.stmt_id, msg, false); break; case cli_cmd_get_first: online = get_first(session, req.stmt_id); break; case cli_cmd_get_last: online = get_last(session, req.stmt_id); break; case cli_cmd_get_next: online = get_next(session, req.stmt_id); break; case cli_cmd_get_prev: online = get_prev(session, req.stmt_id); break; case cli_cmd_skip: online = skip(session, req.stmt_id, msg); break; case cli_cmd_freeze: online = freeze(session, req.stmt_id); break; case cli_cmd_unfreeze: online = unfreeze(session, req.stmt_id); break; case cli_cmd_free_statement: for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next) { if (sp->id == req.stmt_id) { *spp = sp->next; delete sp; break; } } break; case cli_cmd_abort: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_commit: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } session->existed_tables = NULL; db->commit(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_precommit: db->precommit(); online = session->sock->write(&response, sizeof response); break; case cli_cmd_update: update(session, req.stmt_id, msg); break; case cli_cmd_remove: remove(session, req.stmt_id); break; case cli_cmd_prepare_and_insert: insert(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_insert: insert(session, req.stmt_id, msg, false); break; case cli_cmd_describe_table: describe_table(session, (char*)msg); break; case cli_cmd_show_tables: show_tables(session); break; case cli_cmd_create_table: online = create_table(session, msg); break; case cli_cmd_drop_table: online = drop_table(session, msg); break; case cli_cmd_alter_index: online = alter_index(session, msg); break; } } if (session->in_transaction) { while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); } // Finish session { dbCriticalSection cs(mutex); dbSession** spp; delete session->sock; for (spp = &activeList; *spp != session; spp = &(*spp)->next); *spp = session->next; session->next = freeList; freeList = session; nActiveThreads -= 1; if (cancelSession) { done.signal(); break; } if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) { break; } nIdleThreads += 1; } } db->detach();}void dbServer::acceptConnection(socket_t* acceptSock){ while (true) { socket_t* sock = acceptSock->accept(); dbCriticalSection cs(mutex); if (cancelAccept) { return; } if (sock != NULL) { if (freeList == NULL) { freeList = new dbSession; freeList->next = NULL; } dbSession* session = freeList; freeList = session->next; session->sock = sock; session->stmts = NULL; session->next = waitList; session->in_transaction = false; session->existed_tables = NULL; session->dropped_tables = NULL; waitList = session; if (nIdleThreads == 0) { dbThread thread; nIdleThreads = 1; thread.create(serverThread, this); thread.detach(); } go.signal(); } }}dbServer::~dbServer(){ dbServer** spp; for (spp = &chain; *spp != this; spp = &(*spp)->next); *spp = next; delete globalAcceptSock; delete localAcceptSock; delete[] URL;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?