📄 replica.c
字号:
/***************************************************************** * * * replica.c * * * * This file is a part of the eXtremeDB-HA Application Framework * * It demonstrates McObject High Availabitity support and * * explains how to work with it. * * * ***************************************************************** * Copyright (c) 2001-2006 McObject LLC * * All Rights Reserved * *****************************************************************//* * ++ * * PROJECT: Ha Framework * * SUBSYSTEM: HA support * * MODULE: replica.c * * ABSTRACT: replica part of the HA framework * * * VERSION: 1.0 * * HISTORY: * 1.0- 1 AD 01-Mar-2003 Created it was * 2 AG 19-Mar-2003 Ported to Linux * 3 SS 08-Jul-2003 Reconstructed * 4 SS 10-Jan-2005 Replica notification was added * 4 SS 14-Feb-2005 Replica synchronous events were added * * -- *//* * This code performs the connection to the master and * replicaton of master's database */#include "app.h"#include "mcoHA.h"#ifdef _PERFHA #include "perf2.h"#else //_PERFHA #include "monitorDB.h" #include "sensor.h"#endif //_PERFHA#include "signal.h"#include "stdio.h"extern void errhandler( int n );extern int wdt_flag; // the flag for shared commit watchdogextern int main_master;extern int replica_mode;extern long synch_flag;extern long stop_flag;extern THREAD_ID stopwait;extern THREAD_ID thMastRepl;extern char mastername[];extern ha_h haInstances[MAX_HA_INSTANCES];extern timer_unit t_begin, t_start, t_stop, t_end;extern mco_HA_params_t MasterParams;static ha_h Replha = NULL;THREAD_PROC_DECLARE(StopWait);extern void make_strings( void );static mco_bool ReplicaFailCallback(MCO_RET ErrorCode, mco_bool commit_flag);/* the prototype for replica notifying procedure */static void replica_notifying( uint2 notification_code, /* notification code*/ uint4 param1, /* reserved for special cases */ void * param2, /* reserved for special cases */ void * context /* pointer to user defined context */ ); /* * This structure describes timeouts that are necessary to set the finite times to functions * of attaching and detaching replicas and of transaction commits */static mco_connection_param_t replica_conpar ={ TM_INFINITE, TM_ATTACH_REPLICA, TM_REPLICA_COMMIT, TM_WAIT_FOR_DATA, TM_SYNCH, replica_notifying, /* pointer to the notification procedure */ 0 /* notification context pointer */#ifdef MCO_CFG_HA_2PHASE_COMMIT /* special commit interface *//* set Replica Fail Callback */ ,ReplicaFailCallback#endif //MCO_CFG_HA_2PHASE_COMMIT};/* User defined replica cancel routine. It is platform dependent. */static mco_bool ReplicaFailCallback(MCO_RET ErrorCode, mco_bool commit_flag){ printf("Replica Fail Callback\n"); printf("Error code = %d\n", ErrorCode); if(commit_flag) printf("transaction is complete\n"); else printf("transaction is incomplete\n"); return 0;}/** cancels replica commit loop */void replica_cancel(nw_channel_h chan){#ifdef _WIN32 chan->event1 = Replha->baseChan.event1;#endif if(Replha != NULL) { printf("*** cancelling replication ***\n"); nw_cancel(chan); }}#ifdef CFG_ASYNCHRONOUS_EVENTSstatic ThrParam all_tp[NATHREADS];extern void threads(ThrParam *all_tp_ptr);#endif //CFG_ASYNCHRONOUS_EVENTS#ifdef CFG_SYNCHRONOUS_EVENTS/* Handler for the "<new>" event. Reads the autoid and prints it out */MCO_RET new_handler(mco_trans_h t, Sensor1 * obj, MCO_EVENT_TYPE et, /*INOUT */void *param) { printf("Event \"New object\"\n"); return MCO_S_OK;}/* Handler for the "<delete>" event. Note that the handler is called before * the current transaction is committed. Therefore, the object is still * valid; the object handle is passed back to the handler and is used to obtain the * autoid of the object. * The event's handler return value is passed into the "delete" function and is * later examined by the mco_trans_commit(). If the value is anything but * MCO_S_OK, the transaction is rollbacked. In this sample every other delete * transaction is rolled back. */MCO_RET delete_handler( mco_trans_h t, Sensor1 * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *user_param) { printf("Event \"Object is being deleted\"\n"); return MCO_S_OK;}/* Handler for the "update" event.This handler is called after the update transaction * was commited - hence the value of the field being changed is reported unchanged yet. */MCO_RET update_handler1( mco_trans_h t, Sensor1 * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *param) { int4 v4; Sensor1_value4_get ( obj, &v4); printf("Event \"Update (before commit)\"\n", v4); return MCO_S_OK;}static int register_events(mco_db_h db) { MCO_RET rc; mco_trans_h t; mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t); mco_register_newEvent_handler( t, & new_handler, (void*)0 ); mco_register_deleteEvent_handler(t, & delete_handler, (void * )0); mco_register_update1Event_handler( t, & update_handler1, (void *)0, MCO_AFTER_UPDATE ); rc = mco_trans_commit(t); return rc;}#endif //CFG_SYNCHRONOUS_EVENTS/* the prototype for replica notifying procedure */static void replica_notifying( uint2 notification_code, /* notification code*/ uint4 param1, /* reserved for special cases */ void * param2, /* reserved for special cases */ void * context /* pointer to user defined context */ ){ switch(notification_code) { default: Printf("\r\n** Notification ** Replica's been notified code = %d, param1 = %ld, param2 = %08lx\r\n", notification_code, /* notification code*/ param1, /* reserved for special cases */ param2 /* reserved for special cases */ ); break; case MCO_REPL_NOTIFY_CONNECTED: /* "connected" notification */ Printf("\r\n** Notification ** Replica's been connected\r\n" ); break; case MCO_REPL_NOTIFY_CONNECT_FAILED: /* "connect failed" notification */ Printf("\r\n** Notification ** Connection failed error = %d\r\n", param1 ); break; case MCO_REPL_NOTIFY_DB_EQUAL: /* "no need to load DB" notification */ Printf("\r\n** Notification ** Master & Replica databases are equal\r\n" ); break; case MCO_REPL_NOTIFY_DB_LOAD_BEGIN: /* "begin loading DB" notification */ Printf("\r\n** Notification ** Replica's begun loading\r\n" ); break; case MCO_REPL_NOTIFY_DB_LOAD_FAILED: /* "loading failed" notification, "param1" of notification callback contains MCO_RET code */ Printf("\r\n** Notification ** Database loading failed error = %d\r\n", param1 ); break; case MCO_REPL_NOTIFY_DB_LOAD_OK: /* "succesful loading" notification, "param1" of notification callback contains MCO_E_HA_REPLICA_STOP_REASON code */ Printf("\r\n** Notification ** Database's been loaded OK\r\n" );#ifdef CFG_ASYNCHRONOUS_EVENTS { int i; for(i=0; i< NATHREADS; i++) { all_tp[i].db = &Replha->db; } } threads( all_tp);#endif //CFG_ASYNCHRONOUS_EVENTS#ifdef CFG_SYNCHRONOUS_EVENTS { MCO_RET rc; if ((rc = register_events(Replha->db)) != MCO_S_OK) { printf("Error registering events, %d\n", rc); } }#endif //CFG_SYNCHRONOUS_EVENTS break; case MCO_REPL_NOTIFY_COMMIT_FAILED: /* "commit failed" notification, "param1" of notification callback contains MCO_RET code */ Printf("\r\n** Notification ** Commit failed error = %d\r\n", param1 ); break; case MCO_REPL_NOTIFY_REPLICA_STOPPED: /* "stopped" notification, */ { char *reason; switch (param1) { default: reason = "Replica stopped by the keysroke"; break; case MCO_HA_REPLICA_HANDSHAKE_FAILED: reason = "MCO_HA_REPLICA_HANDSHAKE_FAILED"; test_ExitCode=10; break; case MCO_HA_REPLICA_CONNECTION_ABORTED: reason = "MCO_HA_REPLICA_CONNECTION_ABORTED"; test_ExitCode=11; break; case MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT: reason = "MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT"; break; case MCO_HA_REPLICA_STOPPED_BY_LOCAL_REQUEST: reason = "MCO_HA_REPLICA_STOPPED_BY_LOCAL_REQUEST"; break; case MCO_HA_REPLICA_BECOMES_MASTER: reason = "MCO_HA_REPLICA_BECOMES_MASTER"; break; } Printf ("\n** Notification ** Replica stopped, reason: %d (%s)\n", param1, reason); } break; case MCO_REPL_NOTIFY_HOTSYNC: /* "hot sync" notification */ Printf("\r\n** Notification ** Hot sync's being started\r\n"); break; case MCO_REPL_NOTIFY_EOHOTSYNC: /* "end of hot sync" notification */ Printf("\r\n** Notification ** End of hot sync\r\n"); break; }}/****************************************************** * Replica Mode procedure, * replica's commit loop *****************************************************/#define ARG_NONE 0#define ARG_BECOME_MASTER 1void replica(ha_h ha){ mco_db_h db = 0; char * reason = 0; MCO_RET rc; MCO_E_HA_REPLICA_STOP_REASON StopReason = MCO_HA_REPLICA_CONNECTION_ABORTED; t_begin = mco_system_get_current_time(); //test begining time wdt_flag = -1; if(replica_mode >= 0) { // analize replica mode if(replica_mode) Printf("\n*** Replica is being started in special mode ***\n\n"); else Printf("\n*** Replica is being started in switch-to-master mode ***\n\n"); } else { Printf("\n*** Replica is being started in passive mode ***\n\n"); } Replha = ha; mco_error_set_handler( &errhandler );/************************************** try to connect to the master **************************************/ /* add HA instance index to master logical name */#if defined(CFG_TCP_SOCKET_CHANNEL) || defined(CFG_UDP_SOCKET_CHANNEL) if(strchr(mastername, ':') == 0) { sprintf(mastername,"%s:%s",mastername,nw_DefaultPort); }#endif sprintf(&mastername[strlen(mastername)-2],"%02d", ha->id*10); Printf ( "Replica: Open communication channel %s, attaching to master (blocking call) .... \n", mastername); sprintf(ha->dbName,"monitorDB%d",ha->id); ha->db = 0; t_start = mco_system_get_current_time(); //test start time /* try to attach to current master */ rc = mco_nw_attach_master(&ha->db, mastername, &replica_conpar, (MCO_E_HA_REPLICA_STOP_REASON*)&StopReason, ha->dbName,#ifdef _PERFHA perf2_get_dictionary(),#else //_PERFHA monitorDB_get_dictionary(),#endif //_PERFHA ha->start_mem, DBSIZE, TM_CONNECTION_TIMEOUT, (PVOID)ha ); t_stop = mco_system_get_current_time(); //test stop time#ifdef CFG_ASYNCHRONOUS_EVENTS { int i; for(i=0; i< NATHREADS; i++) { cancelThread(all_tp[i].tid); } }#endif //CFG_ASYNCHRONOUS_EVENTS /* print the reason by wich replica is disconnected from master */ switch (StopReason) { default: reason = "Replica stopped by the keysroke"; break; case MCO_HA_REPLICA_HANDSHAKE_FAILED: reason = "MCO_HA_REPLICA_HANDSHAKE_FAILED";test_ExitCode=10; break; case MCO_HA_REPLICA_CONNECTION_ABORTED: reason = "MCO_HA_REPLICA_CONNECTION_ABORTED";test_ExitCode=11; break; case MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT: reason = "MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT"; break; case MCO_HA_REPLICA_STOPPED_BY_LOCAL_REQUEST: reason = "MCO_HA_REPLICA_STOPPED_BY_LOCAL_REQUEST"; break; case MCO_HA_REPLICA_BECOMES_MASTER: reason = "MCO_HA_REPLICA_BECOMES_MASTER"; break; } Printf ("\nReplica: stopped, reason: %d (%s)\n", StopReason, reason); db = ha->db; switch(StopReason) { case MCO_HA_REPLICA_CONNECTION_ABORTED: case MCO_HA_REPLICA_BECOMES_MASTER: if (replica_mode >= 0) // analize -s switch { Printf("\n*** Replica switching to master ***\n"); /* * This timeout is used in order to give the kernel * opportunity to detach master's socket from the faulty process * before the running replica on the same machine uses the socket. */ Sleep(500); wdt_flag = 0; ha->isMainMaster= main_master = 1; InitHAthreads(ha); t_start = mco_system_get_current_time(); mco_HA_set_mode(db, &MasterParams); // set MASTER mode synch_flag++; master(0, ha); // flag = 0 - database is already initialized break; } default: break; }/*create the report file*/ if(ha->db != 0) { ReplicaReport(); } synch_flag = 0; ExitHandler(test_ExitCode); /* terminate the program gracefully */ if( !stop_flag ) { EXIT(test_ExitCode); KILL(); }}void ReplicaReport(){#ifndef _PERFHA int i; FILE * f; ha_h ha; mco_db_h db; mco_trans_h t; // transaction handle MCO_RET rc; char s[200]; Printf("\n*** Replica complete ***\n"); Printf( "\nGenerating reports, please wait..." ); for ( i=1; ; i++ ) { sprintf(s, "%smonrepl%d.out",LOG_PATH, i); if((f = fopen(s, "r"))) { fclose(f); } else break; } f = fopen(s, "w"); Printf ("\n File %s is opened\n", s); ha = haInstances[0]; db = ha->db; fprintf( f, "\ndatabase instance %d ***\n", i ); if ( (rc=mco_trans_start( db, MCO_READ_ONLY, MCO_TRANS_FOREGROUND, &ha->t )) ) { Printf( "\nReplica: error starting read transaction %d\n", rc ); exit( 1 ); } t = ha->t; s1_output(t, f); Printf (" Sensor1 data is written\n"); mco_trans_commit(t);#endif //_PERFHA}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -