📄 server.cpp
字号:
int newFlags = *data++ & 0xFF;
int4 response = dbCLI::alter_index(db, tableName, fieldName, newFlags);
pack4(response);
return session->sock->write(&response, sizeof response);
}
/**
 * Serve a "prepare and execute" (prepare == true) or "execute"
 * (prepare == false) request for a SELECT statement.
 *
 * Prepare message layout, as consumed below:
 *   1 byte   number of parameters
 *   1 byte   number of columns
 *   2 bytes  length of the statement text (decoded with unpack2)
 *   text     "select [all] from <table>" followed by NUL-terminated
 *            condition fragments; every fragment except the last is followed
 *            by a one-byte CLI type code of the parameter placed after it
 *   ...      column descriptors, consumed by checkColumns()
 * Both kinds of request then carry one byte selecting the cursor type
 * (non-zero means "for update") followed by the parameter values.
 *
 * @param session  client session owning the statement list, scanner and socket
 * @param stmt_id  statement identifier chosen by the client
 * @param msg      request payload (contents come from the network)
 * @param prepare  true to (re)build the statement before executing it
 * @return result of writing the 4-byte response code to the session socket
 */
bool dbServer::select(dbSession* session, int stmt_id, char* msg, bool prepare)
{
    int4 response;
    int i, n_params, tkn, n_columns;
    dbStatement* stmt = findStatement(session, stmt_id);
    dbCursorType cursorType;
    dbTableDescriptor* desc;

    if (prepare) {
        if (stmt == NULL) {
            // First use of this id: create the statement and link it to the session.
            stmt = new dbStatement(stmt_id);
            stmt->next = session->stmts;
            session->stmts = stmt;
        } else {
            stmt->reset();
        }
        // Both counters are single bytes on the wire. Mask them so that a
        // platform with signed char cannot produce a negative array size.
        stmt->n_params = *msg++ & 0xFF;
        stmt->n_columns = n_columns = *msg++ & 0xFF;
        stmt->params = new dbParameterBinding[stmt->n_params];
        int len = unpack2(msg);
        msg += 2;
        session->scanner.reset(msg);
        char *p, *end = msg + len;

        // Statement has to start with: select [all] from <identifier>
        if (session->scanner.get() != tkn_select) {
            response = cli_bad_statement;
            goto return_response;
        }
        if ((tkn = session->scanner.get()) == tkn_all) {
            tkn = session->scanner.get();
        }
        if (tkn == tkn_from && session->scanner.get() == tkn_ident) {
            if ((desc = db->findTable(session->scanner.ident)) != NULL) {
                // Column descriptors start right after the statement text.
                msg = checkColumns(stmt, n_columns, desc, end, response);
                if (response != cli_ok) {
                    goto return_response;
                }
                stmt->cursor = new dbAnyCursor(*desc, dbCursorViewOnly, NULL);
                stmt->cursor->setPrefetchMode(false);
            } else {
                response = cli_table_not_found;
                goto return_response;
            }
        } else {
            response = cli_bad_statement;
            goto return_response;
        }

        // Build the query from alternating text fragments and parameter slots.
        p = session->scanner.p;
        for (i = 0; p < end; i++) {
            stmt->query.append(dbQueryElement::qExpression, p);
            p += strlen(p) + 1;
            if (p < end) {
                int cliType = *p++;
                // Query element type for every CLI type code, indexed by code.
                static const dbQueryElement::ElementType type_map[] = {
                    dbQueryElement::qVarReference, // cli_oid
                    dbQueryElement::qVarBool,      // cli_bool
                    dbQueryElement::qVarInt1,      // cli_int1
                    dbQueryElement::qVarInt2,      // cli_int2
                    dbQueryElement::qVarInt4,      // cli_int4
                    dbQueryElement::qVarInt8,      // cli_int8
                    dbQueryElement::qVarReal4,     // cli_real4
                    dbQueryElement::qVarReal8,     // cli_real8
                    dbQueryElement::qVarUnknown,   // cli_decimal
                    dbQueryElement::qVarStringPtr, // cli_asciiz
                    dbQueryElement::qVarStringPtr, // cli_pasciiz
                    dbQueryElement::qVarUnknown,   // cli_cstring
                    dbQueryElement::qVarUnknown,   // cli_array_of_oid
                    dbQueryElement::qVarUnknown,   // cli_array_of_bool
                    dbQueryElement::qVarUnknown,   // cli_array_of_int1
                    dbQueryElement::qVarUnknown,   // cli_array_of_int2
                    dbQueryElement::qVarUnknown,   // cli_array_of_int4
                    dbQueryElement::qVarUnknown,   // cli_array_of_int8
                    dbQueryElement::qVarUnknown,   // cli_array_of_real4
                    dbQueryElement::qVarUnknown,   // cli_array_of_real8
                    dbQueryElement::qVarUnknown,   // cli_array_of_decimal
                    dbQueryElement::qVarUnknown,   // cli_array_of_string
                    dbQueryElement::qVarUnknown,   // cli_any
                    dbQueryElement::qVarInt4,      // cli_datetime
                    dbQueryElement::qVarUnknown,   // cli_autoincrement
                    dbQueryElement::qVarRectangle, // presumably cli_rectangle (see switch below)
                    dbQueryElement::qVarUnknown,   // cli_unknown
                };
                const int n_types = (int)(sizeof(type_map) / sizeof(type_map[0]));
                // The type byte and the number of placeholders are supplied by
                // the client: refuse anything that would index outside
                // type_map or outside the params array allocated above.
                if (cliType < 0 || cliType >= n_types || i >= stmt->n_params) {
                    response = cli_bad_statement;
                    goto return_response;
                }
                stmt->params[i].type = cliType;
                stmt->query.append(type_map[cliType], &stmt->params[i].u);
            }
        }
    } else {
        if (stmt == NULL) {
            response = cli_bad_descriptor;
            goto return_response;
        }
        // A prepare that failed before creating the cursor leaves the
        // statement registered in the session; do not execute through it.
        // NOTE(review): relies on dbStatement keeping cursor == NULL until a
        // prepare succeeds - confirm against the dbStatement definition.
        if (stmt->cursor == NULL) {
            response = cli_bad_descriptor;
            goto return_response;
        }
    }

    stmt->firstFetch = true;
    cursorType = *msg++ ? dbCursorForUpdate : dbCursorViewOnly;

    // Decode the parameter values in the order the parameters were declared.
    for (i = 0, n_params = stmt->n_params; i < n_params; i++) {
        switch (stmt->params[i].type) {
          case cli_oid:
            stmt->params[i].u.oid = unpack_oid(msg);
            msg += sizeof(cli_oid_t);
            break;
          case cli_int1:
            stmt->params[i].u.i1 = *msg++;
            break;
          case cli_int2:
            msg = unpack2((char*)&stmt->params[i].u.i2, msg);
            break;
          case cli_int4:
            msg = unpack4((char*)&stmt->params[i].u.i4, msg);
            break;
          case cli_int8:
            msg = unpack8((char*)&stmt->params[i].u.i8, msg);
            break;
          case cli_real4:
            msg = unpack4((char*)&stmt->params[i].u.r4, msg);
            break;
          case cli_real8:
            msg = unpack8((char*)&stmt->params[i].u.r8, msg);
            break;
          case cli_bool:
            stmt->params[i].u.b = *msg++;
            break;
          case cli_asciiz:
          case cli_pasciiz:
            // The string is not copied: the binding points into the request buffer.
            stmt->params[i].u.str = msg;
            msg += strlen(msg) + 1;
            break;
          case cli_rectangle:
            assert(sizeof(cli_rectangle_t) == sizeof(rectangle));
            msg = unpack_rectangle((cli_rectangle_t*)&stmt->params[i].u.rect, msg);
            break;
          default:
            // Any other parameter type cannot be bound.
            response = cli_bad_statement;
            goto return_response;
        }
    }

#ifdef THROW_EXCEPTION_ON_ERROR
    try {
        response = stmt->cursor->select(stmt->query, cursorType);
    } catch (dbException const& x) {
        response = (x.getErrCode() == dbDatabase::QueryError)
            ? cli_bad_statement : cli_runtime_error;
    }
#else
    {
        // Without exceptions the error code comes back through setjmp on
        // the unwind buffer of the thread context.
        dbDatabaseThreadContext* ctx = db->threadContext.get();
        ctx->catched = true;
        int errorCode = setjmp(ctx->unwind);
        if (errorCode == 0) {
            response = stmt->cursor->select(stmt->query, cursorType);
        } else {
            response = (errorCode == dbDatabase::QueryError)
                ? cli_bad_statement : cli_runtime_error;
        }
        ctx->catched = false;
    }
#endif

  return_response:
    pack4(response);
    return session->sock->write(&response, sizeof response);
}
void dbServer::serveClient()
{
dbStatement *sp, **spp;
db->attach();
while (true) {
dbSession* session;
{
dbCriticalSection cs(mutex);
do {
go.wait(mutex);
if (cancelWait) {
nIdleThreads -= 1;
done.signal();
db->detach();
return;
}
} while (waitList == NULL);
session = waitList;
waitList = waitList->next;
session->next = activeList;
activeList = session;
nIdleThreads -= 1;
nActiveThreads += 1;
waitListLength -= 1;
}
cli_request req;
int4 response = cli_ok;
bool online = true;
while (online && session->sock->read(&req, sizeof req)) {
req.unpack();
int length = req.length - sizeof(req);
dbSmallBuffer msg(length);
if (length > 0) {
if (!session->sock->read(msg, length)) {
break;
}
}
switch(req.cmd) {
case cli_cmd_close_session:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
delete session->dropped_tables;
session->dropped_tables = next;
}
db->commit();
session->in_transaction = false;
online = false;
break;
case cli_cmd_prepare_and_execute:
online = select(session, req.stmt_id, msg, true);
session->in_transaction = true;
break;
case cli_cmd_execute:
online = select(session, req.stmt_id, msg, false);
break;
case cli_cmd_get_first:
online = get_first(session, req.stmt_id);
break;
case cli_cmd_get_last:
online = get_last(session, req.stmt_id);
break;
case cli_cmd_get_next:
online = get_next(session, req.stmt_id);
break;
case cli_cmd_get_prev:
online = get_prev(session, req.stmt_id);
break;
case cli_cmd_skip:
online = skip(session, req.stmt_id, msg);
break;
case cli_cmd_seek:
online = seek(session, req.stmt_id, msg);
break;
case cli_cmd_freeze:
online = freeze(session, req.stmt_id);
break;
case cli_cmd_unfreeze:
online = unfreeze(session, req.stmt_id);
break;
case cli_cmd_free_statement:
for (spp = &session->stmts; (sp = *spp) != NULL; spp = &sp->next)
{
if (sp->id == req.stmt_id) {
*spp = sp->next;
delete sp;
break;
}
}
break;
case cli_cmd_abort:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
db->linkTable(session->dropped_tables, session->dropped_tables->tableId);
session->dropped_tables = next;
}
if (session->existed_tables != NULL) {
while (db->tables != session->existed_tables) {
dbTableDescriptor* table = db->tables;
db->unlinkTable(table);
delete table;
}
session->existed_tables = NULL;
}
db->rollback();
session->in_transaction = false;
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_commit:
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
delete session->dropped_tables;
session->dropped_tables = next;
}
session->existed_tables = NULL;
db->commit();
session->in_transaction = false;
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_precommit:
db->precommit();
online = session->sock->write(&response, sizeof response);
break;
case cli_cmd_update:
online = update(session, req.stmt_id, msg);
break;
case cli_cmd_remove:
online = remove(session, req.stmt_id);
break;
case cli_cmd_remove_current:
online = remove_current(session, req.stmt_id);
break;
case cli_cmd_prepare_and_insert:
online = insert(session, req.stmt_id, msg, true);
session->in_transaction = true;
break;
case cli_cmd_insert:
online = insert(session, req.stmt_id, msg, false);
break;
case cli_cmd_describe_table:
online = describe_table(session, (char*)msg);
break;
case cli_cmd_show_tables:
online = show_tables(session);
break;
case cli_cmd_create_table:
online = create_table(session, msg, true);
break;
case cli_cmd_alter_table:
online = create_table(session, msg, false);
break;
case cli_cmd_drop_table:
online = drop_table(session, msg);
break;
case cli_cmd_alter_index:
online = alter_index(session, msg);
break;
}
}
if (session->in_transaction) {
while (session->dropped_tables != NULL) {
dbTableDescriptor* next = session->dropped_tables->nextDbTable;
db->linkTable(session->dropped_tables, session->dropped_tables->tableId);
session->dropped_tables = next;
}
if (session->existed_tables != NULL) {
while (db->tables != session->existed_tables) {
dbTableDescriptor* table = db->tables;
db->unlinkTable(table);
delete table;
}
session->existed_tables = NULL;
}
db->rollback();
}
// Finish session
{
dbCriticalSection cs(mutex);
dbSession** spp;
delete session->sock;
for (spp = &activeList; *spp != session; spp = &(*spp)->next);
*spp = session->next;
session->next = freeList;
freeList = session;
nActiveThreads -= 1;
if (cancelSession) {
done.signal();
break;
}
if (nActiveThreads + nIdleThreads >= optimalNumberOfThreads) {
break;
}
nIdleThreads += 1;
}
}
db->detach();
}
/**
 * Accept loop for one listening socket.
 *
 * Every accepted connection is bound to a session object (taken from freeList
 * or newly allocated), reset to its initial state and pushed onto waitList.
 * A new worker thread is started when there are fewer idle threads than
 * waiting sessions, and `go` is signalled so that a worker picks the session
 * up. The loop ends as soon as cancelAccept is observed.
 *
 * @param acceptSock listening socket whose accept() yields client connections
 */
void dbServer::acceptConnection(socket_t* acceptSock)
{
    while (true) {
        socket_t* sock = acceptSock->accept();
        dbCriticalSection cs(mutex);
        if (cancelAccept) {
            // Nobody will serve a connection accepted at this point:
            // release it instead of leaking it (deleting NULL is a no-op).
            delete sock;
            return;
        }
        if (sock != NULL) {
            if (freeList == NULL) {
                freeList = new dbSession;
                freeList->next = NULL;
            }
            // Reuse a session object and reset its per-connection state.
            dbSession* session = freeList;
            freeList = session->next;
            session->sock = sock;
            session->stmts = NULL;
            session->next = waitList;
            session->in_transaction = false;
            session->existed_tables = NULL;
            session->dropped_tables = NULL;
            waitList = session;
            waitListLength += 1;
            if (nIdleThreads < waitListLength) {
                // Not enough idle workers for the queued sessions.
                dbThread thread;
                nIdleThreads += 1;
                thread.create(serverThread, this);
                thread.detach();
            }
            go.signal();
        }
    }
}
/**
 * Unlink this server from the `chain` list and release the accept sockets
 * and the URL buffer it owns.
 */
dbServer::~dbServer()
{
    // Walk the links of the chain until the one pointing at this object.
    dbServer** link = &chain;
    while (*link != this) {
        link = &(*link)->next;
    }
    *link = next;

    delete globalAcceptSock;
    delete localAcceptSock;
    delete[] URL;
}
END_FASTDB_NAMESPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -