📄 rep_backup.c
字号:
/* * If we get here, we do not have the file open via dbreg. * We need to open the file and then send its pages. * If we cannot open the file, we send REP_FILE_FAIL. */ RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d via mpf_open", msgfp->filenum)); if ((ret = __rep_mpf_open(dbenv, &mpf, msgfp)) != 0) { memset(&msgdbt, 0, sizeof(msgdbt)); msgdbt.data = msgfp; msgdbt.size = sizeof(*msgfp); RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d failed", msgfp->filenum)); (void)__rep_send_message(dbenv, eid, REP_FILE_FAIL, NULL, &msgdbt, 0); goto err; } ret = __rep_page_sendpages(dbenv, eid, msgfp, mpf, NULL); t_ret = __memp_fclose(mpf, 0); if (ret == 0 && t_ret != 0) ret = t_ret;err: __os_free(dbenv, msgfp); return (ret);}static int__rep_page_sendpages(dbenv, eid, msgfp, mpf, dbp) DB_ENV *dbenv; int eid; __rep_fileinfo_args *msgfp; DB_MPOOLFILE *mpf; DB *dbp;{ DB *qdbp; DBT msgdbt, pgdbt; DB_LOG *dblp; DB_LSN lsn; DB_MSGBUF mb; DB_REP *db_rep; PAGE *pagep; REP *rep; db_pgno_t p; size_t len, msgsz; u_int32_t bytes, gbytes, type; int check_limit, opened, ret, t_ret; u_int8_t *buf;#ifndef DIAGNOSTIC DB_MSGBUF_INIT(&mb);#endif db_rep = dbenv->rep_handle; rep = db_rep->region; opened = 0; gbytes = bytes = 0; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); gbytes = rep->gbytes; bytes = rep->bytes; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); check_limit = gbytes != 0 || bytes != 0; qdbp = NULL; buf = NULL; if (msgfp->type == DB_QUEUE) { if (dbp == NULL) { if ((ret = db_create(&qdbp, dbenv, 0)) != 0) goto err; if ((ret = __db_open(qdbp, NULL, msgfp->info.data, NULL, DB_UNKNOWN, DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0), 0, PGNO_BASE_MD)) != 0) goto err; opened = 1; } else qdbp = dbp; } msgsz = sizeof(__rep_fileinfo_args) + DB_FILE_ID_LEN + msgfp->pgsize; if ((ret = __os_calloc(dbenv, 1, msgsz, &buf)) != 0) return (ret); memset(&msgdbt, 0, sizeof(msgdbt)); memset(&pgdbt, 0, sizeof(pgdbt)); RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: file %d page %lu to %lu", msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno)); for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) { if (msgfp->type == DB_QUEUE && p != 0)#ifdef HAVE_QUEUE ret = __qam_fget(qdbp, &p, DB_MPOOL_CREATE, &pagep);#else ret = DB_PAGE_NOTFOUND;#endif else ret = __memp_fget(mpf, &p, DB_MPOOL_CREATE, &pagep); type = REP_PAGE; if (ret == DB_PAGE_NOTFOUND) { memset(&pgdbt, 0, sizeof(pgdbt)); ret = 0; ZERO_LSN(lsn); RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: PAGE_FAIL on page %lu", (u_long)p)); type = REP_PAGE_FAIL; msgfp->pgno = p; goto send; } else if (ret != 0) goto err; else { pgdbt.data = pagep; pgdbt.size = (u_int32_t)msgfp->pgsize; } len = 0; ret = __rep_fileinfo_buf(buf, msgsz, &len, msgfp->pgsize, p, msgfp->max_pgno, msgfp->filenum, msgfp->id, msgfp->type, msgfp->flags, &msgfp->uid, &pgdbt); if (ret != 0) goto err; DB_ASSERT(len <= msgsz); msgdbt.data = buf; msgdbt.size = (u_int32_t)len; dblp = dbenv->lg_handle; R_LOCK(dbenv, &dblp->reginfo); lsn = ((LOG *)dblp->reginfo.primary)->lsn; R_UNLOCK(dbenv, &dblp->reginfo); if (check_limit) { /* * msgdbt.size is only the size of the page and * other information we're sending. It doesn't * count the size of the control structure. Factor * that in as well so we're not off by a lot if * pages are small. */ while (bytes < msgdbt.size + sizeof(REP_CONTROL)) { if (gbytes > 0) { bytes += GIGABYTE; --gbytes; continue; } /* * We don't hold the rep mutex, and may * miscount. */ rep->stat.st_nthrottles++; type = REP_PAGE_MORE; goto send; } bytes -= (msgdbt.size + sizeof(REP_CONTROL)); }send: RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: %s %lu, lsn [%lu][%lu]", (type == REP_PAGE ? "PAGE" : (type == REP_PAGE_MORE ? "PAGE_MORE" : "PAGE_FAIL")), (u_long)p, (u_long)lsn.file, (u_long)lsn.offset)); (void)__rep_send_message(dbenv, eid, type, &lsn, &msgdbt, 0); /* * If we have REP_PAGE_FAIL we need to break before trying * to give the page back to mpool. If we have REP_PAGE_MORE * we need to break this loop after giving the page back * to mpool. Otherwise, with REP_PAGE, we keep going. */ if (type == REP_PAGE_FAIL) break; if (msgfp->type != DB_QUEUE || p == 0) ret = __memp_fput(mpf, pagep, 0);#ifdef HAVE_QUEUE else /* * We don't need an #else for HAVE_QUEUE here because if * we're not compiled with queue, then we're guaranteed * to have set REP_PAGE_FAIL above. */ ret = __qam_fput(qdbp, p, pagep, 0);#endif if (type == REP_PAGE_MORE) break; }err: if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 && ret == 0) ret = t_ret; if (buf != NULL) __os_free(dbenv, buf); return (ret);}/* * __rep_update_setup * Process and setup with this file information. * * PUBLIC: int __rep_update_setup __P((DB_ENV *, int, REP_CONTROL *, DBT *)); */int__rep_update_setup(dbenv, eid, rp, rec) DB_ENV *dbenv; int eid; REP_CONTROL *rp; DBT *rec;{ DB_LOG *dblp; DB_LSN lsn; DB_REP *db_rep; DBT pagereq_dbt; LOG *lp; REGENV *renv; REGINFO *infop; REP *rep; __rep_update_args *rup; int ret; u_int32_t count, infolen; void *next;#ifdef DIAGNOSTIC __rep_fileinfo_args *msgfp; DB_MSGBUF mb;#endif db_rep = dbenv->rep_handle; rep = db_rep->region; dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; ret = 0; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (!F_ISSET(rep, REP_F_RECOVER_UPDATE)) { MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); return (0); } F_CLR(rep, REP_F_RECOVER_UPDATE); /* * We know we're the first to come in here due to the * REP_F_RECOVER_UPDATE flag. REP_F_READY should not be set. */ DB_ASSERT(!F_ISSET(rep, REP_F_READY)); F_SET(rep, REP_F_RECOVER_PAGE); /* * We do not clear REP_F_READY or rep->in_recovery in this code. * We'll eventually call the normal __rep_verify_match recovery * code and that will clear all the flags and allow others to * proceed. */ __rep_lockout(dbenv, db_rep, rep, 1); /* * We need to update the timestamp and kill any open handles * on this client. The files are changing completely. */ infop = dbenv->reginfo; renv = infop->primary; (void)time(&renv->rep_timestamp); MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); MUTEX_LOCK(dbenv, db_rep->db_mutexp); lp->wait_recs = rep->request_gap; lp->rcvd_recs = 0; ZERO_LSN(lp->ready_lsn); ZERO_LSN(lp->waiting_lsn); ZERO_LSN(lp->max_wait_lsn); ZERO_LSN(lp->max_perm_lsn); MUTEX_UNLOCK(dbenv, db_rep->db_mutexp); if ((ret = __rep_update_read(dbenv, rec->data, &next, &rup)) != 0) goto err; R_LOCK(dbenv, &dblp->reginfo); lsn = lp->lsn; R_UNLOCK(dbenv, &dblp->reginfo); /* * We need to empty out any old log records that might be in the * temp database. */ if ((ret = __db_truncate(db_rep->rep_db, NULL, &count)) != 0) goto err; /* * If our log is before the master's beginning of log, * we need to request from the master's beginning. * If we have some log, we need the earlier of the * master's last checkpoint LSN or our current LSN. */ MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (log_compare(&lsn, &rup->first_lsn) < 0) rep->first_lsn = rup->first_lsn; else rep->first_lsn = lsn; rep->last_lsn = rp->lsn; rep->nfiles = rup->num_files; rep->curfile = 0; rep->ready_pg = 0; rep->npages = 0; rep->waiting_pg = PGNO_INVALID; rep->max_wait_pg = PGNO_INVALID; __os_free(dbenv, rup); RPRINT(dbenv, rep, (dbenv, &mb, "Update setup for %d files.", rep->nfiles)); RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: First LSN [%lu][%lu].", (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset)); RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: Last LSN [%lu][%lu]", (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset)); infolen = rec->size - sizeof(__rep_update_args); if ((ret = __os_calloc(dbenv, 1, infolen, &rep->originfo)) != 0) goto err; memcpy(rep->originfo, next, infolen); rep->finfo = rep->originfo; if ((ret = __rep_fileinfo_read(dbenv, rep->finfo, &next, &rep->curinfo)) != 0) { RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: Fileinfo read: %s", db_strerror(ret))); goto errmem1; } rep->nextinfo = next;#ifdef DIAGNOSTIC msgfp = rep->curinfo; DB_ASSERT(msgfp->pgno == 0);#endif /* * We want to create/open our dbp to the database * where we'll keep our page information. */ if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) { RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: Client_dbinit %s", db_strerror(ret))); goto errmem; } /* * We should get file info 'ready to go' to avoid data copies. */ memset(&pagereq_dbt, 0, sizeof(pagereq_dbt)); pagereq_dbt.data = rep->finfo; pagereq_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo - (u_int8_t *)rep->finfo); RPRINT(dbenv, rep, (dbenv, &mb, "Update PAGE_REQ file 0: pgsize %lu, maxpg %lu", (u_long)rep->curinfo->pgsize, (u_long)rep->curinfo->max_pgno)); /* * We set up pagereq_dbt as we went along. Send it now. */ (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ, NULL, &pagereq_dbt, 0); if (0) {errmem: __os_free(dbenv, rep->curinfo);errmem1: __os_free(dbenv, rep->originfo); rep->finfo = NULL; rep->curinfo = NULL; rep->originfo = NULL; }err: /* * If we get an error, we cannot leave ourselves in the * RECOVER_PAGE state because we have no file information. * That also means undo'ing the rep_lockout. * We need to move back to the RECOVER_UPDATE stage. */ if (ret != 0) { RPRINT(dbenv, rep, (dbenv, &mb, "Update_setup: Error: Clear PAGE, set UPDATE again. %s", db_strerror(ret))); F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY); rep->in_recovery = 0; F_SET(rep, REP_F_RECOVER_UPDATE); } MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); return (ret);}/* * __rep_page * Process a page message. * * PUBLIC: int __rep_page __P((DB_ENV *, int, REP_CONTROL *, DBT *)); */int__rep_page(dbenv, eid, rp, rec) DB_ENV *dbenv; int eid; REP_CONTROL *rp; DBT *rec;{ DB_REP *db_rep; DBT key, data; REP *rep; __rep_fileinfo_args *msgfp; db_recno_t recno; int ret; void *next;#ifdef DIAGNOSTIC DB_MSGBUF mb;#endif ret = 0; db_rep = dbenv->rep_handle; rep = db_rep->region; MUTEX_LOCK(dbenv, db_rep->rep_mutexp); if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) { MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); return (0); } if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) { MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); return (ret); } RPRINT(dbenv, rep, (dbenv, &mb, "PAGE: Received page %lu from file %d", (u_long)msgfp->pgno, msgfp->filenum)); /* * Check if this page is from the file we're expecting. * This may be an old or delayed page message. */ /* * !!! * If we allow dbrename/dbremove on the master while a client * is updating, then we'd have to verify the file's uid here too. */ if (msgfp->filenum != rep->curfile) { RPRINT(dbenv, rep, (dbenv, &mb, "Msg file %d != curfile %d", msgfp->filenum, rep->curfile)); goto err; } /* * We want to create/open our dbp to the database * where we'll keep our page information. */ if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) goto err; MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp); memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); recno = (db_recno_t)(msgfp->pgno + 1); key.data = &recno; key.ulen = key.size = sizeof(db_recno_t); key.flags = DB_DBT_USERMEM; /* * If we already have this page, then we don't want to bother * rewriting it into the file. Otherwise, any other error * we want to return. */ ret = __db_put(rep->file_dbp, NULL, &key, &data, DB_NOOVERWRITE); if (ret == DB_KEYEXIST) { RPRINT(dbenv, rep, (dbenv, &mb, "PAGE: Received duplicate page %lu from file %d", (u_long)msgfp->pgno, msgfp->filenum));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -