📄 rep_method.c
字号:
*rdbpp= dbp; if (0) {err: if (dbp != NULL && (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) ret = t_ret; *rdbpp = NULL; } return (ret);}/* * __rep_bt_cmp -- * * Comparison function for the LSN table. We use the entire control * structure as a key (for simplicity, so we don't have to merge the * other fields in the control with the data field), but really only * care about the LSNs. */static int__rep_bt_cmp(dbp, dbt1, dbt2) DB *dbp; const DBT *dbt1, *dbt2;{ DB_LSN lsn1, lsn2; REP_CONTROL *rp1, *rp2; COMPQUIET(dbp, NULL); rp1 = dbt1->data; rp2 = dbt2->data; (void)__ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN)); (void)__ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN)); if (lsn1.file > lsn2.file) return (1); if (lsn1.file < lsn2.file) return (-1); if (lsn1.offset > lsn2.offset) return (1); if (lsn1.offset < lsn2.offset) return (-1); return (0);}/* * __rep_abort_prepared -- * Abort any prepared transactions that recovery restored. * * This is used by clients that have just run recovery, since * they cannot/should not call txn_recover and handle prepared transactions * themselves. */static int__rep_abort_prepared(dbenv) DB_ENV *dbenv;{#define PREPLISTSIZE 50 DB_PREPLIST prep[PREPLISTSIZE], *p; DB_TXNMGR *mgr; DB_TXNREGION *region; int do_aborts, ret; long count, i; u_int32_t op; mgr = dbenv->tx_handle; region = mgr->reginfo.primary; do_aborts = 0; R_LOCK(dbenv, &mgr->reginfo); if (region->stat.st_nrestores != 0) do_aborts = 1; R_UNLOCK(dbenv, &mgr->reginfo); if (do_aborts) { op = DB_FIRST; do { if ((ret = __txn_recover(dbenv, prep, PREPLISTSIZE, &count, op)) != 0) return (ret); for (i = 0; i < count; i++) { p = &prep[i]; if ((ret = __txn_abort(p->txn)) != 0) return (ret); } op = DB_NEXT; } while (count == PREPLISTSIZE); } return (0);}/* * __rep_restore_prepared -- * Restore to a prepared state any prepared but not yet committed * transactions. * * This performs, in effect, a "mini-recovery"; it is called from * __rep_start by newly upgraded masters. There may be transactions that an * old master prepared but did not resolve, which we need to restore to an * active state. */static int__rep_restore_prepared(dbenv) DB_ENV *dbenv;{ DB_LOGC *logc; DB_LSN ckp_lsn, lsn; DBT rec; __txn_ckp_args *ckp_args; __txn_regop_args *regop_args; __txn_xa_regop_args *prep_args; int ret, t_ret; u_int32_t hi_txn, low_txn, rectype, status; void *txninfo; txninfo = NULL; ckp_args = NULL; prep_args = NULL; regop_args = NULL; ZERO_LSN(ckp_lsn); ZERO_LSN(lsn); if ((ret = __log_cursor(dbenv, &logc)) != 0) return (ret); /* * We need to consider the set of records between the most recent * checkpoint LSN and the end of the log; any txn in that * range, and only txns in that range, could still have been * active, and thus prepared but not yet committed (PBNYC), * when the old master died. * * Find the most recent checkpoint LSN, and get the record there. * If there is no checkpoint in the log, start off by getting * the very first record in the log instead. */ memset(&rec, 0, sizeof(DBT)); if ((ret = __txn_getckp(dbenv, &lsn)) == 0) { if ((ret = __log_c_get(logc, &lsn, &rec, DB_SET)) != 0) { __db_err(dbenv, "Checkpoint record at LSN [%lu][%lu] not found", (u_long)lsn.file, (u_long)lsn.offset); goto err; } if ((ret = __txn_ckp_read(dbenv, rec.data, &ckp_args)) != 0) { __db_err(dbenv, "Invalid checkpoint record at [%lu][%lu]", (u_long)lsn.file, (u_long)lsn.offset); goto err; } ckp_lsn = ckp_args->ckp_lsn; __os_free(dbenv, ckp_args); if ((ret = __log_c_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) { __db_err(dbenv, "Checkpoint LSN record [%lu][%lu] not found", (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset); goto err; } } else if ((ret = __log_c_get(logc, &lsn, &rec, DB_FIRST)) != 0) { if (ret == DB_NOTFOUND) { /* An empty log means no PBNYC txns. */ ret = 0; goto done; } __db_err(dbenv, "Attempt to get first log record failed"); goto err; } /* * We use the same txnlist infrastructure that recovery does; * it demands an estimate of the high and low txnids for * initialization. * * First, the low txnid. */ do { /* txnid is after rectype, which is a u_int32. */ memcpy(&low_txn, (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(low_txn)); if (low_txn != 0) break; } while ((ret = __log_c_get(logc, &lsn, &rec, DB_NEXT)) == 0); /* If there are no txns, there are no PBNYC txns. */ if (ret == DB_NOTFOUND) { ret = 0; goto done; } else if (ret != 0) goto err; /* Now, the high txnid. */ if ((ret = __log_c_get(logc, &lsn, &rec, DB_LAST)) != 0) { /* * Note that DB_NOTFOUND is unacceptable here because we * had to have looked at some log record to get this far. */ __db_err(dbenv, "Final log record not found"); goto err; } do { /* txnid is after rectype, which is a u_int32. */ memcpy(&hi_txn, (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(hi_txn)); if (hi_txn != 0) break; } while ((ret = __log_c_get(logc, &lsn, &rec, DB_PREV)) == 0); if (ret == DB_NOTFOUND) { ret = 0; goto done; } else if (ret != 0) goto err; /* We have a high and low txnid. Initialise the txn list. */ if ((ret = __db_txnlist_init(dbenv, low_txn, hi_txn, NULL, &txninfo)) != 0) goto err; /* * Now, walk backward from the end of the log to ckp_lsn. Any * prepares that we hit without first hitting a commit or * abort belong to PBNYC txns, and we need to apply them and * restore them to a prepared state. * * Note that we wind up applying transactions out of order. * Since all PBNYC txns still held locks on the old master and * were isolated, this should be safe. */ for (ret = __log_c_get(logc, &lsn, &rec, DB_LAST); ret == 0 && log_compare(&lsn, &ckp_lsn) > 0; ret = __log_c_get(logc, &lsn, &rec, DB_PREV)) { memcpy(&rectype, rec.data, sizeof(rectype)); switch (rectype) { case DB___txn_regop: /* * It's a commit or abort--but we don't care * which! Just add it to the list of txns * that are resolved. */ if ((ret = __txn_regop_read(dbenv, rec.data, ®op_args)) != 0) goto err; ret = __db_txnlist_find(dbenv, txninfo, regop_args->txnid->txnid, &status); if (ret == DB_NOTFOUND) ret = __db_txnlist_add(dbenv, txninfo, regop_args->txnid->txnid, regop_args->opcode, &lsn); else if (ret != 0) goto err; __os_free(dbenv, regop_args); break; case DB___txn_xa_regop: /* * It's a prepare. If its not aborted and * we haven't put the txn on our list yet, it * hasn't been resolved, so apply and restore it. */ if ((ret = __txn_xa_regop_read(dbenv, rec.data, &prep_args)) != 0) goto err; ret = __db_txnlist_find(dbenv, txninfo, prep_args->txnid->txnid, &status); if (ret == DB_NOTFOUND) { if (prep_args->opcode == TXN_ABORT) ret = __db_txnlist_add(dbenv, txninfo, prep_args->txnid->txnid, prep_args->opcode, &lsn); else if ((ret = __rep_process_txn(dbenv, &rec)) == 0) ret = __txn_restore_txn(dbenv, &lsn, prep_args); } else if (ret != 0) goto err; __os_free(dbenv, prep_args); break; default: continue; } } /* It's not an error to have hit the beginning of the log. */ if (ret == DB_NOTFOUND) ret = 0;done:err: t_ret = __log_c_close(logc); if (txninfo != NULL) __db_txnlist_end(dbenv, txninfo); return (ret == 0 ? t_ret : ret);}static int__rep_get_limit(dbenv, gbytesp, bytesp) DB_ENV *dbenv; u_int32_t *gbytesp, *bytesp;{ DB_REP *db_rep; REP *rep; PANIC_CHECK(dbenv); ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_get_limit", DB_INIT_REP); if (!REP_ON(dbenv)) { __db_err(dbenv, "DB_ENV->get_rep_limit: database environment not properly initialized"); return (__db_panic(dbenv, EINVAL)); } db_rep = dbenv->rep_handle; rep = db_rep->region; if (gbytesp != NULL) *gbytesp = rep->gbytes; if (bytesp != NULL) *bytesp = rep->bytes; return (0);}/* * __rep_set_limit -- * Set a limit on the amount of data that will be sent during a single * invocation of __rep_process_message. */static int__rep_set_limit(dbenv, gbytes, bytes) DB_ENV *dbenv; u_int32_t gbytes, bytes;{ DB_REP *db_rep; REP *rep; PANIC_CHECK(dbenv); ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_limit"); ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_limit", DB_INIT_REP); if (!REP_ON(dbenv)) { __db_err(dbenv, "DB_ENV->set_rep_limit: database environment not properly initialized"); return (__db_panic(dbenv, EINVAL)); } db_rep = dbenv->rep_handle; rep = db_rep->region; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (bytes > GIGABYTE) { gbytes += bytes / GIGABYTE; bytes = bytes % GIGABYTE; } rep->gbytes = gbytes; rep->bytes = bytes; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); return (0);}/* * __rep_set_request -- * Set the minimum and maximum number of log records that we wait * before retransmitting. * UNDOCUMENTED. */static int__rep_set_request(dbenv, min, max) DB_ENV *dbenv; u_int32_t min, max;{ LOG *lp; DB_LOG *dblp; DB_REP *db_rep; REP *rep; PANIC_CHECK(dbenv); ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_request"); ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_request", DB_INIT_REP); if (!REP_ON(dbenv)) { __db_err(dbenv, "DB_ENV->set_rep_request: database environment not properly initialized"); return (__db_panic(dbenv, EINVAL)); } db_rep = dbenv->rep_handle; rep = db_rep->region; /* * Note we acquire the rep_mutexp or the db_mutexp as needed. */ MUTEX_LOCK(dbenv, db_rep->rep_mutexp); rep->request_gap = min; rep->max_gap = max; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); MUTEX_LOCK(dbenv, db_rep->db_mutexp); dblp = dbenv->lg_handle; if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) { lp->wait_recs = 0; lp->rcvd_recs = 0; } MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); return (0);}/* * __rep_set_transport -- * Set the transport function for replication. */static int__rep_set_rep_transport(dbenv, eid, f_send) DB_ENV *dbenv; int eid; int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));{ PANIC_CHECK(dbenv); if (f_send == NULL) { __db_err(dbenv, "DB_ENV->set_rep_transport: no send function specified"); return (EINVAL); } if (eid < 0) { __db_err(dbenv, "DB_ENV->set_rep_transport: eid must be greater than or equal to 0"); return (EINVAL); } dbenv->rep_send = f_send; dbenv->rep_eid = eid; return (0);}/* * __rep_elect -- * Called after master failure to hold/participate in an election for * a new master. */static int__rep_elect(dbenv, nsites, nvotes, priority, timeout, eidp, flags) DB_ENV *dbenv; int nsites, nvotes, priority; u_int32_t timeout; int *eidp; u_int32_t flags;{ DB_LOG *dblp; DB_LSN lsn; DB_REP *db_rep; REP *rep; int ack, done, in_progress, ret, send_vote; u_int32_t egen, orig_tally, tiebreaker, to;#ifdef DIAGNOSTIC DB_MSGBUF mb;#endif PANIC_CHECK(dbenv); COMPQUIET(flags, 0); ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_elect", DB_INIT_REP); /* Error checking. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -