📄 mastrepl.c
字号:
/*****************************************************************
 *                                                               *
 * Copyright (c) 2001-2006 McObject LLC. All Right Reserved.     *
 *                                                               *
 *****************************************************************/

/*
 * ++
 *
 * PROJECT:   eXtremeDB-HA Application Framework
 *
 * SUBSYSTEM: HA support
 *
 * MODULE:    mastrepl.c
 *
 * ABSTRACT:  The main module of HA framework.
 *
 *
 * VERSION:   1.0
 *
 * HISTORY:
 *            1.0- 1 AD 01-Mar-2003 Created it was
 *                 2 AG 19-Mar-2003 Ported to Linux
 *                 3 SS 19-Jul-2003 Reconstructed
 *                 4 SS 11-Oct-2003 shared commit mode support was added
 *
 * --
 */

/*
 * This is the main module. It contains command line parsing and
 * initialization code for master and replica mode.
 *
 * Schema for this sample is in monitors.mco file.
 *
 */

#include "app.h"
#include "mcoHA.h"
#include "watchdog.h"

#ifdef _PERFHA
  #include "perf2.h"
#else  //_PERFHA
  #include "monitorDB.h"
  #include "sensor.h"
#endif //_PERFHA

/* definitions & variables */

THREAD_PROC_DECLARE(ListenToReplicas);
THREAD_PROC_DECLARE(SynchCommit);

DEFINE_PROCESS_MASKS(set);

#ifdef _WIN32_WCE
  extern HWND hScr;
  extern HANDLE hMain;
#endif

extern timer_unit init_time;

ha_h haInstances[MAX_HA_INSTANCES];     // the list of created databases
int NumberOfDb = 1;                     // current number of db instances
unsigned int CurrentDbInstance = 0;     // number of current db instance for replica
//int wdt_init = 0;                     // watchdog initialization flag
int wdt_flag = 0;                       // the flag for shared commit watchdog
int master_number = 0;
int main_master = -1;                   // -1: replica, 0: secondary master (-ms), 1: main master (-m)
int replica_mode = -1;                  // set by parse_cmd: 0 for -s, 1 for -sm
long stop_flag = 0;
long synch_flag = 1;
int cancel_flag = 0;                    // turns on "replica cancellation" feature
int async_quantum = 500;
int p2_commit = 0;                      // 2-phase commit flag
timer_unit t_begin, t_start, t_stop, t_end;
THREAD_ID thMastRepl;
THREAD_ID thWinput;
MUTEX_T mutex;
char mastername[HOST_MAX_NAMELENGTH] = nw_ConnectionString;

#ifdef _PERFHA
int nRecords = 100000;
#else
static void WatchDog();
#endif

mco_db_h ReplDb = (mco_db_h)0;
static mco_db_h masterdb = 0;

#ifdef CFG_SHARED_COMMIT
  mco_HA_params_t MasterParams = {MCO_MULTIPROCESS_COMMIT, MAX_REPLICAS };
#else
  mco_HA_params_t MasterParams = {MCO_MASTER_MODE, MAX_REPLICAS };
#endif

int test_Mode = 0;
int test_ExitMode = 0;

/***************************
 *     program banners     *
 ***************************/

/* Print the short option summary. */
void usage(void)
{
    Printf ("Usage: \tmcoha [-m][number_of_replicated_databases] | \n");
    Printf ("\t [-ms][number_of_replicated_databases] | \n");
    Printf ("\t [-r][database_index] | \n");
    Printf ("\t [-s][database_index] | \n");
    Printf ("\t [-sm][database index] | \n");
    Printf ("\t [-cs=hostname[:port]] | \n");
    Printf ("\t [-hs] | \n");
#ifdef MCO_CFG_HA_2PHASE_COMMIT
    Printf ("\t [-p2] | \n");
#endif //MCO_CFG_HA_2PHASE_COMMIT
    Printf ("\t [-c] | \n");
    Printf ("\t [-a] | \n");
    Printf ("\t [-d] | \n");
#ifdef _PERFHA
    Printf ("\t [-n][records number] | \n");
#endif
    Printf ("\t [-h]\n");
}

/* Print the detailed description of every command line option. */
void help(void)
{
#ifdef _PERFHA
    Printf ("Usage: \tperfha [OPTIONS]\n\n");
    Printf ("Demonstrates the perfomance of the eXtremeDB High Availability.\n\n" );
#else
    Printf ("Usage: \tmcoha [OPTIONS]\n\n");
    Printf ("Demonstrates the eXtremeDB High Availability Framework.\n\n"
            "Note that this example program illustrates how the HA Framework \"can\" be\n"
            "used, not how it \"must\" be used. For instance, this example uses multiple\n"
            "replica processes, one for each database, when a \"master\" has more than one\n"
            "database to replicate. Another implementation could use multiple threads\n"
            "instead of processes.\n\n");
#endif
    Printf ("Options:\n\n");
    Printf ("-m[N] Runs the application as a \"main master\" that replicates\n"
            " N \"master\" databases.\n"
            " \"Main master\" creates a \"commit thread\" that provides the\n"
            " context in which all (main and secondary) master node\n"
            " applications execute their database commits\n"
            " N = 1,2,3 or 4, N = 1 by default\n\n"
            " Example: mcoha -m\n"
            " Runs as the \"main master\" process that replicates one database\n\n");
    Printf ("-ms[N] Runs the application as a \"secondary master\" that replicates\n"
            " N \"master\" databases\n"
            " \"Secondary master\" database commits are executed in the context\n"
            " of the \"main master\" commit thread\n"
            " N = 1,2,3 or 4, N = 1 by default\n\n"
            " Example: mcoha -ms\n"
            " Runs as a \"secondary master\" process that replicates one database\n\n");
    Printf ("-r[I] Runs the application as a \"replica\" attached to the database\n"
            " with the index I. When multiple master databases are replicated,\n"
            " each \"master\" database is replicated by its own \"replica\" process\n"
            " I = 1,2,3 or 4, I = 1 by default\n\n"
            " Example: mcoha -r\n"
            " Runs as a \"replica\" process attached to the first master\n"
            " database\n\n");
    Printf ("-s[I] The same as rI, except the replica becomes the \"master\"\n"
            " if the current \"master\" process has failed\n"
            " I = 1,2,3 or 4, I = 1 by default\n\n"
            " Example: mcoha -s\n"
            " Runs as a \"replica\" process attached to the first replicated\n"
            " database and takes over in the case of the current \"master\"\n"
            " failure\n\n");
    Printf ("-sm[I] Runs as a \"replica\". Synchronizes the database referenced by the\n"
            " index I and continues running as a \"master\" process.\n"
            " I = 1,2,3 or 4, I = 1 by default\n\n"
            " This mode is used when re-starting a failed \"master\" and you\n"
            " want it to resume the role of \"master\" after re-synchronizing\n"
            " with the replica that became master during the failover.\n"
            " The replica turned master will revert to a replica\n\n"
            " Example: mcoha -sm\n"
            " Runs as a \"replica\" process attached to the first replicated\n"
            " database and becomes a \"master\" after the initial database\n"
            " synchronization is complete\n\n");
    Printf ("-hs Turns on Hot Synchronization\n\n");
#ifdef MCO_CFG_HA_2PHASE_COMMIT
    Printf ("-p2 turn on 2-phase commit\n\n" );
#endif //MCO_CFG_HA_2PHASE_COMMIT
#ifdef _PERFHA
    Printf ("-n[I] Number of records database works with.\n"
            " I = 10 - 1000000. The default is 200000\n\n"
            " Example: mcoha -n\n" );
#endif
    Printf ("-a Asynchronous mode of replication\n"
            " Example: mcoha -m -a\n\n");
    Printf ("-d Master cycle delay in asynchronous mode of replication, imitates \n"
            " irregular updates of the database. By default is 500 ms.\n"
            " Example: mcoha -m -a -d1000\n\n");
    Printf ("-c turns on \"replica cancellation\" feature. If this option is set\n"
            " in replica mode, then you may cancel replica entering \"b<Enter>\"\n\n");
    Printf ("-cs sets the address of the master. Format of the option:\n"
            " -cs=hostname[:port] for TCP/UDP\n"
            " -cs=pipename for pipes. The last two symbols of port or pipename\n"
            " must be \"00\"\n\n" );
    Printf ("-h help\n");
}

/*
 * Parse the database count "N" of the -m[N] / -ms[N] options.
 *
 * Returns a value in 1..MAX_HA_INSTANCES. A missing, zero or negative
 * number yields 1; a number above the limit yields MAX_HA_INSTANCES.
 */
static int cmd_db_count(const char *s)
{
    long n = strtol(s, NULL, 10);

    if (n < 1)
        return 1;
    if (n > MAX_HA_INSTANCES)
        return MAX_HA_INSTANCES;
    return (int)n;
}

/*
 * Parse the 1-based database index "I" of the -r[I] / -s[I] / -sm[I]
 * options and convert it to a 0-based index.
 *
 * Anything outside 1..MAX_HA_INSTANCES (including a missing number)
 * selects the first database, i.e. returns 0.
 */
static unsigned int cmd_db_index(const char *s)
{
    long n = strtol(s, NULL, 10);

    if (n < 1 || n > MAX_HA_INSTANCES)
        return 0;
    return (unsigned int)(n - 1);
}

/*
 * Parse the command line and store the result in the file-level settings
 * (main_master, replica_mode, NumberOfDb, CurrentDbInstance, MasterParams,
 * mastername, cancel_flag, async_quantum, p2_commit, nRecords, test_Mode).
 *
 * argc, argv - as passed to main(); argv[0] is skipped.
 *
 * Returns 0 when at least one option was recognized and no error occurred.
 * Returns 1 when usage/help was printed, when no option was recognized, or
 * when the -cs address does not fit into mastername.
 * NOTE(review): the caller is not visible here; presumably a non-zero
 * result makes it exit without starting replication - confirm.
 *
 * Arguments starting with '-' or '/' whose option letter is unknown are
 * ignored. The -mc / -mcn forms (multicast, multicast without confirmation)
 * are accepted although help() does not list them.
 */
int parse_cmd(int argc, char **argv )
{
    int i;
    int flag = 1;   /* cleared as soon as one option is recognized */

    if (argc < 2) {
        usage();
        return 1;
    }

    for (i = 1; i < argc; i++) {
        const char *p = argv[i];

        if (strstr(p, "--test")) {
            test_Mode = 1;
            continue;
        }

        if (p[0] != '/' && p[0] != '-') {
            help();
            return 1;
        }

        switch (p[1]) {
        case 'h':
            /* -hs is hot synchronization, any other -h... asks for help */
            if (p[2] != 's') {
                help();
                return 1;
            }
            flag = 0;
            MasterParams.mode_flags |= MCO_HAMODE_HOTSYNCH;
            break;

        case 'r':
            flag = 0;
            CurrentDbInstance = cmd_db_index(&p[2]);
            break;

        case 'm':
            flag = 0;
            if (p[2] == 's') {
                /* -ms[N]: secondary master */
                main_master = 0;
                NumberOfDb = cmd_db_count(&p[3]);
            }
            else if (p[2] == 'c') {
                /* -mc / -mcn: multicast, optionally without confirmation */
                if (p[3] == 'n') {
                    MasterParams.mode_flags |= (MCO_HAMODE_MCAST|MCO_HAMODE_MCAST_NOCONFIRM);
                }
                else {
                    MasterParams.mode_flags |= (MCO_HAMODE_MCAST);
                }
            }
            else {
                /* -m[N]: main master */
                main_master = 1;
                NumberOfDb = cmd_db_count(&p[2]);
            }
            break;

        case 's':
            flag = 0;
            if (p[2] == 'm') {
                /* -sm[I] */
                replica_mode = 1;
                CurrentDbInstance = cmd_db_index(&p[3]);
            }
            else {
                /* -s[I] */
                replica_mode = 0;
                CurrentDbInstance = cmd_db_index(&p[2]);
            }
            break;

        case 'c':
            flag = 0;
            if (p[2] == 's') {
                /* -cs=address or -cs:address; the separator is optional */
                const char *name = &p[3];
                size_t len;

                if (*name == '=' || *name == ':')
                    name++;

                /* mastername is a fixed-size buffer: never copy past its end */
                len = strlen(name);
                if (len >= sizeof(mastername)) {
                    Printf("Master address is too long (max %d characters)\n",
                           (int)(sizeof(mastername) - 1));
                    return 1;
                }
                memcpy(mastername, name, len + 1);
            }
            else {
                cancel_flag = 1;
            }
            break;

        case 'a':
            flag = 0;
            MasterParams.mode_flags |= (MCO_HAMODE_ASYNCH);
            break;

        case 'd':
            flag = 0;
            async_quantum = atoi(&p[2]);
            break;

#ifdef MCO_CFG_HA_2PHASE_COMMIT
        case 'p':
            if (p[2] == '2') {
                flag = 0;
                p2_commit = 1;
            }
            break;
#endif //MCO_CFG_HA_2PHASE_COMMIT

#ifdef _PERFHA
        case 'n':
            flag = 0;
            if ((nRecords = atoi(&p[2])) == 0)
                nRecords = 200000;
            break;
#endif

        default:
            /* unknown option letters are ignored */
            break;
        }
    }

    return flag;
}

/* Print the runtime version banner. */
void _SH_(void)
{
    Printf("\neXtremeDB-HA runtime version %d.%d, build %d\n\n", MCO_COMP_VER_MAJOR, MCO_COMP_VER_MINOR, MCO_COMP_BUILD_NUM);
}

#ifdef CFG_ASYNCHRONOUS_REPLICATION
/****************************************************
 * Asynchronous commit thread
 *
 * arg is the ha_h instance whose database (ha->dbName) is connected.
 * The loop ends when stop_flag is set or when
 * mco_HA_async_send_data_to_replicas() reports an error.
 ****************************************************/
THREAD_PROC_DEFINE(AsyncCommit, arg)
{
    ha_h ha = (ha_h)arg;
//  mco_trans_h t; // transaction handle
    mco_db_h db;
    MCO_RET rc;

    THREAD_PROC_MODE();

    if((rc=mco_db_connect(ha->dbName, &db )) != MCO_S_OK) {
        Printf("AsyncCommit: error connecting database %d\n", rc);
        return;
    }

    for(;;) {
        if(stop_flag)
            return;

        /* wait for async commit event and output the portion of data to the stream */
        if( (mco_HA_async_send_data_to_replicas( db )) != MCO_S_OK)
            break;

        /* This delay is necessary in order to optimize data transfer quantum*/
        Sleep(1);
    }
}
THREAD_PROC_FINISH_DEFINE
#endif //CFG_ASYNCHRONOUS_REPLICATION

/****************************************************
 * master - replica working thread
 *
 * Runs master() when main_master >= 0, replica() otherwise;
 * arg is the ha_h instance to work on.
 ****************************************************/
THREAD_PROC_DEFINE(Working, arg)
{
    PROCESS_MASKS(); // for LINUX, QNX, SOLARIS
    THREAD_PROC_MODE();

    if(main_master >= 0)
        master(1, (ha_h)arg);
    else
        replica((ha_h)arg);
}
THREAD_PROC_FINISH_DEFINE

/****************************************************
 * master - replica procedure (thread for win32)
 ****************************************************/
#ifndef _WIN32
void mastrepl(void *arg)
#else //_WIN32
THREAD_PROC_DEFINE(mastrepl, arg)
#endif //_WIN32
{
    int i, j;
    mco_db_h db = 0;

#ifndef _PERFHA
    make_strings();
#endif //_PERFHA

    synch_flag = NumberOfDb;

    if(main_master < 0) {
        /******************************************************
         * Replica Mode
         ******************************************************/
        ha_h ha;

        wdt_flag = -1;

        /*
         * create the only instance of HA for replication. Database isn't open yet after the
         * call of this function.
         */
        if( (ha=haInstances[0]=CreateHAinstance(CurrentDbInstance)) == 0) {
            Printf("Can't create HA instance");
            EXIT(-1);
        }
        createThread(Working, ha, &haInstances[0]->hWorking);
    }
    else {
        /******************************************************
         * Master Mode, initialize master's database
         ******************************************************/
        Printf("\n*** Master is being started ");
        t_begin = mco_system_get_current_time(); // save time of test beginning

        if(!main_master) {
            /**********************************************************************
             * slave master
             * attempt to connect to existing databases
             **********************************************************************/
            Printf("in \"slave master\" mode ***\n");

            for (i=0, j = 0; i < NumberOfDb; i++) {
                /* connect to the database, obtain a database handle */
                if( (haInstances[j] = ConnectToDatabase( i )) != 0) {
                    createThread(Working, (PVOID)haInstances[j], &haInstances[j]->hWorking );
                    j++;
                }
            }

            if((NumberOfDb=j) == 0) {
                Printf("\nMaster: could not find any existing database!\n\n");
                KILL();
                EXIT(-1);
            }
            SET_PROCESS_PRIORITY(NORMAL_PRIORITY-2);
            Printf("\nMaster: %d existing databases are found\n", j);
        }
        else { //
            /*******************************************************************************
             * "main" master
             *
             * if this is the "main" master process, it creates a new database.
             * The "main" master process must also create a separate thread for the shared commit
             * if it works in the MULTIMASTER_SHM mode
             *******************************************************************************/
            Printf("in \"main master\" mode ***\n");
            Printf("\nMaster: creating the new database(s)\n");

            /* create so many databases, as it is set in the option in command line.
               Obtain database's handles */
            /*
             * NOTE(review): the handle is stored at haInstances[i] but the thread
             * is started from haInstances[j]; the two indices differ once a
             * CreateDatabase() call has failed. The slave-master loop above uses
             * index j for both. Left unchanged because the remainder of this
             * function is not visible here - confirm and fix.
             */
            for (i=0, j = 0; i < NumberOfDb; i++) {
                if( (haInstances[i] = CreateDatabase( i )) == 0)
                    continue;
                createThread(Working, (PVOID)haInstances[j], &haInstances[j]->hWorking);
                j++;
            }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -