📄 rep_record.c
字号:
/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2001-2004 * Sleepycat Software. All rights reserved. * * $Id: rep_record.c,v 1.255 2004/11/04 18:35:29 sue Exp $ */#include "db_config.h"#ifndef NO_SYSTEM_INCLUDES#if TIME_WITH_SYS_TIME#include <sys/time.h>#include <time.h>#else#if HAVE_SYS_TIME_H#include <sys/time.h>#else#include <time.h>#endif#endif#include <stdlib.h>#include <string.h>#endif#include "db_int.h"#include "dbinc/db_page.h"#include "dbinc/db_shash.h"#include "dbinc/db_am.h"#include "dbinc/lock.h"#include "dbinc/log.h"#include "dbinc/mp.h"#include "dbinc/txn.h"static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *, int *));static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));static int __rep_do_ckp __P((DB_ENV *, DBT *, REP_CONTROL *));static int __rep_dorecovery __P((DB_ENV *, DB_LSN *, DB_LSN *));static int __rep_getnext __P((DB_ENV *));static int __rep_lsn_cmp __P((const void *, const void *));static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DB_LSN *));static int __rep_process_rec __P((DB_ENV *, REP_CONTROL *, DBT *, u_int32_t *, DB_LSN *));static int __rep_remfirst __P((DB_ENV *, DBT *, DBT *));static int __rep_resend_req __P((DB_ENV *, int));static int __rep_verify_match __P((DB_ENV *, DB_LSN *, time_t));/* Used to consistently designate which messages ought to be received where. */#define MASTER_ONLY(rep, rp) do { \ if (!F_ISSET(rep, REP_F_MASTER)) { \ RPRINT(dbenv, rep, \ (dbenv, &mb, "Master record received on client")); \ REP_PRINT_MESSAGE(dbenv, \ *eidp, rp, "rep_process_message"); \ ret = EINVAL; \ goto errlock; \ } \} while (0)#define CLIENT_ONLY(rep, rp) do { \ if (!F_ISSET(rep, REP_F_CLIENT)) { \ RPRINT(dbenv, rep, \ (dbenv, &mb, "Client record received on master")); \ REP_PRINT_MESSAGE(dbenv, \ *eidp, rp, "rep_process_message"); \ (void)__rep_send_message(dbenv, \ DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0); \ ret = DB_REP_DUPMASTER; \ goto errlock; \ } \} while (0)#define MASTER_CHECK(dbenv, eid, rep) do { \ if (rep->master_id == DB_EID_INVALID) { \ RPRINT(dbenv, rep, (dbenv, &mb, \ "Received record from %d, master is INVALID", eid));\ ret = 0; \ (void)__rep_send_message(dbenv, \ DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); \ goto errlock; \ } \ if (eid != rep->master_id) { \ __db_err(dbenv, \ "Received master record from %d, master is %d", \ eid, rep->master_id); \ ret = EINVAL; \ goto errlock; \ } \} while (0)#define MASTER_UPDATE(dbenv, renv) do { \ MUTEX_LOCK((dbenv), &(renv)->mutex); \ F_SET((renv), DB_REGENV_REPLOCKED); \ (void)time(&(renv)->op_timestamp); \ MUTEX_UNLOCK((dbenv), &(renv)->mutex); \} while (0)#define ANYSITE(rep)/* * __rep_process_message -- * * This routine takes an incoming message and processes it. * * control: contains the control fields from the record * rec: contains the actual record * eidp: contains the machine id of the sender of the message; * in the case of a DB_NEWMASTER message, returns the eid * of the new master. * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the * lsn of the maximum permanent or current not permanent log record * (respectively). * * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *, * PUBLIC: DB_LSN *)); */int__rep_process_message(dbenv, control, rec, eidp, ret_lsnp) DB_ENV *dbenv; DBT *control, *rec; int *eidp; DB_LSN *ret_lsnp;{ DB_LOG *dblp; DB_LOGC *logc; DB_LSN endlsn, lsn, oldfilelsn; DB_REP *db_rep; DBT *d, data_dbt, mylog; LOG *lp; REGENV *renv; REGINFO *infop; REP *rep; REP_CONTROL *rp; REP_VOTE_INFO *vi; u_int32_t bytes, egen, flags, gen, gbytes, rectype, type; int check_limit, cmp, done, do_req, is_dup; int master, match, old, recovering, ret, t_ret; time_t savetime;#ifdef DIAGNOSTIC DB_MSGBUF mb;#endif PANIC_CHECK(dbenv); ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message", DB_INIT_REP); /* Control argument must be non-Null. */ if (control == NULL || control->size == 0) { __db_err(dbenv, "DB_ENV->rep_process_message: control argument must be specified"); return (EINVAL); } if (!IS_REP_MASTER(dbenv) && !IS_REP_CLIENT(dbenv)) { __db_err(dbenv, "Environment not configured as replication master or client"); return (EINVAL); } ret = 0; db_rep = dbenv->rep_handle; rep = db_rep->region; dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; infop = dbenv->reginfo; renv = infop->primary; rp = (REP_CONTROL *)control->data; if (ret_lsnp != NULL) ZERO_LSN(*ret_lsnp); /* * Acquire the replication lock. */ MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (rep->start_th != 0) { /* * If we're racing with a thread in rep_start, then * just ignore the message and return. */ RPRINT(dbenv, rep, (dbenv, &mb, "Racing rep_start, ignore message.")); MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); goto out; } rep->msg_th++; gen = rep->gen; recovering = rep->in_recovery || F_ISSET(rep, REP_F_RECOVER_MASK); savetime = renv->rep_timestamp; rep->stat.st_msgs_processed++; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); REP_PRINT_MESSAGE(dbenv, *eidp, rp, "rep_process_message"); /* Complain if we see an improper version number. */ if (rp->rep_version != DB_REPVERSION) { __db_err(dbenv, "unexpected replication message version %lu, expected %d", (u_long)rp->rep_version, DB_REPVERSION); ret = EINVAL; goto errlock; } if (rp->log_version != DB_LOGVERSION) { __db_err(dbenv, "unexpected log record version %lu, expected %d", (u_long)rp->log_version, DB_LOGVERSION); ret = EINVAL; goto errlock; } /* * Check for generation number matching. Ignore any old messages * except requests that are indicative of a new client that needs * to get in sync. */ if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ && rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ && rp->rectype != REP_DUPMASTER) { /* * We don't hold the rep mutex, and could miscount if we race. */ rep->stat.st_msgs_badgen++; goto errlock; } if (rp->gen > gen) { /* * If I am a master and am out of date with a lower generation * number, I am in bad shape and should downgrade. */ if (F_ISSET(rep, REP_F_MASTER)) { rep->stat.st_dupmasters++; ret = DB_REP_DUPMASTER; if (rp->rectype != REP_DUPMASTER) (void)__rep_send_message(dbenv, DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0); goto errlock; } /* * I am a client and am out of date. If this is an election, * or a response from the first site I contacted, then I can * accept the generation number and participate in future * elections and communication. Otherwise, I need to hear about * a new master and sync up. */ if (rp->rectype == REP_ALIVE || rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { MUTEX_LOCK(dbenv, db_rep->rep_mutexp); RPRINT(dbenv, rep, (dbenv, &mb, "Updating gen from %lu to %lu", (u_long)gen, (u_long)rp->gen)); rep->master_id = DB_EID_INVALID; gen = rep->gen = rp->gen; /* * Updating of egen will happen when we process the * message below for each message type. */ MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); if (rp->rectype == REP_ALIVE) (void)__rep_send_message(dbenv, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); } else if (rp->rectype != REP_NEWMASTER) { (void)__rep_send_message(dbenv, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); goto errlock; } /* * If you get here, then you're a client and either you're * in an election or you have a NEWMASTER or an ALIVE message * whose processing will do the right thing below. */ } /* * We need to check if we're in recovery and if we are * then we need to ignore any messages except VERIFY*, VOTE*, * NEW* and ALIVE_REQ, or backup related messages: UPDATE*, * PAGE* and FILE*. We need to also accept LOG messages * if we're copying the log for recovery/backup. */ if (recovering) { switch (rp->rectype) { case REP_VERIFY: MUTEX_LOCK(dbenv, db_rep->db_mutexp); cmp = log_compare(&lp->verify_lsn, &rp->lsn); MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (cmp != 0) goto skip; break; case REP_NEWFILE: case REP_LOG: case REP_LOG_MORE: if (!F_ISSET(rep, REP_F_RECOVER_LOG)) goto skip; /* * If we're recovering the log we only want * log records that are in the range we need * to recover. Otherwise we can end up storing * a huge number of "new" records, only to * truncate the temp database later after we * run recovery. */ if (log_compare(&rp->lsn, &rep->last_lsn) > 0) goto skip; break; case REP_ALIVE: case REP_ALIVE_REQ: case REP_DUPMASTER: case REP_FILE_FAIL: case REP_NEWCLIENT: case REP_NEWMASTER: case REP_NEWSITE: case REP_PAGE: case REP_PAGE_FAIL: case REP_PAGE_MORE: case REP_PAGE_REQ: case REP_UPDATE: case REP_UPDATE_REQ: case REP_VERIFY_FAIL: case REP_VOTE1: case REP_VOTE2: break; default:skip: /* Check for need to retransmit. */ /* Not holding rep_mutex, may miscount */ rep->stat.st_msgs_recover++; MUTEX_LOCK(dbenv, db_rep->db_mutexp); do_req = __rep_check_doreq(dbenv, rep); MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (do_req) { /* * Don't respond to a MASTER_REQ with * a MASTER_REQ. */ if (rep->master_id == DB_EID_INVALID && rp->rectype != REP_MASTER_REQ) (void)__rep_send_message(dbenv, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); else if (*eidp == rep->master_id) ret = __rep_resend_req(dbenv, *eidp); } goto errlock; } } switch (rp->rectype) { case REP_ALIVE: ANYSITE(rep); egen = *(u_int32_t *)rec->data; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); RPRINT(dbenv, rep, (dbenv, &mb, "Received ALIVE egen of %lu, mine %lu", (u_long)egen, (u_long)rep->egen)); if (egen > rep->egen) { /* * We're changing egen, need to clear out any old * election information. */ __rep_elect_done(dbenv, rep); rep->egen = egen; } MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); break; case REP_ALIVE_REQ: ANYSITE(rep); dblp = dbenv->lg_handle; R_LOCK(dbenv, &dblp->reginfo); lsn = ((LOG *)dblp->reginfo.primary)->lsn; R_UNLOCK(dbenv, &dblp->reginfo); MUTEX_LOCK(dbenv, db_rep->rep_mutexp); egen = rep->egen; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); data_dbt.data = &egen; data_dbt.size = sizeof(egen); (void)__rep_send_message(dbenv, *eidp, REP_ALIVE, &lsn, &data_dbt, 0); goto errlock; case REP_DUPMASTER: if (F_ISSET(rep, REP_F_MASTER)) ret = DB_REP_DUPMASTER; goto errlock; case REP_ALL_REQ: MASTER_ONLY(rep, rp); gbytes = bytes = 0; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); gbytes = rep->gbytes; bytes = rep->bytes; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); check_limit = gbytes != 0 || bytes != 0; if ((ret = __log_cursor(dbenv, &logc)) != 0) goto errlock; memset(&data_dbt, 0, sizeof(data_dbt)); oldfilelsn = lsn = rp->lsn; type = REP_LOG; flags = IS_ZERO_LSN(rp->lsn) || IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET; for (ret = __log_c_get(logc, &lsn, &data_dbt, flags); ret == 0 && type == REP_LOG; ret = __log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) { /* * When a log file changes, we'll have a real log * record with some lsn [n][m], and we'll also want * to send a NEWFILE message with lsn [n-1][MAX]. */ if (lsn.file != oldfilelsn.file) (void)__rep_send_message(dbenv, *eidp, REP_NEWFILE, &oldfilelsn, NULL, 0); if (check_limit) { /* * data_dbt.size is only the size of the log * record; it doesn't count the size of the * control structure. Factor that in as well * so we're not off by a lot if our log records * are small. */ while (bytes < data_dbt.size + sizeof(REP_CONTROL)) { if (gbytes > 0) { bytes += GIGABYTE; --gbytes; continue; } /* * We don't hold the rep mutex, * and may miscount. */ rep->stat.st_nthrottles++; type = REP_LOG_MORE; goto send; } bytes -= (data_dbt.size + sizeof(REP_CONTROL)); }send: if (__rep_send_message(dbenv, *eidp, type, &lsn, &data_dbt, DB_LOG_RESEND) != 0) break; /* * If we are about to change files, then we'll need the * last LSN in the previous file. Save it here. */ oldfilelsn = lsn; oldfilelsn.offset += logc->c_len; } if (ret == DB_NOTFOUND) ret = 0; if ((t_ret = __log_c_close(logc)) != 0 && ret == 0) ret = t_ret; goto errlock;#ifdef NOTYET case REP_FILE: /* TODO */ CLIENT_ONLY(rep, rp); MASTER_CHECK(dbenv, *eidp, rep); break; case REP_FILE_REQ: MASTER_ONLY(rep, rp);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -