📄 rep_record.c
字号:
} /* * Check if we're at a gap in the table and if so, whether we * need to ask for any records. */ do_req = 0; if (!IS_ZERO_LSN(lp->waiting_lsn) && log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) { next_lsn = lp->ready_lsn; do_req = ++lp->rcvd_recs >= lp->wait_recs; if (do_req) { lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; } } R_UNLOCK(dbenv, &dblp->reginfo); if (dbc != NULL) { if ((ret = dbc->c_close(dbc)) != 0) goto err; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); have_mutex = 0; } dbc = NULL; if (do_req) { MUTEX_LOCK(dbenv, db_rep->mutexp); eid = db_rep->region->master_id; MUTEX_UNLOCK(dbenv, db_rep->mutexp); if (eid != DB_EID_INVALID) { rep->stat.st_log_requested++; if ((ret = __rep_send_message(dbenv, eid, REP_LOG_REQ, &next_lsn, NULL, 0)) != 0) goto err; } } } else if (cmp > 0) { /* * The LSN is higher than the one we were waiting for. * If it is a NEWFILE message, this may not mean that * there's a gap; in some cases, NEWFILE messages contain * the LSN of the beginning of the new file instead * of the end of the old. * * In these cases, the rec DBT will contain the last LSN * of the old file, so we can tell whether there's a gap. */ if (rp->rectype == REP_NEWFILE && rp->lsn.file == lp->ready_lsn.file + 1 && rp->lsn.offset == 0) { DB_ASSERT(rec != NULL && rec->data != NULL && rec->size == sizeof(DB_LSN)); memcpy(&lsn, rec->data, sizeof(DB_LSN)); if (log_compare(&lp->ready_lsn, &lsn) > 0) /* * The last LSN in the old file is smaller * than the one we're expecting, so there's * no gap--the one we're expecting just * doesn't exist. */ goto newfile; } /* * This record isn't in sequence; add it to the table and * update waiting_lsn if necessary. */ memset(&key_dbt, 0, sizeof(key_dbt)); key_dbt.data = rp; key_dbt.size = sizeof(*rp); next_lsn = lp->lsn; do_req = 0; if (lp->wait_recs == 0) { /* * This is a new gap. Initialize the number of * records that we should wait before requesting * that it be resent. We grab the limits out of * the rep without the mutex. */ lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; } if (++lp->rcvd_recs >= lp->wait_recs) { /* * If we've waited long enough, request the record * and double the wait interval. */ do_req = 1; lp->wait_recs <<= 1; lp->rcvd_recs = 0; if (lp->wait_recs > rep->max_gap) lp->wait_recs = rep->max_gap; } R_UNLOCK(dbenv, &dblp->reginfo); MUTEX_LOCK(dbenv, db_rep->db_mutexp); ret = dbp->put(dbp, NULL, &key_dbt, rec, 0); rep->stat.st_log_queued++; rep->stat.st_log_queued_total++; if (rep->stat.st_log_queued_max < rep->stat.st_log_queued) rep->stat.st_log_queued_max = rep->stat.st_log_queued; MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (ret != 0) return (ret); R_LOCK(dbenv, &dblp->reginfo); if (IS_ZERO_LSN(lp->waiting_lsn) || log_compare(&rp->lsn, &lp->waiting_lsn) < 0) lp->waiting_lsn = rp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); if (do_req) { /* Request the LSN we are still waiting for. */ MUTEX_LOCK(dbenv, db_rep->mutexp); /* May as well do this after we grab the mutex. */ eid = db_rep->region->master_id; /* * If the master_id is invalid, this means that since * the last record was sent, somebody declared an * election and we may not have a master to request * things of. * * This is not an error; when we find a new master, * we'll re-negotiate where the end of the log is and * try to to bring ourselves up to date again anyway. */ if (eid != DB_EID_INVALID) { rep->stat.st_log_requested++; MUTEX_UNLOCK(dbenv, db_rep->mutexp); ret = __rep_send_message(dbenv, eid, REP_LOG_REQ, &next_lsn, NULL, 0); } else MUTEX_UNLOCK(dbenv, db_rep->mutexp); } return (ret); } else { R_UNLOCK(dbenv, &dblp->reginfo); /* * We may miscount if we race, since we * don't currently hold the rep mutex. */ rep->stat.st_log_duplicated++; } if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype))) goto done; /* * If we got here, then we've got a log record in rp and rec that * we need to process. */ switch(rectype) { case DB___dbreg_register: /* * DB opens occur in the context of a transaction, so we can * simply handle them when we process the transaction. Closes, * however, are not transaction-protected, so we have to * handle them here. * * Note that it should be unsafe for the master to do a close * of a file that was opened in an active transaction, so we * should be guaranteed to get the ordering right. */ memcpy(&txnid, (u_int8_t *)rec->data + ((u_int8_t *)&dbreg_args.txnid - (u_int8_t *)&dbreg_args), sizeof(u_int32_t)); if (txnid == TXN_INVALID && !F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) ret = __db_dispatch(dbenv, dbenv->recover_dtab, dbenv->recover_dtab_size, rec, &rp->lsn, DB_TXN_APPLY, NULL); break; case DB___txn_ckp: /* Sync the memory pool. */ memcpy(&ckp_lsn, (u_int8_t *)rec->data + ((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args), sizeof(DB_LSN)); if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) ret = dbenv->memp_sync(dbenv, &ckp_lsn); else /* * We ought to make sure the logs on a logs-only * replica get flushed now and again. */ ret = dbenv->log_flush(dbenv, &ckp_lsn); /* Update the last_ckp in the txn region. */ if (ret == 0) __txn_updateckp(dbenv, &rp->lsn); break; case DB___txn_regop: if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) do { /* * If an application is doing app-specific * recovery and acquires locks while applying * a transaction, it can deadlock. Any other * locks held by this thread should have been * discarded in the __rep_process_txn error * path, so if we simply retry, we should * eventually succeed. */ ret = __rep_process_txn(dbenv, rec); } while (ret == DB_LOCK_DEADLOCK); break; default: goto err; } /* Check if we need to go back into the table. */ if (ret == 0) { R_LOCK(dbenv, &dblp->reginfo); if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) goto gap_check; R_UNLOCK(dbenv, &dblp->reginfo); }done:err: if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0) ret = t_ret; if (have_mutex) MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if (control_dbt.data != NULL) __os_ufree(dbenv, control_dbt.data); if (rec_dbt.data != NULL) __os_ufree(dbenv, rec_dbt.data); return (ret);}/* * __rep_process_txn -- * * This is the routine that actually gets a transaction ready for * processing. * * PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *)); */int__rep_process_txn(dbenv, rec) DB_ENV *dbenv; DBT *rec;{ DBT data_dbt; DB_LOCKREQ req, *lvp; DB_LOGC *logc; DB_LSN prev_lsn, *lsnp; DB_REP *db_rep; LSN_COLLECTION lc; REP *rep; __txn_regop_args *txn_args; __txn_xa_regop_args *prep_args; u_int32_t lockid, op, rectype; int i, ret, t_ret; int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *)); size_t dtabsize; void *txninfo; db_rep = dbenv->rep_handle; rep = db_rep->region; logc = NULL; txninfo = NULL; memset(&data_dbt, 0, sizeof(data_dbt)); if (F_ISSET(dbenv, DB_ENV_THREAD)) F_SET(&data_dbt, DB_DBT_REALLOC); /* * There are two phases: First, we have to traverse * backwards through the log records gathering the list * of all LSNs in the transaction. Once we have this information, * we can loop through, acquire the locks we need for each record, * and then apply it. */ dtab = NULL; /* * We may be passed a prepare (if we're restoring a prepare * on upgrade) instead of a commit (the common case). * Check which and behave appropriately. */ memcpy(&rectype, rec->data, sizeof(rectype)); memset(&lc, 0, sizeof(lc)); if (rectype == DB___txn_regop) { /* * We're the end of a transaction. Make sure this is * really a commit and not an abort! */ if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0) return (ret); op = txn_args->opcode; prev_lsn = txn_args->prev_lsn; __os_free(dbenv, txn_args); if (op != TXN_COMMIT) return (0); } else { /* We're a prepare. */ DB_ASSERT(rectype == DB___txn_xa_regop); if ((ret = __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0) return (ret); prev_lsn = prep_args->prev_lsn; __os_free(dbenv, prep_args); } /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0) return (ret); qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); if ((ret = dbenv->lock_id(dbenv, &lockid)) != 0) goto err; /* Initialize the getpgno dispatch table. */ if ((ret = __rep_lockpgno_init(dbenv, &dtab, &dtabsize)) != 0) goto err; /* * The set of records for a transaction may include dbreg_register * records. Create a txnlist so that they can keep track of file * state between records. */ if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0) goto err; /* Phase 2: Apply updates. */ if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) goto err; for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) { if ((ret = __rep_lockpages(dbenv, dtab, dtabsize, lsnp, NULL, NULL, lockid)) != 0) goto err; if ((ret = logc->get(logc, lsnp, &data_dbt, DB_SET)) != 0) goto err; if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab, dbenv->recover_dtab_size, &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) goto err; }err: memset(&req, 0, sizeof(req)); req.op = DB_LOCK_PUT_ALL; if ((t_ret = dbenv->lock_vec(dbenv, lockid, DB_LOCK_FREE_LOCKER, &req, 1, &lvp)) != 0 && ret == 0) ret = t_ret; if (lc.nalloc != 0) __os_free(dbenv, lc.array); if ((t_ret = dbenv->lock_id_free(dbenv, lockid)) != 0 && ret == 0) ret = t_ret; if (logc != NULL && (t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; if (txninfo != NULL) __db_txnlist_end(dbenv, txninfo); if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL) __os_ufree(dbenv, data_dbt.data); if (dtab != NULL) __os_free(dbenv, dtab); if (ret == 0) /* * We don't hold the rep mutex, and could miscount if we race. */ rep->stat.st_txns_applied++; return (ret);}/* * __rep_collect_txn * Recursive function that will let us visit every entry in a transaction * chain including all child transactions so that we can then apply * the entire transaction family at once. */static int__rep_collect_txn(dbenv, lsnp, lc) DB_ENV *dbenv; DB_LSN *lsnp; LSN_COLLECTION *lc;{ __txn_child_args *argp; DB_LOGC *logc; DB_LSN c_lsn; DBT data; u_int32_t rectype; int nalloc, ret, t_ret; memset(&data, 0, sizeof(data)); F_SET(&data, DB_DBT_REALLOC); if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0) return (ret); while (!IS_ZERO_LSN(*lsnp) && (ret = logc->get(logc, lsnp, &data, DB_SET)) == 0) { memcpy(&rectype, data.data, sizeof(rectype)); if (rectype == DB___txn_child) { if ((ret = __txn_child_read(dbenv, data.data, &argp)) != 0) goto err; c_lsn = argp->c_lsn; *lsnp = argp->prev_lsn; __os_free(dbenv, argp); ret = __rep_collect_txn(dbenv, &c_lsn, lc); } else { if (lc->nalloc < lc->nlsns + 1) { nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; if ((ret = __os_realloc(dbenv, nalloc * sizeof(DB_LSN), &lc->array)) != 0) goto err; lc->nalloc = nalloc; } lc->array[lc->nlsns++] = *lsnp; /* * Explicitly copy the previous lsn. The record * starts with a u_int32_t record type, a u_int32_t * txn id, and then the DB_LSN (prev_lsn) that we * want. We copy explicitly because we have no idea * what kind of record this is. */ memcpy(lsnp, (u_int8_t *)data.data + sizeof(u_int32_t) + sizeof(u_int32_t), sizeof(DB_LSN)); } if (ret != 0) goto err; }err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0) ret = t_ret; if (data.data != NULL) __os_ufree(dbenv, data.data); return (ret);}/* * __rep_lsn_cmp -- * qsort-type-compatible wrapper for log_compare. */static int__rep_lsn_cmp(lsn1, lsn2) const void *lsn1, *lsn2;{ return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));}/* * __rep_newfile -- * NEWFILE messages can contain either the last LSN of the old file * or the first LSN of the new one, depending on which we have available * when the message is sent. When applying a NEWFILE message, make sure * we haven't already swapped files, as it's possible (given the right sequence * of out-of-order messages) to wind up with a NEWFILE message of each * variety, and __rep_apply won't detect the two as duplicates of each other. */static int__rep_newfile(dbenv, rc, msgdbt, lsnp) DB_ENV *dbenv; REP_CONTROL *rc; DBT *msgdbt; DB_LSN *lsnp;{ DB_LOG *dblp; LOG *lp; u_int32_t newfile; dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; /* * A NEWFILE message containing the old file's LSN will be * accompanied by a NULL rec DBT; one containing the new one's LSN * will need to supply the last record in the old file by * sending it in the rec DBT. */ if (msgdbt == NULL || msgdbt->size == 0) newfile = rc->lsn.file + 1; else newfile = rc->lsn.file; if (newfile > lp->lsn.file) return (__log_newfile(dblp, lsnp)); else { /* We've already applied this NEWFILE. Just ignore it. */ *lsnp = lp->lsn; return (0); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -