📄 rep_record.c
字号:
ret = __rep_send_message(dbenv, *eidp, REP_VERIFY_REQ, &lsn, NULL, 0); }rep_verify_err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; return (ret); case REP_VERIFY_FAIL: rep->stat.st_outdated++; return (DB_REP_OUTDATED); case REP_VERIFY_REQ: MASTER_ONLY(dbenv); type = REP_VERIFY; if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) return (ret); d = &data_dbt; memset(d, 0, sizeof(data_dbt)); F_SET(logc, DB_LOG_SILENT_ERR); ret = logc->get(logc, &rp->lsn, d, DB_SET); /* * If the LSN was invalid, then we might get a not * found, we might get an EIO, we could get anything. * If we get a DB_NOTFOUND, then there is a chance that * the LSN comes before the first file present in which * case we need to return a fail so that the client can return * a DB_OUTDATED. */ if (ret == DB_NOTFOUND && __log_is_outdated(dbenv, rp->lsn.file, &old) == 0 && old != 0) type = REP_VERIFY_FAIL; if (ret != 0) d = NULL; ret = __rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0); if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; return (ret); case REP_VOTE1: if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "Master received vote");#endif R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); return (__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0)); } vi = (REP_VOTE_INFO *)rec->data; MUTEX_LOCK(dbenv, db_rep->mutexp); /* * If you get a vote and you're not in an election, simply * return an indicator to hold an election which will trigger * this site to send its vote again. */ if (!IN_ELECTION(rep)) {#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "Not in election, but received vote1");#endif ret = DB_REP_HOLDELECTION; goto unlock; } if (F_ISSET(rep, REP_F_EPHASE2)) goto unlock; /* Check if this site knows about more sites than we do. */ if (vi->nsites > rep->nsites) rep->nsites = vi->nsites; /* Check if we've heard from this site already. */ tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off); for (i = 0; i < rep->sites; i++) { if (tally[i] == *eidp) /* Duplicate vote. */ goto unlock; } /* * We are keeping vote, let's see if that changes our count of * the number of sites. */ if (rep->sites + 1 > rep->nsites) rep->nsites = rep->sites + 1; if (rep->nsites > rep->asites && (ret = __rep_grow_sites(dbenv, rep->nsites)) != 0) goto unlock; tally[rep->sites] = *eidp; rep->sites++; /* * Change winners if the incoming record has a higher * priority, or an equal priority but a larger LSN, or * an equal priority and LSN but higher "tiebreaker" value. */#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) { __db_err(dbenv, "%s(eid)%d (pri)%d (gen)%d (sites)%d [%d,%d]", "Existing vote: ", rep->winner, rep->w_priority, rep->w_gen, rep->sites, rep->w_lsn.file, rep->w_lsn.offset); __db_err(dbenv, "Incoming vote: (eid)%d (pri)%d (gen)%d [%d,%d]", *eidp, vi->priority, rp->gen, rp->lsn.file, rp->lsn.offset); }#endif cmp = log_compare(&rp->lsn, &rep->w_lsn); if (vi->priority > rep->w_priority || (vi->priority != 0 && vi->priority == rep->w_priority && (cmp > 0 || (cmp == 0 && vi->tiebreaker > rep->w_tiebreaker)))) {#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "Accepting new vote");#endif rep->winner = *eidp; rep->w_priority = vi->priority; rep->w_lsn = rp->lsn; rep->w_gen = rp->gen; } master = rep->winner; lsn = rep->w_lsn; done = rep->sites == rep->nsites && rep->w_priority != 0; if (done) {#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) { __db_err(dbenv, "Phase1 election done"); __db_err(dbenv, "Voting for %d%s", master, master == rep->eid ? "(self)" : ""); }#endif F_CLR(rep, REP_F_EPHASE1); F_SET(rep, REP_F_EPHASE2); } if (done && master == rep->eid) { rep->votes++; MUTEX_UNLOCK(dbenv, db_rep->mutexp); return (0); } MUTEX_UNLOCK(dbenv, db_rep->mutexp); /* Vote for someone else. */ if (done) return (__rep_send_message(dbenv, master, REP_VOTE2, NULL, NULL, 0)); /* Election is still going on. */ break; case REP_VOTE2:#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "We received a vote%s", F_ISSET(dbenv, DB_ENV_REP_MASTER) ? " (master)" : "");#endif if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) { R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); rep->stat.st_elections_won++; return (__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0)); } MUTEX_LOCK(dbenv, db_rep->mutexp); /* If we have priority 0, we should never get a vote. */ DB_ASSERT(rep->priority != 0); if (!IN_ELECTION(rep)) {#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "Not in election, got vote");#endif MUTEX_UNLOCK(dbenv, db_rep->mutexp); return (DB_REP_HOLDELECTION); } /* avoid counting duplicates. */ rep->votes++; done = rep->votes > rep->nsites / 2; if (done) { rep->master_id = rep->eid; rep->gen = rep->w_gen + 1; ELECTION_DONE(rep); F_CLR(rep, REP_F_UPGRADE); F_SET(rep, REP_F_MASTER); *eidp = rep->master_id;#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "Got enough votes to win; election done; winner is %d", rep->master_id);#endif } MUTEX_UNLOCK(dbenv, db_rep->mutexp); if (done) { R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); /* Declare me the winner. */#ifdef DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) __db_err(dbenv, "I won, sending NEWMASTER");#endif rep->stat.st_elections_won++; if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0)) != 0) break; return (DB_REP_NEWMASTER); } break; default: __db_err(dbenv, "DB_ENV->rep_process_message: unknown replication message: type %lu", (u_long)rp->rectype); return (EINVAL); } return (0);unlock: MUTEX_UNLOCK(dbenv, db_rep->mutexp); return (ret);}/* * __rep_apply -- * * Handle incoming log records on a client, applying when possible and * entering into the bookkeeping table otherwise. This is the guts of * the routine that handles the state machine that describes how we * process and manage incoming log records. */static int__rep_apply(dbenv, rp, rec) DB_ENV *dbenv; REP_CONTROL *rp; DBT *rec;{ __dbreg_register_args dbreg_args; __txn_ckp_args ckp_args; DB_REP *db_rep; DBT control_dbt, key_dbt, lsn_dbt, nextrec_dbt, rec_dbt; DB *dbp; DBC *dbc; DB_LOG *dblp; DB_LSN ckp_lsn, lsn, newfile_lsn, next_lsn, waiting_lsn; LOG *lp; REP *rep; REP_CONTROL lsn_rc; u_int32_t rectype, txnid; int cmp, do_req, eid, have_mutex, ret, t_ret; db_rep = dbenv->rep_handle; rep = db_rep->region; dbp = db_rep->rep_db; dbc = NULL; have_mutex = ret = 0; memset(&control_dbt, 0, sizeof(control_dbt)); memset(&rec_dbt, 0, sizeof(rec_dbt)); /* * If this is a log record and it's the next one in line, simply * write it to the log. If it's a "normal" log record, i.e., not * a COMMIT or CHECKPOINT or something that needs immediate processing, * just return. If it's a COMMIT, CHECKPOINT or LOG_REGISTER (i.e., * not SIMPLE), handle it now. If it's a NEWFILE record, then we * have to be prepared to deal with a logfile change. */ dblp = dbenv->lg_handle; R_LOCK(dbenv, &dblp->reginfo); lp = dblp->reginfo.primary; cmp = log_compare(&rp->lsn, &lp->ready_lsn); /* * This is written to assume that you don't end up with a lot of * records after a hole. That is, it optimizes for the case where * there is only a record or two after a hole. If you have a lot * of records after a hole, what you'd really want to do is write * all of them and then process all the commits, checkpoints, etc. * together. That is more complicated processing that we can add * later if necessary. * * That said, I really don't want to do db operations holding the * log mutex, so the synchronization here is tricky. */ if (cmp == 0) { /* We got the log record that we are expecting. */ if (rp->rectype == REP_NEWFILE) {newfile: ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn); /* Make this evaluate to a simple rectype. */ rectype = 0; } else { DB_ASSERT(log_compare(&rp->lsn, &lp->lsn) == 0); ret = __log_rep_put(dbenv, &rp->lsn, rec); lp->ready_lsn = lp->lsn; memcpy(&rectype, rec->data, sizeof(rectype)); if (ret == 0) /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ rep->stat.st_log_records++; } while (ret == 0 && IS_SIMPLE(rectype) && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) { /* * We just filled in a gap in the log record stream. * Write subsequent records to the log. */gap_check: lp->wait_recs = 0; lp->rcvd_recs = 0; R_UNLOCK(dbenv, &dblp->reginfo); if (have_mutex == 0) { MUTEX_LOCK(dbenv, db_rep->db_mutexp); have_mutex = 1; } if (dbc == NULL && (ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) goto err; /* The DBTs need to persist through another call. */ F_SET(&control_dbt, DB_DBT_REALLOC); F_SET(&rec_dbt, DB_DBT_REALLOC); if ((ret = dbc->c_get(dbc, &control_dbt, &rec_dbt, DB_RMW | DB_FIRST)) != 0) goto err; rp = (REP_CONTROL *)control_dbt.data; rec = &rec_dbt; memcpy(&rectype, rec->data, sizeof(rectype)); R_LOCK(dbenv, &dblp->reginfo); /* * We need to check again, because it's possible that * some other thread of control changed the waiting_lsn * or removed that record from the database. */ if (log_compare(&lp->ready_lsn, &rp->lsn) == 0) { if (rp->rectype != REP_NEWFILE) { DB_ASSERT(log_compare (&rp->lsn, &lp->lsn) == 0); ret = __log_rep_put(dbenv, &rp->lsn, rec); lp->ready_lsn = lp->lsn; /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ if (ret == 0) rep->stat.st_log_records++; } else { ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn); rectype = 0; } waiting_lsn = lp->waiting_lsn; R_UNLOCK(dbenv, &dblp->reginfo); if ((ret = dbc->c_del(dbc, 0)) != 0) goto err; /* * We may miscount, as we don't hold the rep * mutex. */ --rep->stat.st_log_queued; /* * Update waiting_lsn. We need to move it * forward to the LSN of the next record * in the queue. */ memset(&lsn_dbt, 0, sizeof(lsn_dbt)); F_SET(&lsn_dbt, DB_DBT_USERMEM); lsn_dbt.data = &lsn_rc; lsn_dbt.ulen = sizeof(lsn_rc); memset(&lsn_rc, 0, sizeof(lsn_rc)); /* * If the next item in the database is a log * record--the common case--we're not * interested in its contents, just in its LSN. * If it's a newfile message, though, the * data field may be the LSN of the last * record in the old file, and we need to use * that to determine whether or not there's * a gap. * * Optimize both these cases by doing a partial * get of the data item. If it's a newfile * record, we'll get the whole LSN, and if * it's not, we won't waste time allocating. */ memset(&nextrec_dbt, 0, sizeof(nextrec_dbt)); F_SET(&nextrec_dbt, DB_DBT_USERMEM | DB_DBT_PARTIAL); nextrec_dbt.ulen = nextrec_dbt.dlen = sizeof(newfile_lsn); ZERO_LSN(newfile_lsn); nextrec_dbt.data = &newfile_lsn; ret = dbc->c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_NEXT); if (ret != DB_NOTFOUND && ret != 0) goto err; R_LOCK(dbenv, &dblp->reginfo); if (ret == DB_NOTFOUND) { /* * Do a quick double-check to make * sure waiting_lsn hasn't changed. * It's possible that between the * DB_NOTFOUND return and the R_LOCK, * some record was added to the * database, and we don't want to lose * sight of the fact that it's there. */ if (log_compare(&waiting_lsn, &lp->waiting_lsn) == 0) ZERO_LSN( lp->waiting_lsn); /* * Whether or not the current record is * simple, there's no next one, and * therefore we haven't got anything * else to do right now. Break out. */ break; } DB_ASSERT(lsn_dbt.size == sizeof(lsn_rc)); /* * NEWFILE records have somewhat convoluted * semantics, so there are five cases * pertaining to what the newly-gotten record * is and what we want to do about it. * * 1) This isn't a NEWFILE record. Advance * waiting_lsn and proceed. * * 2) NEWFILE, no LSN stored as the datum, * lsn_rc.lsn == ready_lsn. The NEWFILE * record is next, so set waiting_lsn = * ready_lsn. * * 3) NEWFILE, no LSN stored as the datum, but * lsn_rc.lsn > ready_lsn. There's still a * gap; set waiting_lsn = lsn_rc.lsn. * * 4) NEWFILE, newfile_lsn in datum, and it's < * ready_lsn. (If the datum is non-empty, * it's the LSN of the last record in a log * file, not the end of the log, and * lsn_rc.lsn is the LSN of the start of * the new file--we didn't have the end of * the old log handy when we sent the * record.) No gap--we're ready to * proceed. Set both waiting and ready_lsn * to lsn_rc.lsn. * * 5) NEWFILE, newfile_lsn in datum, and it's >= * ready_lsn. We're still missing at * least one record; set waiting_lsn, * but not ready_lsn, to lsn_rc.lsn. */ if (lsn_rc.rectype == REP_NEWFILE && nextrec_dbt.size > 0 && log_compare( &newfile_lsn, &lp->ready_lsn) < 0) /* Case 4. */ lp->ready_lsn = lp->waiting_lsn = lsn_rc.lsn; else { /* Cases 1, 2, 3, and 5. */ DB_ASSERT(log_compare(&lsn_rc.lsn, &lp->ready_lsn) >= 0); lp->waiting_lsn = lsn_rc.lsn; } /* * If the current rectype is simple, we're * done with it, and we should check and see * whether the next record queued is the next * one we're ready for. This is just the loop * condition, so we continue. * * Otherwise, we need to break out of this loop * and process this record first. */ if (!IS_SIMPLE(rectype)) break; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -