📄 rep_record.c
字号:
/*
 * NOTE(review): the visible source begins in the middle of a K&R-style
 * function definition.  The function name and its leading parameters
 * (dbenv and rep are used below) are above this point and not shown here.
 *
 * The visible body scans the tally array located via rep->tally_off for an
 * entry whose eid and egen both match the arguments.  It returns 0 when a
 * match is found and 1 when none of the rep->sites entries match.
 */
	int eid;
	u_int32_t egen;
{
	int i;
	REP_VTALLY *tally, *vtp;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
	i = 0;
	vtp = &tally[i];
	for (i = 0; i < rep->sites; i++) {
		vtp = &tally[i];
		if (vtp->eid == eid && vtp->egen == egen) {
			RPRINT(dbenv, rep, (dbenv, &mb,
			    "Found matching vote1 (%d, %lu), at %d of %d",
			    eid, (u_long)egen, i, rep->sites));
			return (0);
		}
	}
	RPRINT(dbenv, rep,
	    (dbenv, &mb, "Didn't find vote1 for eid %d, egen %lu",
	    eid, (u_long)egen));
	return (1);
}

/*
 * __rep_dorecovery --
 *	Step a log cursor with DB_PREV over the records whose LSN compares
 * greater than *lsnp, looking for a DB___txn_regop record whose opcode is
 * not TXN_ABORT.  If one is found, "update" is set to 1.  Then call
 * __db_apprec with lsnp, trunclsnp and that update value.
 *
 * Parameters:
 *	dbenv		environment handle.
 *	lsnp		LSN the scan stops at; passed through to __db_apprec.
 *	trunclsnp	passed through to __db_apprec (the caller reads it
 *			back afterwards as the new end-of-log).
 *
 * Returns 0 on success, otherwise the first non-zero value from
 * __log_cursor, __txn_regop_read, __db_apprec or __log_c_close.
 */
static int
__rep_dorecovery(dbenv, lsnp, trunclsnp)
	DB_ENV *dbenv;
	DB_LSN *lsnp, *trunclsnp;
{
	DB_LSN lsn;
	DB_REP *db_rep;
	DBT mylog;
	DB_LOGC *logc;
	int ret, t_ret, update;
	u_int32_t rectype;
	__txn_regop_args *txnrec;

	db_rep = dbenv->rep_handle;

	/* Figure out if we are backing out any committed transactions. */
	if ((ret = __log_cursor(dbenv, &logc)) != 0)
		return (ret);

	memset(&mylog, 0, sizeof(mylog));
	update = 0;
	/*
	 * The loop ends on the first qualifying record, when the cursor get
	 * fails, or when we reach a record at or before *lsnp.  A non-zero
	 * ret left over from __log_c_get is not reported: it is overwritten
	 * by the __db_apprec call below.
	 */
	while (update == 0 &&
	    (ret = __log_c_get(logc, &lsn, &mylog, DB_PREV)) == 0 &&
	    log_compare(&lsn, lsnp) > 0) {
		/* The record type is the first u_int32_t of the log record. */
		memcpy(&rectype, mylog.data, sizeof(rectype));
		if (rectype == DB___txn_regop) {
			if ((ret =
			    __txn_regop_read(dbenv, mylog.data, &txnrec)) != 0)
				goto err;
			if (txnrec->opcode != TXN_ABORT)
				update = 1;
			__os_free(dbenv, txnrec);
		}
	}

	/*
	 * If we successfully run recovery, we've opened all the necessary
	 * files.  We are guaranteed to be single-threaded here, so no mutex
	 * is necessary.
	 */
	if ((ret = __db_apprec(dbenv, lsnp, trunclsnp, update, 0)) == 0)
		F_SET(db_rep, DBREP_OPENFILES);

	/* Always close the cursor; keep the earlier error if there was one. */
err:	if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * __rep_verify_match --
 *	We have just received a matching log record during verification.
 * Figure out if we're going to need to run recovery. If so, wait until
 * everything else has exited the library.  If not, set up the world
 * correctly and move forward.
 *
 * Parameters:
 *	dbenv		environment handle.
 *	reclsnp		LSN of the matching record; recovery is run to it and
 *			it becomes the new max_perm_lsn.
 *	savetime	timestamp compared against renv->rep_timestamp to
 *			detect a competing recovery.
 *
 * Returns 0 when another thread won the race or recovery is already in
 * progress, otherwise the result of __rep_dorecovery / __db_truncate.
 */
static int
__rep_verify_match(dbenv, reclsnp, savetime)
	DB_ENV *dbenv;
	DB_LSN *reclsnp;
	time_t savetime;
{
	DB_LOG *dblp;
	DB_LSN trunclsn;
	DB_REP *db_rep;
	LOG *lp;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int done, master, ret;
	u_int32_t unused;

	dblp = dbenv->lg_handle;
	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	lp = dblp->reginfo.primary;
	ret = 0;
	infop = dbenv->reginfo;
	renv = infop->primary;

	/*
	 * Check if the savetime is different than our current time stamp.
	 * If it is, then we're racing with another thread trying to recover
	 * and we lost.  We must give up.
	 */
	MUTEX_LOCK(dbenv, db_rep->db_mutexp);
	done = savetime != renv->rep_timestamp;
	if (done) {
		MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
		return (0);
	}
	ZERO_LSN(lp->verify_lsn);
	MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);

	/*
	 * Make sure the world hasn't changed while we tried to get
	 * the lock.  If it hasn't then it's time for us to kick all
	 * operations out of DB and run recovery.
	 */
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	if (!F_ISSET(rep, REP_F_RECOVER_LOG) &&
	    (F_ISSET(rep, REP_F_READY) || rep->in_recovery != 0)) {
		/* ret is still 0 here, so this path reports success. */
		rep->stat.st_msgs_recover++;
		goto errunlock;
	}
	__rep_lockout(dbenv, db_rep, rep, 1);

	/* OK, everyone is out, we can now run recovery. */
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);

	if ((ret = __rep_dorecovery(dbenv, reclsnp, &trunclsn)) != 0) {
		/*
		 * Recovery failed: re-take rep_mutexp (errunlock releases
		 * it) and clear the in_recovery / REP_F_READY state.
		 */
		MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
		rep->in_recovery = 0;
		F_CLR(rep, REP_F_READY);
		goto errunlock;
	}

	/*
	 * The log has been truncated (either directly by us or by __db_apprec)
	 * We want to make sure we're waiting for the LSN at the new end-of-log,
	 * not some later point.
	 */
	MUTEX_LOCK(dbenv, db_rep->db_mutexp);
	lp->ready_lsn = trunclsn;
	ZERO_LSN(lp->waiting_lsn);
	ZERO_LSN(lp->max_wait_lsn);
	lp->max_perm_lsn = *reclsnp;
	lp->wait_recs = 0;
	lp->rcvd_recs = 0;
	ZERO_LSN(lp->verify_lsn);

	/*
	 * Discard any log records we have queued;  we're about to re-request
	 * them, and can't trust the ones in the queue.  We need to set the
	 * DB_AM_RECOVER bit in this handle, so that the operation doesn't
	 * deadlock.
	 */
	F_SET(db_rep->rep_db, DB_AM_RECOVER);
	/* db_mutexp is released around the truncate and re-taken after it. */
	MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
	ret = __db_truncate(db_rep->rep_db, NULL, &unused);
	MUTEX_LOCK(dbenv, db_rep->db_mutexp);
	F_CLR(db_rep->rep_db, DB_AM_RECOVER);
	MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);

	/*
	 * The recovery state is reset whether or not the truncate succeeded;
	 * the truncate's return value is only checked afterwards.
	 */
	MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
	rep->stat.st_log_queued = 0;
	rep->in_recovery = 0;
	F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);

	if (ret != 0)
		goto errunlock;

	/*
	 * If the master_id is invalid, this means that since
	 * the last record was sent, somebody declared an
	 * election and we may not have a master to request
	 * things of.
	 *
	 * This is not an error;  when we find a new master,
	 * we'll re-negotiate where the end of the log is and
	 * try to bring ourselves up to date again anyway.
	 *
	 * !!!
	 * We cannot assert the election flags though because
	 * somebody may have declared an election and then
	 * got an error, thus clearing the election flags
	 * but we still have an invalid master_id.
	 */
	master = rep->master_id;
	MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	if (master == DB_EID_INVALID)
		ret = 0;
	else
		/* The send result is deliberately ignored (cast to void). */
		(void)__rep_send_message(dbenv,
		    master, REP_ALL_REQ, reclsnp, NULL, 0);
	/*
	 * The label lives inside "if (0)" so the unlock below runs only when
	 * reached by goto (with rep_mutexp held), never by falling through.
	 */
	if (0) {
errunlock:
		MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
	}

	return (ret);
}

/*
 * __rep_do_ckp --
 *	Perform the memp_sync necessary for this checkpoint without holding
 * the db_rep->db_mutexp.  All callers of this function must hold the
 * db_rep->db_mutexp and must not be holding the db_rep->rep_mutexp.
 *
 * Parameters:
 *	dbenv	environment handle.
 *	rec	checkpoint log record; its ckp_lsn field is read out of
 *		rec->data.
 *	rp	control structure; rp->lsn is handed to __txn_updateckp.
 *
 * Returns 0 on success, otherwise the value of __db_panic.  db_mutexp is
 * held again on return in both cases.
 */
static int
__rep_do_ckp(dbenv, rec, rp)
	DB_ENV *dbenv;
	DBT *rec;
	REP_CONTROL *rp;
{
	DB_LSN ckp_lsn;
	DB_REP *db_rep;
	int ret;

	db_rep = dbenv->rep_handle;

	MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);

	DB_TEST_CHECKPOINT(dbenv, dbenv->test_check);

	/* Sync the memory pool. */
	/* The LSN is copied out with memcpy rather than read through a cast. */
	memcpy(&ckp_lsn, (u_int8_t *)rec->data +
	    SSZ(__txn_ckp_args, ckp_lsn), sizeof(DB_LSN));
	ret = __memp_sync(dbenv, &ckp_lsn);

	/* Update the last_ckp in the txn region. */
	if (ret == 0)
		__txn_updateckp(dbenv, &rp->lsn);
	else {
		__db_err(dbenv, "Error syncing ckp [%lu][%lu]",
		    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
		ret = __db_panic(dbenv, ret);
	}
	MUTEX_LOCK(dbenv, db_rep->db_mutexp);

	return (ret);
}

/*
 * __rep_remfirst --
 *	Remove the first entry from the __db.rep.db
 *
 * Parameters:
 *	dbenv	environment handle.
 *	cntrl	receives the key of the first entry; DB_DBT_REALLOC is set
 *		on it.
 *	rec	receives the data of the first entry; DB_DBT_REALLOC is set
 *		on it.
 *
 * Returns 0 on success, otherwise the first non-zero value from
 * __db_cursor, __db_c_get, __db_c_del or __db_c_close.
 */
static int
__rep_remfirst(dbenv, cntrl, rec)
	DB_ENV *dbenv;
	DBT *cntrl;
	DBT *rec;
{
	DB *dbp;
	DBC *dbc;
	DB_REP *db_rep;
	int ret, t_ret;
	u_int32_t rectype;

	db_rep = dbenv->rep_handle;
	dbp = db_rep->rep_db;

	if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
		return (ret);

	/* The DBTs need to persist through another call. */
	F_SET(cntrl, DB_DBT_REALLOC);
	F_SET(rec, DB_DBT_REALLOC);
	if ((ret = __db_c_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0) {
		/*
		 * NOTE(review): rectype is filled in here but never read
		 * anywhere in this function -- looks like dead code; confirm
		 * before removing.
		 */
		memcpy(&rectype, rec->data, sizeof(rectype));
		ret = __db_c_del(dbc, 0);
	}
	/* Always close the cursor; keep the earlier error if there was one. */
	if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * __rep_getnext --
 *	Get the next record out of the __db.rep.db table.
 *
 * Sets lp->waiting_lsn to the LSN found in the key of the first entry, or
 * zeroes it when the table is empty.
 *
 * Returns 0 when an entry was found, DB_NOTFOUND when the table is empty,
 * otherwise the error from __db_cursor, __db_c_get or __db_c_close.
 */
static int
__rep_getnext(dbenv)
	DB_ENV *dbenv;
{
	DB *dbp;
	DB_REP *db_rep;
	DB_LOG *dblp;
	DBC *dbc;
	DBT lsn_dbt, nextrec_dbt;
	LOG *lp;
	REP_CONTROL *rp;
	int ret, t_ret;

	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;

	db_rep = dbenv->rep_handle;
	dbp = db_rep->rep_db;

	if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
		return (ret);

	/*
	 * Update waiting_lsn.  We need to move it
	 * forward to the LSN of the next record
	 * in the queue.
	 *
	 * If the next item in the database is a log
	 * record--the common case--we're not
	 * interested in its contents, just in its LSN.
	 * Optimize by doing a partial get of the data item.
	 */
	memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
	F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
	nextrec_dbt.ulen = nextrec_dbt.dlen = 0;

	memset(&lsn_dbt, 0, sizeof(lsn_dbt));
	ret = __db_c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
	if (ret != DB_NOTFOUND && ret != 0)
		goto err;

	if (ret == DB_NOTFOUND) {
		ZERO_LSN(lp->waiting_lsn);
		/*
		 * Whether or not the current record is
		 * simple, there's no next one, and
		 * therefore we haven't got anything
		 * else to do right now.  Break out.
		 */
		goto err;
	}
	/* The key of each entry is read as a REP_CONTROL carrying the LSN. */
	rp = (REP_CONTROL *)lsn_dbt.data;
	lp->waiting_lsn = rp->lsn;

	/* Always close the cursor; keep the earlier result if non-zero. */
err:	if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __rep_process_rec --
 *
 * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
 * potentially switching files.  In the case of a checkpoint, it means doing
 * the checkpoint, and in other cases, it means simply writing the record into
 * the log.
 *
 * Parameters:
 *	dbenv		environment handle.
 *	rp		control structure for the record (type and LSN).
 *	rec		the log record itself.
 *	typep		out: record type read from rec->data, or 0 for a
 *			REP_NEWFILE message.
 *	ret_lsnp	out: set to rp->lsn on the REP_F_RECOVER_LOG path.
 *
 * NOTE(review): this definition runs past the end of the visible source, so
 * the remaining cases, the "out" label and the final return are not shown.
 */
static int
__rep_process_rec(dbenv, rp, rec, typep, ret_lsnp)
	DB_ENV *dbenv;
	REP_CONTROL *rp;
	DBT *rec;
	u_int32_t *typep;
	DB_LSN *ret_lsnp;
{
	DB *dbp;
	DB_LOG *dblp;
	DB_REP *db_rep;
	DBT control_dbt, key_dbt, rec_dbt;
	LOG *lp;
	REP *rep;
	u_int32_t txnid;
	int ret, t_ret;

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	dbp = db_rep->rep_db;
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	ret = 0;

	if (rp->rectype == REP_NEWFILE) {
		/*
		 * NOTE(review): the result of __rep_newfile is stored in ret
		 * but this branch returns 0 regardless, so a failure here is
		 * not reported to the caller -- confirm this is intended.
		 */
		ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);

		/* Make this evaluate to a simple rectype. */
		*typep = 0;
		return (0);
	}

	/* The record type is the first u_int32_t of the log record. */
	memcpy(typep, rec->data, sizeof(*typep));
	memset(&control_dbt, 0, sizeof(control_dbt));
	memset(&rec_dbt, 0, sizeof(rec_dbt));

	/*
	 * We write all records except for checkpoint records here.
	 * All non-checkpoint records need to appear in the log before
	 * we take action upon them (i.e., we enforce write-ahead logging).
	 * However, we can't write the checkpoint record here until the
	 * data buffers are actually written to disk, else we are creating
	 * an invalid log -- one that says all data before a certain point
	 * has been written to disk.
	 *
	 * If two threads are both processing the same checkpoint record
	 * (because, for example, it was resent and the original finally
	 * arrived), we handle that below by checking for the existence of
	 * the log record when we add it to the replication database.
	 *
	 * Any log records that arrive while we are processing the checkpoint
	 * are added to the bookkeeping database because ready_lsn is not yet
	 * updated to point after the checkpoint record.
	 */
	if (*typep != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
		if ((ret = __log_rep_put(dbenv, &rp->lsn, rec)) != 0)
			return (ret);
		rep->stat.st_log_records++;
		/*
		 * With REP_F_RECOVER_LOG set the record is only logged, not
		 * applied: report its LSN and skip the switch below.
		 */
		if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
			*ret_lsnp = rp->lsn;
			goto out;
		}
	}

	switch (*typep) {
	case DB___dbreg_register:
		/*
		 * DB opens occur in the context of a transaction, so we can
		 * simply handle them when we process the transaction.  Closes,
		 * however, are not transaction-protected, so we have to
		 * handle them here.
		 *
		 * Note that it should be unsafe for the master to do a close
		 * of a file that was opened in an active transaction, so we
		 * should be guaranteed to get the ordering right.
		 */
		memcpy(&txnid, (u_int8_t *)rec->data +
		    SSZ(__dbreg_register_args, txnid), sizeof(u_int32_t));
		/* Only records with no transaction id are dispatched here. */
		if (txnid == TXN_INVALID)
			ret = __db_dispatch(dbenv, dbenv->recover_dtab,
			    dbenv->recover_dtab_size, rec, &rp->lsn,
			    DB_TXN_APPLY, NULL);
		break;
	case DB___txn_regop:
		/*
		 * If an application is doing app-specific recovery
		 * and acquires locks while applying a transaction,
		 * it can deadlock.  Any other locks held by this
		 * thread should have been discarded in the
		 * __rep_process_txn error path, so if we simply
		 * retry, we should eventually succeed.
		 */
		do {
			ret = 0;
			/*
			 * NOTE(review): DBREP_OPENFILES is set even when
			 * __txn_openfiles fails -- confirm this is intended.
			 */
			if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
				ret = __txn_openfiles(dbenv, NULL, 1);
				F_SET(db_rep, DBREP_OPENFILES);
			}
			if (ret == 0)
				ret = __rep_process_txn(dbenv, rec);
		} while (ret == DB_LOCK_DEADLOCK);

		/* Now flush the log unless we're running TXN_NOSYNC. */
		if (ret == 0 && !F_ISSET(dbenv, DB_ENV_TXN_NOSYNC))
			ret = __log_flush(dbenv, NULL);
		if (ret != 0) {
			__db_err(dbenv, "Error processing txn [%lu][%lu]",
			    (u_long)rp->lsn.file, (u_long)rp->lsn.off
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -