📄 rep_record.c
字号:
/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2001-2002 * Sleepycat Software. All rights reserved. */#include "db_config.h"#ifndef lintstatic const char revid[] = "$Id: rep_record.c,v 1.111 2002/09/11 19:39:11 bostic Exp $";#endif /* not lint */#ifndef NO_SYSTEM_INCLUDES#include <stdlib.h>#include <string.h>#endif#include "db_int.h"#include "dbinc/db_page.h"#include "dbinc/db_am.h"#include "dbinc/log.h"#include "dbinc/rep.h"#include "dbinc/txn.h"static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *));static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));static int __rep_lsn_cmp __P((const void *, const void *));static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *));#define IS_SIMPLE(R) ((R) != DB___txn_regop && \ (R) != DB___txn_ckp && (R) != DB___dbreg_register)/* * __rep_process_message -- * * This routine takes an incoming message and processes it. * * control: contains the control fields from the record * rec: contains the actual record * eidp: contains the machine id of the sender of the message; * in the case of a DB_NEWMASTER message, returns the eid * of the new master. * * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *)); */int__rep_process_message(dbenv, control, rec, eidp) DB_ENV *dbenv; DBT *control, *rec; int *eidp;{ DB_LOG *dblp; DB_LOGC *logc; DB_LSN init_lsn, lsn, newfilelsn, oldfilelsn; DB_REP *db_rep; DBT *d, data_dbt, lsndbt, mylog; LOG *lp; REP *rep; REP_CONTROL *rp; REP_VOTE_INFO *vi; u_int32_t bytes, gen, gbytes, type, unused; int check_limit, cmp, done, do_req, i; int master, old, recovering, ret, t_ret, *tally; PANIC_CHECK(dbenv); ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN); /* Control argument must be non-Null. 
*/ if (control == NULL || control->size == 0) { __db_err(dbenv, "DB_ENV->rep_process_message: control argument must be specified"); return (EINVAL); } ret = 0; db_rep = dbenv->rep_handle; rep = db_rep->region; dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; MUTEX_LOCK(dbenv, db_rep->mutexp); gen = rep->gen; recovering = F_ISSET(rep, REP_F_RECOVER); rep->stat.st_msgs_processed++; MUTEX_UNLOCK(dbenv, db_rep->mutexp); rp = (REP_CONTROL *)control->data;#if 0 __rep_print_message(dbenv, *eidp, rp, "rep_process_message");#endif /* Complain if we see an improper version number. */ if (rp->rep_version != DB_REPVERSION) { __db_err(dbenv, "unexpected replication message version %d, expected %d", rp->rep_version, DB_REPVERSION); return (EINVAL); } if (rp->log_version != DB_LOGVERSION) { __db_err(dbenv, "unexpected log record version %d, expected %d", rp->log_version, DB_LOGVERSION); return (EINVAL); } /* * Check for generation number matching. Ignore any old messages * except requests that are indicative of a new client that needs * to get in sync. */ if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ && rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ) { /* * We don't hold the rep mutex, and could miscount if we race. */ rep->stat.st_msgs_badgen++; return (0); } if (rp->gen > gen && rp->rectype != REP_ALIVE && rp->rectype != REP_NEWMASTER) return (__rep_send_message(dbenv, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0)); /* * We need to check if we're in recovery and if we are * then we need to ignore any messages except VERIFY, VOTE, * ELECT (the master might fail while we are recovering), and * ALIVE_REQ. 
*/
	if (recovering)
		switch(rp->rectype) {
			case REP_ALIVE:
			case REP_ALIVE_REQ:
			case REP_ELECT:
			case REP_NEWCLIENT:
			case REP_NEWMASTER:
			case REP_NEWSITE:
			case REP_VERIFY:
				/*
				 * Only let the message through if its LSN is
				 * the one we are currently verifying.
				 */
				R_LOCK(dbenv, &dblp->reginfo);
				cmp = log_compare(&lp->verify_lsn, &rp->lsn);
				R_UNLOCK(dbenv, &dblp->reginfo);
				if (cmp != 0)
					goto skip;
				/* FALLTHROUGH */
			case REP_VOTE1:
			case REP_VOTE2:
				break;
			default:
skip:				/*
				 * We don't hold the rep mutex, and could
				 * miscount if we race.
				 */
				rep->stat.st_msgs_recover++;

				/* Check for need to retransmit. */
				R_LOCK(dbenv, &dblp->reginfo);
				do_req = *eidp == rep->master_id &&
				    ++lp->rcvd_recs >= lp->wait_recs;
				if (do_req) {
					/*
					 * Back off: double the number of
					 * records we wait for before asking
					 * again, but never wait for more than
					 * max_gap.  (The comparison must be
					 * '>'; testing the sum for non-zero
					 * would jump straight to max_gap and
					 * defeat the doubling.)
					 */
					lp->wait_recs *= 2;
					if (lp->wait_recs > rep->max_gap)
						lp->wait_recs = rep->max_gap;
					lp->rcvd_recs = 0;
					lsn = lp->verify_lsn;
				}
				R_UNLOCK(dbenv, &dblp->reginfo);

				/* Send outside the region lock, using the saved LSN. */
				if (do_req)
					ret = __rep_send_message(dbenv, *eidp,
					    REP_VERIFY_REQ, &lsn, NULL, 0);

				return (ret);
		}

	/* Dispatch on the message type. */
	switch(rp->rectype) {
	case REP_ALIVE:
		ANYSITE(dbenv);
		if (rp->gen > gen && rp->flags)
			return (__rep_new_master(dbenv, rp, *eidp));
		break;
	case REP_ALIVE_REQ:
		ANYSITE(dbenv);
		dblp = dbenv->lg_handle;
		R_LOCK(dbenv, &dblp->reginfo);
		lsn = ((LOG *)dblp->reginfo.primary)->lsn;
		R_UNLOCK(dbenv, &dblp->reginfo);
		/* Reply with our log LSN; the flag is 1 iff we are the master. */
		return (__rep_send_message(dbenv,
		    *eidp, REP_ALIVE, &lsn, NULL,
		    F_ISSET(dbenv, DB_ENV_REP_MASTER) ? 1 : 0));
	case REP_ALL_REQ:
		MASTER_ONLY(dbenv);
		gbytes = bytes = 0;
		/* Snapshot the transmit limit under the rep mutex. */
		MUTEX_LOCK(dbenv, db_rep->mutexp);
		gbytes = rep->gbytes;
		bytes = rep->bytes;
		MUTEX_UNLOCK(dbenv, db_rep->mutexp);
		/* A limit of 0 gbytes and 0 bytes disables the limit check. */
		check_limit = gbytes != 0 || bytes != 0;
		if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
			return (ret);
		memset(&data_dbt, 0, sizeof(data_dbt));
		oldfilelsn = lsn = rp->lsn;
		type = REP_LOG;
		for (ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
		    ret == 0 && type == REP_LOG;
		    ret = logc->get(logc, &lsn, &data_dbt, DB_NEXT)) {
			/*
			 * lsn.offset will only be 0 if this is the
			 * beginning of the log; DB_SET, but not DB_NEXT,
			 * can set the log cursor to [n][0].
*/ if (lsn.offset == 0) ret = __rep_send_message(dbenv, *eidp, REP_NEWFILE, &lsn, NULL, 0); else { /* * DB_NEXT will never run into offsets * of 0; thus, when a log file changes, * we'll have a real log record with * some lsn [n][m], and we'll also want to send * a NEWFILE message with lsn [n][0]. * So that the client can detect gaps, * send in the rec parameter the * last LSN in the old file. */ if (lsn.file != oldfilelsn.file) { newfilelsn.file = lsn.file; newfilelsn.offset = 0; memset(&lsndbt, 0, sizeof(DBT)); lsndbt.size = sizeof(DB_LSN); lsndbt.data = &oldfilelsn; if ((ret = __rep_send_message(dbenv, *eidp, REP_NEWFILE, &newfilelsn, &lsndbt, 0)) != 0) break; } if (check_limit) { /* * data_dbt.size is only the size of * the log record; it doesn't count * the size of the control structure. * Factor that in as well so we're * not off by a lot if our log * records are small. */ while (bytes < data_dbt.size + sizeof(REP_CONTROL)) { if (gbytes > 0) { bytes += GIGABYTE; --gbytes; continue; } /* * We don't hold the rep mutex, * and may miscount. */ rep->stat.st_nthrottles++; type = REP_LOG_MORE; goto send; } bytes -= (data_dbt.size + sizeof(REP_CONTROL)); }send: ret = __rep_send_message(dbenv, *eidp, type, &lsn, &data_dbt, 0); } /* * In case we're about to change files and need it * for a NEWFILE message, save the current LSN. */ oldfilelsn = lsn; } if (ret == DB_NOTFOUND) ret = 0; if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; return (ret); case REP_ELECT: if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) { R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); MUTEX_LOCK(dbenv, db_rep->mutexp); rep->gen++; MUTEX_UNLOCK(dbenv, db_rep->mutexp); return (__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0)); } MUTEX_LOCK(dbenv, db_rep->mutexp); ret = IN_ELECTION(rep) ? 
0 : DB_REP_HOLDELECTION; MUTEX_UNLOCK(dbenv, db_rep->mutexp); return (ret);#ifdef NOTYET case REP_FILE: /* TODO */ CLIENT_ONLY(dbenv); break; case REP_FILE_REQ: MASTER_ONLY(dbenv); return (__rep_send_file(dbenv, rec, *eidp)); break;#endif case REP_LOG: case REP_LOG_MORE: CLIENT_ONLY(dbenv); if ((ret = __rep_apply(dbenv, rp, rec)) != 0) return (ret); if (rp->rectype == REP_LOG_MORE) { MUTEX_LOCK(dbenv, db_rep->db_mutexp); master = rep->master_id; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); ret = __rep_send_message(dbenv, master, REP_ALL_REQ, &lsn, NULL, 0); } return (ret); case REP_LOG_REQ: MASTER_ONLY(dbenv); if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) return (ret); memset(&data_dbt, 0, sizeof(data_dbt)); lsn = rp->lsn; /* * There are three different cases here. * 1. We asked for a particular LSN and got it. * 2. We asked for an LSN of X,0 which is invalid and got the * first log record in a particular file. * 3. We asked for an LSN and it's not found because it is * beyond the end of a log file and we need a NEWFILE msg. */ ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET); cmp = log_compare(&lsn, &rp->lsn); if (ret == 0 && cmp == 0) /* Case 1 */ ret = __rep_send_message(dbenv, *eidp, REP_LOG, &rp->lsn, &data_dbt, 0); else if (ret == DB_NOTFOUND || (ret == 0 && cmp < 0 && rp->lsn.offset == 0)) /* Cases 2 and 3: Send a NEWFILE message. */ ret = __rep_send_message(dbenv, *eidp, REP_NEWFILE, &lsn, NULL, 0); if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; return (ret); case REP_NEWSITE: /* We don't hold the rep mutex, and may miscount. */ rep->stat.st_newsites++; /* This is a rebroadcast; simply tell the application. 
*/ if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) { dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); (void)__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0); } return (DB_REP_NEWSITE); case REP_NEWCLIENT: /* * This message was received and should have resulted in the * application entering the machine ID in its machine table. * We respond to this with an ALIVE to send relevant information * to the new client. But first, broadcast the new client's * record to all the clients. */ if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0)) != 0) return (ret); if (F_ISSET(dbenv, DB_ENV_REP_CLIENT)) return (0); /* FALLTHROUGH */ case REP_MASTER_REQ: ANYSITE(dbenv); if (F_ISSET(dbenv, DB_ENV_REP_CLIENT)) return (0); dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); return (__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0)); case REP_NEWFILE: CLIENT_ONLY(dbenv); return (__rep_apply(dbenv, rp, rec)); case REP_NEWMASTER: ANYSITE(dbenv); if (F_ISSET(dbenv, DB_ENV_REP_MASTER) && *eidp != dbenv->rep_eid) { /* We don't hold the rep mutex, and may miscount. 
*/ rep->stat.st_dupmasters++; return (DB_REP_DUPMASTER); } return (__rep_new_master(dbenv, rp, *eidp)); case REP_PAGE: /* TODO */ CLIENT_ONLY(dbenv); break; case REP_PAGE_REQ: /* TODO */ MASTER_ONLY(dbenv); break; case REP_PLIST: /* TODO */ CLIENT_ONLY(dbenv); break; case REP_PLIST_REQ: /* TODO */ MASTER_ONLY(dbenv); break; case REP_VERIFY: CLIENT_ONLY(dbenv); DB_ASSERT((F_ISSET(rep, REP_F_RECOVER) && !IS_ZERO_LSN(lp->verify_lsn)) || (!F_ISSET(rep, REP_F_RECOVER) && IS_ZERO_LSN(lp->verify_lsn))); if (IS_ZERO_LSN(lp->verify_lsn)) return (0); if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) return (ret); memset(&mylog, 0, sizeof(mylog)); if ((ret = logc->get(logc, &rp->lsn, &mylog, DB_SET)) != 0) goto rep_verify_err; if (mylog.size == rec->size && memcmp(mylog.data, rec->data, rec->size) == 0) { /* * If we're a logs-only client, we can simply truncate * the log to the point where it last agreed with the * master's; otherwise, recover to that point. */ R_LOCK(dbenv, &dblp->reginfo); ZERO_LSN(lp->verify_lsn); R_UNLOCK(dbenv, &dblp->reginfo); if (F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) { INIT_LSN(init_lsn); if ((ret = dbenv->log_flush(dbenv, &rp->lsn)) != 0 || (ret = __log_vtruncate(dbenv, &rp->lsn, &init_lsn)) != 0) goto rep_verify_err; } else if ((ret = __db_apprec(dbenv, &rp->lsn, 0)) != 0) goto rep_verify_err; /* * The log has been truncated (either by __db_apprec or * directly). We want to make sure we're waiting for * the LSN at the new end-of-log, not some later point. */ R_LOCK(dbenv, &dblp->reginfo); lp->ready_lsn = lp->lsn; ZERO_LSN(lp->waiting_lsn); R_UNLOCK(dbenv, &dblp->reginfo); /* * Discard any log records we have queued; we're * about to re-request them, and can't trust the * ones in the queue. 
*/ MUTEX_LOCK(dbenv, db_rep->db_mutexp); if ((ret = db_rep->rep_db->truncate(db_rep->rep_db, NULL, &unused, 0)) != 0) { MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); goto rep_verify_err; } rep->stat.st_log_queued = 0; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); MUTEX_LOCK(dbenv, db_rep->mutexp); F_CLR(rep, REP_F_RECOVER); /* * If the master_id is invalid, this means that since * the last record was sent, somebody declared an * election and we may not have a master to request * things of. * * This is not an error; when we find a new master, * we'll re-negotiate where the end of the log is and * try to bring ourselves up to date again anyway. */ if ((master = rep->master_id) == DB_EID_INVALID) { DB_ASSERT(IN_ELECTION(rep)); MUTEX_UNLOCK(dbenv, db_rep->mutexp); ret = 0; } else { MUTEX_UNLOCK(dbenv, db_rep->mutexp); ret = __rep_send_message(dbenv, master, REP_ALL_REQ, &rp->lsn, NULL, 0); } } else if ((ret = logc->get(logc, &lsn, &mylog, DB_PREV)) == 0) { R_LOCK(dbenv, &dblp->reginfo); lp->verify_lsn = lsn; lp->rcvd_recs = 0; lp->wait_recs = rep->request_gap; R_UNLOCK(dbenv, &dblp->reginfo);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -