📄 mastrepl.c
字号:
} masterdb = db; // for exit handler t_start = mco_system_get_current_time(); }#ifdef _WIN32 while(synch_flag) SLEEP(1000);#endif //_WIN32}THREAD_PROC_FINISH_DEFINE/***************************************************************** ***************************************************************** * * main program entry * ***************************************************************** *****************************************************************/#ifdef MCO_HA_MASTERint argc_int = 2;char * argv_int[2] = { 0, "-m" };#endif /* MCO_HA_MASTER */#ifdef MCO_HA_REPLICAint argc_int = 2;char * argv_int[2] = { 0, "-s" };#endif /* MCO_HA_REPLICA */#ifndef WIN32_WCEint main( int argc, char **argv )#elseint __cdecl main( int argc, char **argv )#endif{/* The control channel for cancelling replica. Is used by main thread when "-c" option of command line is set. */ nw_channel_t chan; PROCESS_MASKS(); // for LINUX, QNX, SOLARIS MUTEX_INIT(&mutex); _SH_();/* parse command line */#ifndef _INTEGRITY if (0!=parse_cmd(argc,argv )) {#ifdef _WIN32_WCE char str[128]; printf("\npress ENTER to exit\n"); gets(str);#endif PROG_EXIT(0); }#else argv_int[0] = argv[0]; if (0!=parse_cmd(argc_int,argv_int )) { PROG_EXIT(0); }#endif /* _INTEGRITY */ mco_error_set_handler( &errhandler );#ifdef CFG_QNXMSG_CHANNEL if(NumberOfDb > 1) { printf("\nmultiple master's databases support is not implemented yet\n" "in QNX Messaging channel\n"); }#endif/* set current master mode, which could has been modified in parse_cmd() */#ifndef _WIN32 mastrepl(0); /* run working procedure */#else //_WIN32 createThread(mastrepl, 0, &thMastRepl); /* run working thread */#endif //_WIN32/* for POSIX/Linux only. WIN32 uses events.*/#ifndef _WIN32 if( cancel_flag ) {/************************************************** Waiting for cancellation point connection. Works if "-c" command line option is set. **************************************************/ if(main_master < 0 ) // if "replica" mode { MCO_RET rc;/* Initialize the socket support */ nw_init( &chan );/* Set "listen for cancel point connection" state *//* wait for the cancel point connection */ printf("*** listen for cancellation point connection ***\n"); if( (rc = nw_accept_cancel_point( &chan, CANCELPOINT_PORT, TM_ACCEPT_TIMEOUT ) ) != MCO_S_OK ) { printf("error accepting cancellation socket %ld\n", rc); EXIT(-1); } printf("*** cancellation point socket connected ***\n"); } }#endif //_WIN32/************************************************** * wait for a keystroke to stop test **************************************************/ if (main_master && test_Mode) Sleep(30000); {#ifdef _VXWORKS int i; FILE * f = stdin;#endif char s[256]; s[0] = 0; while(stop_flag == 0){#ifdef _VXWORKS i = 0; while(i == 0) { ioctl (f->_file, FIONREAD, (int)&i); Sleep(100); if( synch_flag == 0) { Printf("\n\nApplication is stopped\n"); return 0; } }#endif if((fgets(s,256,stdin)) == 0) { continue; } if (s[0] ==0 ) continue; if(s[0] == 'b') {/* input 'b' - cancel replica */ stop_flag = 2; replica_cancel( &chan ); break; } if( (s[0] != '\r') && (s[0] != '\n') ) continue; stop_flag = 1; break; } while( synch_flag ) Sleep(50); /* wait for terminatin of working threads */ if(stop_flag == 1) Printf("\n\nApplication interrupted by keystroke\n"); if(stop_flag == 2) { Printf("\n\nApplication is cancelled via nw_cancel()\n"); Printf("\n\nPress <Enter>\n"); fgets(s,256,stdin); } t_end = mco_system_get_current_time(); t_end -= t_stop; t_stop -= t_start; t_start -= t_begin; Printf("\n\n" "Initialization time:%7d.%03d\n" "Processing time: %7d.%03d\n" "Shutdown time: %7d.%03d\n" "Saving database to replica time: %ld ms\n", (int)t_start/1000,(int) t_start%1000, (int)t_stop/1000, (int)t_stop%1000, (int)t_end/1000, (int)t_end%1000, init_time ); if(main_master < 0 ) // if "replica" mode { if(haInstances[0]->db != (mco_db_h)0) { mco_HA_replica_stop( haInstances[0]->db ); } } ExitHandler(test_ExitCode); /* terminate the program gracefully */ } PROG_EXIT(test_ExitCode);}/***************************************************** ha_t & application classes methods *****************************************************//* close all database instances */void CloseDatabases(int flag){ int i; ha_h ha; mco_runtime_info_t run_info; mco_get_runtime_info( &run_info); for (i=0; i < NumberOfDb; i++) { ha = haInstances[i]; /* kill ListenToReplicas() thread */ if(ha->hConnThread != (THREAD_ID)INVALID_HANDLE_VALUE) cancelThread(ha->hConnThread); if(!flag) mco_nw_close((PVOID)ha); // close connections to replicas#ifdef CFG_SHARED_COMMIT if( (run_info.mco_shm_supported != 0) ) { /* kill SynchCommit() thread */ if(ha->hSynchcommit != (THREAD_ID)INVALID_HANDLE_VALUE) cancelThread(ha->hSynchcommit); } #endif if(ha->db != 0) { mco_db_disconnect(ha->db); // disconnect from the database #ifdef _SOLARIS if(main_master) #endif mco_db_close(ha->dbName); // close the database ha->db = 0; } /* kill AsyncCommit() thread */ if(MasterParams.mode_flags & MCO_HAMODE_ASYNCH) { cancelThread( ha->hAsyncWr ); } if( (0 == run_info.mco_shm_supported) && (ha->start_mem!=0) ) { free(ha->start_mem); // free database's memory ha->start_mem = 0; } } mco_runtime_stop(); printf("\ndatabases are closed\n");}/* initialize HA instance */void InitHAinstance(ha_h ha, int instance){ ha->baseChan.status = 0; ha->id = instance; ha->db = 0; ha->hConnThread = INVALID_HANDLE_VALUE; ha->hSynchcommit = INVALID_HANDLE_VALUE; // SynchCommit() thread handle ha->replicaMode = (replica_mode>0); // replica_mode >0 = "special mode" ha->isMainMaster= (main_master>0); // main_master >0 = "main_master" sprintf(ha->dbName,"monitorDB%d", instance); strcpy(ha->endpoint, nw_MasterName); sprintf(&ha->endpoint[strlen(ha->endpoint)-2],"%02d", ha->id*10);}/* create HA instance */ha_h CreateHAinstance(int instance){ ha_h ha; mco_runtime_info_t run_info; int size; if( (ha=(ha_h)malloc(sizeof(ha_t))) == 0) { Printf("\nCouldn't create HA instance!!!\n"); EXIT(-1); } InitHAinstance( ha, instance); mco_get_runtime_info( &run_info); if( run_info.mco_shm_supported ) {#ifdef _WIN32_WCE /* * shared memory is NOT supported under Windows CE! */ Printf("\n Shared memory is NOT supported under Windows CE !!!\n"); free(ha); EXIT(-1);#endif size = DBSIZE & (~(SHM_PAGESIZE-1)); if( (DBSIZE & (SHM_PAGESIZE-1))) size += SHM_PAGESIZE; ha->start_mem = (char*)MAP_ADDRESS+(size*instance); } else { if ((ha->start_mem = (char*)malloc(DBSIZE)) == 0) { Printf("Couldn't allocate memory\n"); free(ha); EXIT(-1); } } return ha; }/* create threads for HA instance */void InitHAthreads(ha_h ha){ mco_runtime_info_t run_info; mco_get_runtime_info( &run_info); if (main_master) { // for "main" master only /* start the ListenToReplicas thread*/ Sleep(1); createThread(ListenToReplicas, ha, &ha->hConnThread);#ifdef CFG_SHARED_COMMIT if( (run_info.mco_shm_supported != 0) ) { /* * Creation of the thread that implements shared commit. It is necessary * if you need to run several masters sharing the same database in the shared memory segment. */ Sleep(1); MUTEX_LOCK(&mutex); createThread(SynchCommit, (PVOID)ha->db, &ha->hSynchcommit); MUTEX_LOCK(&mutex); MUTEX_UNLOCK(&mutex); /************************************************************** * * the watchdog allows to shutdown the process if the "commit" * thread is interrupted. Only needed in the MULIMASTER_SHM mode * **************************************************************/ /* create watchdog */ if(!wdt_init) { mco_create_watchdog(COMMIT_WATCHDOG_TIME, (MCO_PWATCHDOG)WatchDog); wdt_init++; } }#endif //CFG_SHARED_COMMIT#ifdef CFG_ASYNCHRONOUS_REPLICATION /* if asynchronous commit mode then create the asynchronous commit thread for each database instance */ if(MasterParams.mode_flags & MCO_HAMODE_ASYNCH) { createThread(AsyncCommit, (PVOID)ha, &ha->hAsyncWr); }#endif //CFG_ASYNCHRONOUS_REPLICATION }}/* creation of the new database instance */ha_h CreateDatabase(int instance){ ha_h ha; MCO_RET rc; ha = CreateHAinstance(instance); mco_db_kill(ha->dbName);#ifdef _PERFHA rc = mco_db_open(ha->dbName, perf2_get_dictionary(), ha->start_mem, DBSIZE, (uint2) DB_PAGESIZE );#else //_PERFHA rc = mco_db_open(ha->dbName, monitorDB_get_dictionary(), ha->start_mem, DBSIZE, (uint2) DB_PAGESIZE );#endif //_PERFHA if ( rc ) { Printf( "\nerror creating database^ %d\n", rc); free(ha); EXIT(-1); } /* connect to the database, obtain a database handle */ if( (rc=mco_db_connect(ha->dbName, &ha->db )) !=0) { Printf( "\nCouldn't connect to database^ %d\n", rc); free(ha); EXIT(-1); }/* reserve space for the data buffer for asynchronous replication */ MasterParams.async_databuf = malloc(ASYNC_DATABUF_SIZE); MasterParams.bufsize = ASYNC_DATABUF_SIZE; mco_HA_set_mode(ha->db, &MasterParams); // set MASTER mode InitHAthreads(ha); free_mem( ha->db, 1 ); return ha;}/* connect to existing database instance */ha_h ConnectToDatabase( int instance){ ha_h ha; MCO_RET rc; int i; ha = (ha_h)malloc(sizeof(ha_t)); InitHAinstance( ha, instance); sprintf(ha->dbName,"monitorDB%d", instance); for (i=0;i < 10; i++) { /* connect to the database, obtain a database handle */ if(!(rc=mco_db_connect(ha->dbName, &ha->db ))) { break; } sprintf(ha->dbName,"monitorDB%d%d",instance, i); } if(i == 10) { free(ha); ha = 0; } else {#ifdef CFG_SHARED_COMMIT mco_runtime_info_t run_info; mco_get_runtime_info( &run_info); if( (run_info.mco_shm_supported != 0) ) { /************************************************************** * * the watchdog allows to shutdown the process if the "commit" * thread is interrupted. Only needed in the MULIMASTER_SHM mode * **************************************************************/ /* create watchdog */ if(!wdt_init) { mco_create_watchdog(COMMIT_WATCHDOG_TIME, (MCO_PWATCHDOG)WatchDog); wdt_init++; } }#endif //CFG_SHARED_COMMIT } return ha;}/***************************************************** * eXtremeDB error handler *****************************************************/void errhandler( int n ){ Printf( "\neXtremeDB runtime fatal error: %d\n", n ); EXIT(-1);}/* * Exit handler provides emergency & normal exit from the program * It is called by working thread or by watchog handler */void ExitHandler(int flag){ wdt_flag = -1; mco_kill_watchdog(); CloseDatabases(flag);}/* returns amount of database's free memory */uint4 free_mem( mco_db_h db, int verbose){// uint4 totalpg, freepg; mco_puint totalpg, freepg; uint2 pgsize; mco_db_free_pages(db, &freepg); mco_db_total_pages(db, &totalpg); mco_db_page_size(db, &pgsize); if (verbose) Printf ("Maximum database size is %dK, avialable %dK\n", totalpg*pgsize / 1024, freepg*pgsize / 1024); return (freepg*pgsize / 1024);}/* * Watchdog callback procedure */static void WatchDog(){ if(wdt_flag > 0) { wdt_flag = -1;#if ( !defined(_WIN32) && !defined(_QNX) ) cancelThread(thMastRepl); /* stop working thread */#endif printf("\n\nWatchdog time is expired, exiting\n"); ExitHandler(-1); EXIT(-1); } if(!wdt_flag) wdt_flag++;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -