📄 testconc.cpp
字号:
//-< TESTCONC.CPP >--------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 31-Nov-2002 K.A. Knizhnik * / [] \ *// Last update: 31-Nov-2002 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// Test for concurrent update model//-------------------------------------------------------------------*--------*#include <stdio.h>#include "fastdb.h"class Bank { public: int8 total; bool online; TYPE_DESCRIPTOR((FIELD(total), FIELD(online)));};REGISTER(Bank);class Account { public: int4 id; int8 amount; TYPE_DESCRIPTOR((KEY(id, HASHED), FIELD(amount)));};REGISTER(Account);dbLocalEvent timer;const int nUpdateIterations = 10000;const int nMaxAccounts = 1000;void thread_proc inspector(void* arg){ dbDatabase* db = (dbDatabase*)arg; db->attach(); dbCursor<Account> accounts; dbCursor<Bank> bank; dbMutex mutex; int n = 0; bool online; mutex.lock(); do { int8 total = 0; if (accounts.select() > 0) { do { total += accounts->amount; } while (accounts.next()); } bank.select(); assert(total == bank->total); online = bank->online; db->commit(); n += 1; timer.wait(mutex, 1); } while (online); mutex.unlock(); printf("Thread performs %d inspection iterations\n", n);}int main(int argc, char* argv[]){ int i; int nThreads = 4;#ifdef REPLICATION_SUPPORT char dbName[16]; if (argc < 3) { fprintf(stderr, "Usage: testconc (update|inspect) <REPLICATION-NODE-ID> [number-of-inspection-threads]\n"); return 1; } int replNodeId = atoi(argv[2]); sprintf(dbName, "testconc%d", replNodeId); if (argc == 4) { nThreads = atoi(argv[3]); }#else char* dbName = "testconc"; if (argc < 2) { fprintf(stderr, "Usage: testconc (update|inspect) [number-of-inspection-threads]\n"); return 1; } if (argc == 3) { nThreads = atoi(argv[2]); }#endif if (strcmp(argv[1], "inspect") == 0) { dbDatabase db(dbDatabase::dbConcurrentRead); if (db.open(dbName)) { printf("Start %d inspection threads\n", nThreads); dbThread* thread = new dbThread[nThreads]; timer.open(false); for (i = 0; i < nThreads; i++) { thread[i].create(inspector, &db); } for (i = 0; i < nThreads; i++) { thread[i].join(); } timer.close(); db.close(); return EXIT_SUCCESS; } } else {#ifdef REPLICATION_SUPPORT char* servers[2] = {"localhost:6101", "localhost:6102"}; char fileName[32]; sprintf(fileName, "%s.fdb", dbName); dbReplicatedDatabase db(dbDatabase::dbConcurrentUpdate); if (db.open(dbName, fileName, replNodeId, servers, items(servers))) { printf("Update process started, now you should start inspectors: \"testconc inspector 1 N\", where N is number of inspectpors\n"); #else dbDatabase db(dbDatabase::dbConcurrentUpdate); if (db.open(dbName)) { printf("Update process started, now you should start inspectors: \"testconc inspector N\", where N is number of inspectpors\n"); #endif dbCursor<Bank> bank(dbCursorForUpdate); dbCursor<Account> accounts(dbCursorForUpdate); dbQuery q; int id; q = "id=",id; // Initialization if (bank.select() == 0) { Bank theBank; theBank.total = 0; theBank.online = true; insert(theBank); } else { bank->online = true; bank.update(); } db.commit(); for (i = 0; i < nUpdateIterations; i++) { int nAccounts = accounts.select(); id = 0; int8 delta = i; if (accounts.last()) { id = accounts->id + 1; accounts->amount += i; accounts.update(); delta += i; } Account acc; acc.amount = i; acc.id = id; insert(acc); if (nAccounts > nMaxAccounts) { accounts.first(); id = accounts->id; int n = accounts.select(q); assert(n == 1); delta -= accounts->amount; accounts.remove(); } bank.select(); bank->total += delta; bank.update(); db.commit(); } printf("End of %d update iterations\n", nUpdateIterations); bank.select(); bank->online = false; bank.update(); db.close(); return EXIT_SUCCESS; } } printf("Failed to open database\n"); return EXIT_FAILURE;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -