📄 rep_record.c
字号:
} else { MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); } goto errlock; case REP_VERIFY_REQ: MASTER_ONLY(rep, rp); type = REP_VERIFY; if ((ret = __log_cursor(dbenv, &logc)) != 0) goto errlock; d = &data_dbt; memset(d, 0, sizeof(data_dbt)); F_SET(logc, DB_LOG_SILENT_ERR); ret = __log_c_get(logc, &rp->lsn, d, DB_SET); /* * If the LSN was invalid, then we might get a not * found, we might get an EIO, we could get anything. * If we get a DB_NOTFOUND, then there is a chance that * the LSN comes before the first file present in which * case we need to return a fail so that the client can return * a DB_OUTDATED. */ if (ret == DB_NOTFOUND && __log_is_outdated(dbenv, rp->lsn.file, &old) == 0 && old != 0) type = REP_VERIFY_FAIL; if (ret != 0) d = NULL; (void)__rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0); ret = __log_c_close(logc); goto errlock; case REP_VOTE1: if (F_ISSET(rep, REP_F_MASTER)) { RPRINT(dbenv, rep, (dbenv, &mb, "Master received vote")); R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); (void)__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0); goto errlock; } vi = (REP_VOTE_INFO *)rec->data; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); /* * If we get a vote from a later election gen, we * clear everything from the current one, and we'll * start over by tallying it. If we get an old vote, * send an ALIVE to the old participant. */ RPRINT(dbenv, rep, (dbenv, &mb, "Received vote1 egen %lu, egen %lu", (u_long)vi->egen, (u_long)rep->egen)); if (vi->egen < rep->egen) { RPRINT(dbenv, rep, (dbenv, &mb, "Received old vote %lu, egen %lu, ignoring vote1", (u_long)vi->egen, (u_long)rep->egen)); egen = rep->egen; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); data_dbt.data = &egen; data_dbt.size = sizeof(egen); (void)__rep_send_message(dbenv, *eidp, REP_ALIVE, &rp->lsn, &data_dbt, 0); goto errlock; } if (vi->egen > rep->egen) { RPRINT(dbenv, rep, (dbenv, &mb, "Received VOTE1 from egen %lu, my egen %lu; reset", (u_long)vi->egen, (u_long)rep->egen)); __rep_elect_done(dbenv, rep); rep->egen = vi->egen; } if (!IN_ELECTION(rep)) F_SET(rep, REP_F_TALLY); /* Check if this site knows about more sites than we do. */ if (vi->nsites > rep->nsites) rep->nsites = vi->nsites; /* Check if this site requires more votes than we do. */ if (vi->nvotes > rep->nvotes) rep->nvotes = vi->nvotes; /* * We are keeping the vote, let's see if that changes our * count of the number of sites. */ if (rep->sites + 1 > rep->nsites) rep->nsites = rep->sites + 1; if (rep->nsites > rep->asites && (ret = __rep_grow_sites(dbenv, rep->nsites)) != 0) { RPRINT(dbenv, rep, (dbenv, &mb, "Grow sites returned error %d", ret)); goto errunlock; } /* * Ignore vote1's if we're in phase 2. */ if (F_ISSET(rep, REP_F_EPHASE2)) { RPRINT(dbenv, rep, (dbenv, &mb, "In phase 2, ignoring vote1")); goto errunlock; } /* * Record this vote. If we get back non-zero, we * ignore the vote. */ if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->sites, vi->egen, rep->tally_off)) != 0) { RPRINT(dbenv, rep, (dbenv, &mb, "Tally returned %d, sites %d", ret, rep->sites)); ret = 0; goto errunlock; } RPRINT(dbenv, rep, (dbenv, &mb, "Incoming vote: (eid)%d (pri)%d (gen)%lu (egen)%lu [%lu,%lu]", *eidp, vi->priority, (u_long)rp->gen, (u_long)vi->egen, (u_long)rp->lsn.file, (u_long)rp->lsn.offset));#ifdef DIAGNOSTIC if (rep->sites > 1) RPRINT(dbenv, rep, (dbenv, &mb, "Existing vote: (eid)%d (pri)%d (gen)%lu (sites)%d [%lu,%lu]", rep->winner, rep->w_priority, (u_long)rep->w_gen, rep->sites, (u_long)rep->w_lsn.file, (u_long)rep->w_lsn.offset));#endif __rep_cmp_vote(dbenv, rep, eidp, &rp->lsn, vi->priority, rp->gen, vi->tiebreaker); /* * If you get a vote and you're not in an election, we've * already recorded this vote. But that is all we need * to do. */ if (!IN_ELECTION(rep)) { RPRINT(dbenv, rep, (dbenv, &mb, "Not in election, but received vote1 0x%x", rep->flags)); ret = DB_REP_HOLDELECTION; goto errunlock; } master = rep->winner; lsn = rep->w_lsn; /* * We need to check sites == nsites, not more than half * like we do in __rep_elect and the VOTE2 code below. The * reason is that we want to process all the incoming votes * and not short-circuit once we reach more than half. The * real winner's vote may be in the last half. */ done = rep->sites >= rep->nsites && rep->w_priority != 0; if (done) { RPRINT(dbenv, rep, (dbenv, &mb, "Phase1 election done")); RPRINT(dbenv, rep, (dbenv, &mb, "Voting for %d%s", master, master == rep->eid ? "(self)" : "")); egen = rep->egen; F_SET(rep, REP_F_EPHASE2); F_CLR(rep, REP_F_EPHASE1); if (master == rep->eid) { (void)__rep_tally(dbenv, rep, rep->eid, &rep->votes, egen, rep->v2tally_off); goto errunlock; } MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); /* Vote for someone else. */ __rep_send_vote(dbenv, NULL, 0, 0, 0, 0, egen, master, REP_VOTE2); } else MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); /* Election is still going on. */ break; case REP_VOTE2: RPRINT(dbenv, rep, (dbenv, &mb, "We received a vote%s", F_ISSET(rep, REP_F_MASTER) ? " (master)" : "")); if (F_ISSET(rep, REP_F_MASTER)) { R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); rep->stat.st_elections_won++; (void)__rep_send_message(dbenv, *eidp, REP_NEWMASTER, &lsn, NULL, 0); goto errlock; } MUTEX_LOCK(dbenv, db_rep->rep_mutexp); /* If we have priority 0, we should never get a vote. */ DB_ASSERT(rep->priority != 0); /* * We might be the last to the party and we haven't had * time to tally all the vote1's, but others have and * decided we're the winner. So, if we're in the process * of tallying sites, keep the vote so that when our * election thread catches up we'll have the votes we * already received. */ vi = (REP_VOTE_INFO *)rec->data; if (!IN_ELECTION_TALLY(rep) && vi->egen >= rep->egen) { RPRINT(dbenv, rep, (dbenv, &mb, "Not in election gen %lu, at %lu, got vote", (u_long)vi->egen, (u_long)rep->egen)); ret = DB_REP_HOLDELECTION; goto errunlock; } /* * Record this vote. In a VOTE2, the only valid entry * in the REP_VOTE_INFO is the election generation. * * There are several things which can go wrong that we * need to account for: * 1. If we receive a latent VOTE2 from an earlier election, * we want to ignore it. * 2. If we receive a VOTE2 from a site from which we never * received a VOTE1, we want to ignore it. * 3. If we have received a duplicate VOTE2 from this election * from the same site we want to ignore it. * 4. If this is from the current election and someone is * really voting for us, then we finally get to record it. */ /* * __rep_cmp_vote2 checks for cases 1 and 2. */ if ((ret = __rep_cmp_vote2(dbenv, rep, *eidp, vi->egen)) != 0) { ret = 0; goto errunlock; } /* * __rep_tally takes care of cases 3 and 4. */ if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->votes, vi->egen, rep->v2tally_off)) != 0) { ret = 0; goto errunlock; } done = rep->votes >= rep->nvotes; RPRINT(dbenv, rep, (dbenv, &mb, "Counted vote %d of %d", rep->votes, rep->nvotes)); if (done) { __rep_elect_master(dbenv, rep, eidp); ret = DB_REP_NEWMASTER; goto errunlock; } else MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); break; default: __db_err(dbenv, "DB_ENV->rep_process_message: unknown replication message: type %lu", (u_long)rp->rectype); ret = EINVAL; goto errlock; } /* * If we already hold rep_mutexp then we goto 'errunlock' * Otherwise we goto 'errlock' to acquire it before we * decrement our message thread count. */errlock: MUTEX_LOCK(dbenv, db_rep->rep_mutexp);errunlock: rep->msg_th--; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);out: if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) { if (ret_lsnp != NULL) *ret_lsnp = rp->lsn; ret = DB_REP_NOTPERM; } return (ret);}/* * __rep_apply -- * * Handle incoming log records on a client, applying when possible and * entering into the bookkeeping table otherwise. This routine manages * the state of the incoming message stream -- processing records, via * __rep_process_rec, when possible and enqueuing in the __db.rep.db * when necessary. As gaps in the stream are filled in, this is where * we try to process as much as possible from __db.rep.db to catch up. */static int__rep_apply(dbenv, rp, rec, ret_lsnp, is_dupp) DB_ENV *dbenv; REP_CONTROL *rp; DBT *rec; DB_LSN *ret_lsnp; int *is_dupp;{ DB_REP *db_rep; DBT control_dbt, key_dbt; DBT rec_dbt; DB *dbp; DB_LOG *dblp; DB_LSN max_lsn; LOG *lp; REP *rep; u_int32_t rectype; int cmp, ret;#ifdef DIAGNOSTIC DB_MSGBUF mb;#endif db_rep = dbenv->rep_handle; rep = db_rep->region; dbp = db_rep->rep_db; rectype = 0; ret = 0; memset(&control_dbt, 0, sizeof(control_dbt)); memset(&rec_dbt, 0, sizeof(rec_dbt)); ZERO_LSN(max_lsn); dblp = dbenv->lg_handle; MUTEX_LOCK(dbenv, db_rep->db_mutexp); lp = dblp->reginfo.primary; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (F_ISSET(rep, REP_F_RECOVER_LOG) && log_compare(&lp->ready_lsn, &rep->first_lsn) < 0) lp->ready_lsn = rep->first_lsn; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); cmp = log_compare(&rp->lsn, &lp->ready_lsn); if (cmp == 0) { if ((ret = __rep_process_rec(dbenv, rp, rec, &rectype, &max_lsn)) != 0) goto err; /* * If we get the record we are expecting, reset * the count of records we've received and are applying * towards the request interval. */ lp->rcvd_recs = 0; while (ret == 0 && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) { /* * We just filled in a gap in the log record stream. * Write subsequent records to the log. */gap_check: lp->rcvd_recs = 0; ZERO_LSN(lp->max_wait_lsn); if ((ret = __rep_remfirst(dbenv, &control_dbt, &rec_dbt)) != 0) goto err; rp = (REP_CONTROL *)control_dbt.data; rec = &rec_dbt; if ((ret = __rep_process_rec(dbenv, rp, rec, &rectype, &max_lsn)) != 0) goto err; /* * We may miscount, as we don't hold the rep mutex. */ --rep->stat.st_log_queued; if ((ret = __rep_getnext(dbenv)) == DB_NOTFOUND) { ret = 0; break; } else if (ret != 0) goto err; } /* * Check if we're at a gap in the table and if so, whether we * need to ask for any records. */ if (!IS_ZERO_LSN(lp->waiting_lsn) && log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) { /* * We got a record and processed it, but we may * still be waiting for more records. */ if (__rep_check_doreq(dbenv, rep)) __rep_loggap_req(dbenv, rep, &rp->lsn, 0); } else { lp->wait_recs = 0; ZERO_LSN(lp->max_wait_lsn); } } else if (cmp > 0) { /* * The LSN is higher than the one we were waiting for. * This record isn't in sequence; add it to the temporary * database, update waiting_lsn if necessary, and perform * calculations to determine if we should issue requests * for new records. */ memset(&key_dbt, 0, sizeof(key_dbt)); key_dbt.data = rp; key_dbt.size = sizeof(*rp); if (lp->wait_recs == 0) { /* * This is a new gap. Initialize the number of * records that we should wait before requesting * that it be resent. We grab the limits out of * the rep without the mutex. */ lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; ZERO_LSN(lp->max_wait_lsn); } if (__rep_check_doreq(dbenv, rep)) __rep_loggap_req(dbenv, rep, &rp->lsn, 0); ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE); rep->stat.st_log_queued++; rep->stat.st_log_queued_total++; if (rep->stat.st_log_queued_max < rep->stat.st_log_queued) rep->stat.st_log_queued_max = rep->stat.st_log_queued; if (ret == DB_KEYEXIST) ret = 0; if (ret != 0) goto done; if (IS_ZERO_LSN(lp->waiting_lsn) || log_compare(&rp->lsn, &lp->waiting_lsn) < 0) lp->waiting_lsn = rp->lsn; /* * If this is permanent; let the caller know that we have * not yet written it to disk, but we've accepted it. */ if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) { max_lsn = rp->lsn; ret = DB_REP_NOTPERM; } goto done; } else { /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ rep->stat.st_log_duplicated++; if (is_dupp != NULL) *is_dupp = 1; if (F_ISSET(rp, DB_LOG_PERM)) max_lsn = lp->max_perm_lsn; goto done; } /* Check if we need to go back into the table. */ if (ret == 0 && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) goto gap_check;done:err: /* Check if we need to go back into the table. */ MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (ret == 0 && F_ISSET(rep, REP_F_RECOVER_LOG) && log_compare(&lp->ready_lsn, &rep->last_lsn) >= 0) { rep->last_lsn = max_lsn;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -