📄 server.cpp
字号:
bool dbServer::alter_index(dbSession* session, char* data){ char* tableName = data; data += strlen(data) + 1; char* fieldName = data; data += strlen(data) + 1; int newFlags = *data++ & 0xFF; int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags); pack4(response); return session->sock->write(&response, sizeof response);}bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare){ int4 response; int i, n_params, tkn, n_columns; dbStatement* stmt = findStatement(session, stmt_id); dbCursorType cursorType; dbTableDescriptor* desc; if (prepare) { if (stmt == NULL) { stmt = new dbStatement(stmt_id); stmt->next = session->stmts; session->stmts = stmt; } else { stmt->reset(); } stmt->n_params = *msg++; stmt->n_columns = n_columns = *msg++; stmt->params = new dbParameterBinding[stmt->n_params]; int len = unpack2(msg); msg += 2; session->scanner.reset(msg); char *p, *end = msg + len; if (session->scanner.get() != tkn_select) { response = cli_bad_statement; goto return_response; } if ((tkn = session->scanner.get()) == tkn_all) { tkn = session->scanner.get(); } if (tkn == tkn_from && session->scanner.get() == tkn_ident) { if ((desc = db->findTable(session->scanner.ident)) != NULL) { msg = checkColumns(stmt, n_columns, desc, end, response); if (response != cli_ok) { goto return_response; } stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL); stmt->cursor->setPrefetchMode(false); } else { response = cli_table_not_found; goto return_response; } } else { response = cli_bad_statement; goto return_response; } p = session->scanner.p; for (i = 0; p < end; i++) { stmt->query.append(dbQueryElement::qExpression, p); p += strlen(p) + 1; if (p < end) { int cliType = *p++; static const dbQueryElement::ElementType type_map[] = { dbQueryElement::qVarReference, // cli_oid dbQueryElement::qVarBool, // cli_bool dbQueryElement::qVarInt1, // cli_int1 dbQueryElement::qVarInt2, // cli_int2 dbQueryElement::qVarInt4, // cli_int4 dbQueryElement::qVarInt8, // cli_int8 dbQueryElement::qVarReal4, // cli_real4 dbQueryElement::qVarReal8, // cli_real8 dbQueryElement::qVarStringPtr, // cli_asciiz dbQueryElement::qVarStringPtr, // cli_pasciiz }; stmt->params[i].type = cliType; stmt->query.append(type_map[cliType], &stmt->params[i].u); } } } else { if (stmt == NULL) { response = cli_bad_descriptor; goto return_response; } } stmt->firstFetch = true; cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly; for (i = 0, n_params = stmt->n_params; i < n_params; i++) { switch (stmt->params[i].type) { case cli_oid: stmt->params[i].u.oid = unpack_oid(msg); msg += sizeof(cli_oid_t); break; case cli_int1: stmt->params[i].u.i1 = *msg++; break; case cli_int2: msg = unpack2((char*)&stmt->params[i].u.i2, msg); break; case cli_int4: msg = unpack4((char*)&stmt->params[i].u.i4, msg); break; case cli_int8: msg = unpack8((char*)&stmt->params[i].u.i8, msg); break; case cli_real4: msg = unpack4((char*)&stmt->params[i].u.r4, msg); break; case cli_real8: msg = unpack8((char*)&stmt->params[i].u.r8, msg); break; case cli_bool: stmt->params[i].u.b = *msg++; break; case cli_asciiz: case cli_pasciiz: stmt->params[i].u.str = msg; msg += strlen(msg) + 1; break; default: response = cli_bad_statement; goto return_response; } } #ifdef THROW_EXCEPTION_ON_ERROR try { response = stmt->cursor->select(stmt->query, cursorType); } catch (dbException const& x) { response = (x.getErrCode() == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; }#else { dbDatabaseThreadContext* ctx = db->threadContext.get(); ctx->catched = true; int errorCode = setjmp(ctx->unwind); if (errorCode == 0) { response = stmt->cursor->select(stmt->query, cursorType); } else { response = (errorCode == dbDatabase::QueryError) ? cli_bad_statement : cli_runtime_error; } ctx->catched = false; }#endif return_response: pack4(response); return session->sock->write(&response, sizeof response);}void dbServer::serveClient(){ dbStatement *sp, **spp; db->attach(); while (true) { dbSession* session; { dbCriticalSection cs(mutex); do { go.wait(mutex); if (cancelWait) { nIdleThreads -= 1; done.signal(); db->detach(); return; } } while (waitList == NULL); session = waitList; waitList = waitList->next; session->next = activeList; activeList = session; nIdleThreads -= 1; nActiveThreads += 1; waitListLength -= 1; } cli_request req; int4 response = cli_ok; bool online = true; while (online && session->sock->read(&req, sizeof req)) { req.unpack(); int length = req.length - sizeof(req); dbSmallBuffer msg(length); if (length > 0) { if (!session->sock->read(msg, length)) { break; } } switch(req.cmd) { case cli_cmd_close_session: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } db->commit(); session->in_transaction = false; online = false; break; case cli_cmd_prepare_and_execute: online = select(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_execute: online = select(session, req.stmt_id, msg, false); break; case cli_cmd_get_first: online = get_first(session, req.stmt_id); break; case cli_cmd_get_last: online = get_last(session, req.stmt_id); break; case cli_cmd_get_next: online = get_next(session, req.stmt_id); break; case cli_cmd_get_prev: online = get_prev(session, req.stmt_id); break; case cli_cmd_skip: online = skip(session, req.stmt_id, msg); break; case cli_cmd_seek: online = seek(session, req.stmt_id, msg); break; case cli_cmd_freeze: online = freeze(session, req.stmt_id); break; case cli_cmd_unfreeze: online = unfreeze(session, req.stmt_id); break; case cli_cmd_free_statement: for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next) { if (sp->id == req.stmt_id) { *spp = sp->next; delete sp; break; } } break; case cli_cmd_abort: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_commit: while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; delete session->dropped_tables; session->dropped_tables = next; } session->existed_tables = NULL; db->commit(); session->in_transaction = false; online = session->sock->write(&response, sizeof response); break; case cli_cmd_precommit: db->precommit(); online = session->sock->write(&response, sizeof response); break; case cli_cmd_update: update(session, req.stmt_id, msg); break; case cli_cmd_remove: remove(session, req.stmt_id); break; case cli_cmd_prepare_and_insert: insert(session, req.stmt_id, msg, true); session->in_transaction = true; break; case cli_cmd_insert: insert(session, req.stmt_id, msg, false); break; case cli_cmd_describe_table: describe_table(session, (char*)msg); break; case cli_cmd_show_tables: show_tables(session); break; case cli_cmd_create_table: online = create_table(session, msg, true); break; case cli_cmd_alter_table: online = create_table(session, msg, false); break; case cli_cmd_drop_table: online = drop_table(session, msg); break; case cli_cmd_alter_index: online = alter_index(session, msg); break; } } if (session->in_transaction) { while (session->dropped_tables != NULL) { dbTableDescriptor* next = session->dropped_tables->nextDbTable; db->linkTable(session->dropped_tables, session->dropped_tables->tableId); session->dropped_tables = next; } if (session->existed_tables != NULL) { while (db->tables != session->existed_tables) { dbTableDescriptor* table = db->tables; db->unlinkTable(table); delete table; } session->existed_tables = NULL; } db->rollback(); } // Finish session { dbCriticalSection cs(mutex); dbSession** spp; delete session->sock; for (spp = &activeList; *spp != session; spp = &(*spp)->next); *spp = session->next; session->next = freeList; freeList = session; nActiveThreads -= 1; if (cancelSession) { done.signal(); break; } if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) { break; } nIdleThreads += 1; } } db->detach();}void dbServer::acceptConnection(socket_t* acceptSock){ while (true) { socket_t* sock = acceptSock->accept(); dbCriticalSection cs(mutex); if (cancelAccept) { return; } if (sock != NULL) { if (freeList == NULL) { freeList = new dbSession; freeList->next = NULL; } dbSession* session = freeList; freeList = session->next; session->sock = sock; session->stmts = NULL; session->next = waitList; session->in_transaction = false; session->existed_tables = NULL; session->dropped_tables = NULL; waitList = session; waitListLength += 1; if (nIdleThreads < waitListLength) { dbThread thread; nIdleThreads += 1; thread.create(serverThread, this); thread.detach(); } go.signal(); } }}dbServer::~dbServer(){ dbServer** spp; for (spp = &chain; *spp != this; spp = &(*spp)->next); *spp = next; delete globalAcceptSock; delete localAcceptSock; delete[] URL;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -