⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rep_method.c

📁 这是国外的resip协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2001-2004 *	Sleepycat Software.  All rights reserved. * * $Id: rep_method.c,v 1.167 2004/10/07 17:20:12 bostic Exp $ */#include "db_config.h"#ifndef NO_SYSTEM_INCLUDES#include <sys/types.h>#ifdef HAVE_RPC#include <rpc/rpc.h>#endif#include <stdlib.h>#include <string.h>#endif#ifdef HAVE_RPC#include "db_server.h"#endif#include "db_int.h"#include "dbinc/db_page.h"#include "dbinc/btree.h"#include "dbinc/log.h"#include "dbinc/txn.h"#ifdef HAVE_RPC#include "dbinc_auto/rpc_client_ext.h"#endifstatic int __rep_abort_prepared __P((DB_ENV *));static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));static int __rep_elect	       __P((DB_ENV *, int, int, int, u_int32_t, int *, u_int32_t));static int __rep_elect_init	       __P((DB_ENV *, DB_LSN *, int, int, int, int *, u_int32_t *));static int __rep_flush __P((DB_ENV *));static int __rep_restore_prepared __P((DB_ENV *));static int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));static int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));static int __rep_set_rep_transport __P((DB_ENV *, int,    int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,    int, u_int32_t)));static int __rep_start __P((DB_ENV *, DBT *, u_int32_t));static int __rep_wait __P((DB_ENV *, u_int32_t, int *, u_int32_t));/* * __rep_dbenv_create -- *	Replication-specific initialization of the DB_ENV structure. * * PUBLIC: void __rep_dbenv_create __P((DB_ENV *)); */void__rep_dbenv_create(dbenv)	DB_ENV *dbenv;{#ifdef HAVE_RPC	if (F_ISSET(dbenv, DB_ENV_RPCCLIENT)) {		dbenv->rep_elect = __dbcl_rep_elect;		dbenv->rep_flush = __dbcl_rep_flush;		dbenv->rep_process_message = __dbcl_rep_process_message;		dbenv->rep_start = __dbcl_rep_start;		dbenv->rep_stat = __dbcl_rep_stat;		dbenv->rep_stat_print = NULL;		dbenv->get_rep_limit = __dbcl_rep_get_limit;		dbenv->set_rep_limit = __dbcl_rep_set_limit;		dbenv->set_rep_request = __dbcl_rep_set_request;		dbenv->set_rep_transport = __dbcl_rep_set_rep_transport;	} else#endif	{		dbenv->rep_elect = __rep_elect;		dbenv->rep_flush = __rep_flush;		dbenv->rep_process_message = __rep_process_message;		dbenv->rep_start = __rep_start;		dbenv->rep_stat = __rep_stat_pp;		dbenv->rep_stat_print = __rep_stat_print_pp;		dbenv->get_rep_limit = __rep_get_limit;		dbenv->set_rep_limit = __rep_set_limit;		dbenv->set_rep_request = __rep_set_request;		dbenv->set_rep_transport = __rep_set_rep_transport;	}}/* * __rep_open -- *	Replication-specific initialization of the DB_ENV structure. * * PUBLIC: int __rep_open __P((DB_ENV *)); */int__rep_open(dbenv)	DB_ENV *dbenv;{	DB_REP *db_rep;	int ret;	if ((ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &db_rep)) != 0)		return (ret);	dbenv->rep_handle = db_rep;	ret = __rep_region_init(dbenv);	return (ret);}/* * __rep_start -- *	Become a master or client, and start sending messages to participate * in the replication environment.  Must be called after the environment * is open. * * We must protect rep_start, which may change the world, with the rest * of the DB library.  Each API interface will count itself as it enters * the library.  Rep_start checks the following: * * rep->msg_th - this is the count of threads currently in rep_process_message * rep->start_th - this is set if a thread is in rep_start. * rep->handle_cnt - number of threads actively using a dbp in library. * rep->txn_cnt - number of active txns. * REP_F_READY - Replication flag that indicates that we wish to run * recovery, and want to prohibit new transactions from entering and cause * existing ones to return immediately (with a DB_LOCK_DEADLOCK error). * * There is also the renv->rep_timestamp which is updated whenever significant * events (i.e., new masters, log rollback, etc).  Upon creation, a handle * is associated with the current timestamp.  Each time a handle enters the * library it must check if the handle timestamp is the same as the one * stored in the replication region.  This prevents the use of handles on * clients that reference non-existent files whose creation was backed out * during a synchronizing recovery. */static int__rep_start(dbenv, dbt, flags)	DB_ENV *dbenv;	DBT *dbt;	u_int32_t flags;{	DB_LOG *dblp;	DB_LSN lsn;	DB_REP *db_rep;	REP *rep;	u_int32_t repflags;	int announce, init_db, redo_prepared, ret, role_chg;	int sleep_cnt, t_ret;#ifdef DIAGNOSTIC	DB_MSGBUF mb;#endif	PANIC_CHECK(dbenv);	ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_start");	ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_start", DB_INIT_REP);	db_rep = dbenv->rep_handle;	rep = db_rep->region;	if ((ret = __db_fchk(dbenv, "DB_ENV->rep_start", flags,	    DB_REP_CLIENT | DB_REP_MASTER)) != 0)		return (ret);	/* Exactly one of CLIENT and MASTER must be specified. */	if ((ret = __db_fcchk(dbenv,	    "DB_ENV->rep_start", flags, DB_REP_CLIENT, DB_REP_MASTER)) != 0)		return (ret);	if (!LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) {		__db_err(dbenv,	"DB_ENV->rep_start: replication mode must be specified");		return (EINVAL);	}	/* We need a transport function. */	if (dbenv->rep_send == NULL) {		__db_err(dbenv,    "DB_ENV->set_rep_transport must be called before DB_ENV->rep_start");		return (EINVAL);	}	/*	 * If we are about to become (or stay) a master.  Let's flush the log	 * to close any potential holes that might happen when upgrading from	 * client to master status.	 */	if (LF_ISSET(DB_REP_MASTER) && (ret = __log_flush(dbenv, NULL)) != 0)		return (ret);	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);	/*	 * We only need one thread to start-up replication, so if	 * there is another thread in rep_start, we'll let it finish	 * its work and have this thread simply return.	 */	if (rep->start_th != 0) {		/*		 * There is already someone in rep_start.  Return.		 */		RPRINT(dbenv, rep, (dbenv, &mb, "Thread already in rep_start"));		goto err;	} else		rep->start_th = 1;	role_chg = (F_ISSET(rep, REP_F_CLIENT) && LF_ISSET(DB_REP_MASTER)) ||	    (F_ISSET(rep, REP_F_MASTER) && LF_ISSET(DB_REP_CLIENT));	/*	 * Wait for any active txns or mpool ops to complete, and	 * prevent any new ones from occurring, only if we're	 * changing roles.  If we are not changing roles, then we	 * only need to coordinate with msg_th.	 */	if (role_chg)		__rep_lockout(dbenv, db_rep, rep, 0);	else {		for (sleep_cnt = 0; rep->msg_th != 0;) {			if (++sleep_cnt % 60 == 0)				__db_err(dbenv,	"DB_ENV->rep_start waiting %d minutes for replication message thread",				    sleep_cnt / 60);			MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);			__os_sleep(dbenv, 1, 0);			MUTEX_LOCK(dbenv, db_rep->rep_mutexp);		}	}	if (rep->eid == DB_EID_INVALID)		rep->eid = dbenv->rep_eid;	if (LF_ISSET(DB_REP_MASTER)) {		if (role_chg) {			/*			 * If we're upgrading from having been a client,			 * preclose, so that we close our temporary database.			 *			 * Do not close files that we may have opened while			 * doing a rep_apply;  they'll get closed when we			 * finally close the environment, but for now, leave			 * them open, as we don't want to recycle their			 * fileids, and we may need the handles again if			 * we become a client and the original master			 * that opened them becomes a master again.			 */			if ((ret = __rep_preclose(dbenv, 0)) != 0)				goto errunlock;		}		redo_prepared = 0;		if (!F_ISSET(rep, REP_F_MASTER)) {			/* Master is not yet set. */			if (role_chg) {				if (rep->w_gen > rep->recover_gen)					rep->gen = ++rep->w_gen;				else if (rep->gen > rep->recover_gen)					rep->gen++;				else					rep->gen = rep->recover_gen + 1;				/*				 * There could have been any number of failed				 * elections, so jump the gen if we need to now.				 */				if (rep->egen > rep->gen)					rep->gen = rep->egen;				redo_prepared = 1;			} else if (rep->gen == 0)				rep->gen = rep->recover_gen + 1;			if (F_ISSET(rep, REP_F_MASTERELECT)) {				__rep_elect_done(dbenv, rep);				F_CLR(rep, REP_F_MASTERELECT);			}			if (rep->egen <= rep->gen)				rep->egen = rep->gen + 1;			RPRINT(dbenv, rep, (dbenv, &mb,			    "New master gen %lu, egen %lu",			    (u_long)rep->gen, (u_long)rep->egen));		}		rep->master_id = rep->eid;		/*		 * Note, setting flags below implicitly clears out		 * REP_F_NOARCHIVE, REP_F_INIT and REP_F_READY.		 */		rep->flags = REP_F_MASTER;		rep->start_th = 0;		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);		dblp = (DB_LOG *)dbenv->lg_handle;		R_LOCK(dbenv, &dblp->reginfo);		lsn = ((LOG *)dblp->reginfo.primary)->lsn;		R_UNLOCK(dbenv, &dblp->reginfo);		/*		 * Send the NEWMASTER message first so that clients know		 * subsequent messages are coming from the right master.		 * We need to perform all actions below no master what		 * regarding errors.		 */		(void)__rep_send_message(dbenv,		    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0);		ret = 0;		if (role_chg) {			ret = __txn_reset(dbenv);			MUTEX_LOCK(dbenv, db_rep->rep_mutexp);			F_CLR(rep, REP_F_READY);			rep->in_recovery = 0;			MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);		}		/*		 * Take a transaction checkpoint so that our new generation		 * number get written to the log.		 */		if ((t_ret = __txn_checkpoint(dbenv, 0, 0, DB_FORCE)) != 0 &&		    ret == 0)			ret = t_ret;		if (redo_prepared &&		    (t_ret = __rep_restore_prepared(dbenv)) != 0 && ret == 0)			ret = t_ret;	} else {		init_db = 0;		announce = role_chg || rep->master_id == DB_EID_INVALID;		/*		 * If we're changing roles from master to client or if		 * we never were any role at all, we need to init the db.		 */		if (role_chg || !F_ISSET(rep, REP_F_CLIENT)) {			rep->master_id = DB_EID_INVALID;			init_db = 1;		}		/* Zero out everything except recovery and tally flags. */		repflags = F_ISSET(rep, REP_F_NOARCHIVE |		    REP_F_RECOVER_MASK | REP_F_TALLY);		FLD_SET(repflags, REP_F_CLIENT);		rep->flags = repflags;		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);		/*		 * Abort any prepared transactions that were restored		 * by recovery.  We won't be able to create any txns of		 * our own until they're resolved, but we can't resolve		 * them ourselves;  the master has to.  If any get		 * resolved as commits, we'll redo them when commit		 * records come in.  Aborts will simply be ignored.		 */		if ((ret = __rep_abort_prepared(dbenv)) != 0)			goto errlock;		MUTEX_LOCK(dbenv, db_rep->db_mutexp);		ret = __rep_client_dbinit(dbenv, init_db, REP_DB);		MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);		if (ret != 0)			goto errlock;		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);		rep->start_th = 0;		if (role_chg) {			F_CLR(rep, REP_F_READY);			rep->in_recovery = 0;		}		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);		/*		 * If this client created a newly replicated environment,		 * then announce the existence of this client.  The master		 * should respond with a message that will tell this client		 * the current generation number and the current LSN.  This		 * will allow the client to either perform recovery or		 * simply join in.		 */		if (announce)			(void)__rep_send_message(dbenv,			    DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0);		else			(void)__rep_send_message(dbenv,			    DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0);	}	if (0) {		/*		 * We have separate labels for errors.  If we're returning an		 * error before we've set start_th, we use 'err'.  If		 * we are erroring while holding the rep_mutex, then we use		 * 'errunlock' label.  If we're erroring without holding the rep		 * mutex we must use 'errlock'.		 */errlock:		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);errunlock:		rep->start_th = 0;		if (role_chg) {			F_CLR(rep, REP_F_READY);			rep->in_recovery = 0;		}err:		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);	}	return (ret);}/* * __rep_client_dbinit -- * * Initialize the LSN database on the client side.  This is called from the * client initialization code.  The startup flag value indicates if * this is the first thread/process starting up and therefore should create * the LSN database.  This routine must be called once by each process acting * as a client. * * Assumes caller holds appropriate mutex. * * PUBLIC: int __rep_client_dbinit __P((DB_ENV *, int, repdb_t)); */int__rep_client_dbinit(dbenv, startup, which)	DB_ENV *dbenv;	int startup;	repdb_t which;{	DB_REP *db_rep;	DB *dbp, **rdbpp;	REP *rep;	int ret, t_ret;	u_int32_t flags;	const char *name;	PANIC_CHECK(dbenv);	db_rep = dbenv->rep_handle;	rep = db_rep->region;	dbp = NULL;#define	REPDBNAME	"__db.rep.db"#define	REPPAGENAME     "__db.reppg.db"	if (which == REP_DB) {		name = REPDBNAME;		rdbpp = &db_rep->rep_db;	} else {		name = REPPAGENAME;		rdbpp = &rep->file_dbp;	}	/* Check if this has already been called on this environment. */	if (*rdbpp != NULL)		return (0);	if (startup) {		if ((ret = db_create(&dbp, dbenv, DB_REP_CREATE)) != 0)			goto err;		/*		 * Ignore errors, because if the file doesn't exist, this		 * is perfectly OK.		 */		(void)__db_remove(dbp, NULL, name, NULL, DB_FORCE);	}	if ((ret = db_create(&dbp, dbenv, DB_REP_CREATE)) != 0)		goto err;	if (which == REP_DB &&	    (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)		goto err;	/* Allow writes to this database on a client. */	F_SET(dbp, DB_AM_CL_WRITER);	flags = DB_NO_AUTO_COMMIT |	    (startup ? DB_CREATE : 0) |	    (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);	if ((ret = __db_open(dbp, NULL, name, NULL,	    (which == REP_DB ? DB_BTREE : DB_RECNO),	    flags, 0, PGNO_BASE_MD)) != 0)		goto err;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -