📄 bt_split.c
字号:
/*
 * NOTE(review): this chunk opens mid-function -- the "} else" below closes
 * a conditional (presumably a logging test) whose opening lines are outside
 * this view; the split routine's entry point is not visible here.
 */
		/* Build a DBT covering the page image for the split log record. */
		memset(&log_dbt, 0, sizeof(log_dbt));
		log_dbt.data = cp->page;
		log_dbt.size = dbp->pgsize;
		if (tp == NULL)
			ZERO_LSN(log_lsn);
		/*
		 * NOTE(review): this C_RECNUM test uses "bc" while the rest of
		 * this chunk uses "cp" for the cursor -- confirm "bc" is the
		 * intended handle.
		 */
		opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
		if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
		    PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
		    &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &log_lsn : &LSN(tp),
		    PGNO_INVALID, &log_dbt, opflags)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(cp->page));

	/* Update the LSNs for all involved pages. */
	LSN(alloc_rp) = LSN(cp->page);
	LSN(lp) = LSN(cp->page);
	LSN(rp) = LSN(cp->page);
	if (tp != NULL)
		LSN(tp) = LSN(cp->page);

	/*
	 * Copy the left and right pages into place.  There are two paths
	 * through here.  Either we are logging and we set the LSNs in the
	 * logging path.  However, if we are not logging, then we do not
	 * have valid LSNs on lp or rp.  The correct LSNs to use are the
	 * ones on the page we got from __db_new or the one that was
	 * originally on cp->page.  In both cases, we save the LSN from the
	 * real database page (not a malloc'd one) and reapply it after we
	 * do the copy.
	 */
	save_lsn = alloc_rp->lsn;
	memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
	memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
	    (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
	alloc_rp->lsn = save_lsn;

	save_lsn = cp->page->lsn;
	memcpy(cp->page, lp, LOFFSET(dbp, lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	cp->page->lsn = save_lsn;

	/* Fix up the next-page link. */
	if (tp != NULL)
		PREV_PGNO(tp) = PGNO(rp);

	/* Adjust any cursors. */
	if ((ret = __bam_ca_split(dbc,
	    PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
		goto err;

	/* The malloc'd scratch copies of the two halves are done with. */
	__os_free(dbp->dbenv, lp);
	__os_free(dbp->dbenv, rp);

	/*
	 * Success -- write the real pages back to the store.  As we never
	 * acquired any sort of lock on the new page, we release it before
	 * releasing locks on the pages that reference it.  We're finished
	 * modifying the page so it's not really necessary, but it's neater.
	 */
	if ((t_ret =
	    __memp_fput(mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret =
	    __memp_fput(mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, pp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret =
	    __memp_fput(mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if (tp != NULL) {
		if ((t_ret =
		    __memp_fput(mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = __TLPUT(dbc, tplock)) != 0 && ret == 0)
			ret = t_ret;
	}
	return (ret);

	/* Error path: free scratch pages, discard pins and locks unchanged. */
err:	if (lp != NULL)
		__os_free(dbp->dbenv, lp);
	if (rp != NULL)
		__os_free(dbp->dbenv, rp);
	if (alloc_rp != NULL)
		(void)__memp_fput(mpf, alloc_rp, 0);
	if (tp != NULL)
		(void)__memp_fput(mpf, tp, 0);

	/* We never updated the new or next pages, we can release them. */
	(void)__LPUT(dbc, rplock);
	(void)__LPUT(dbc, tplock);

	(void)__memp_fput(mpf, pp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, pp->lock);
	else
		(void)__TLPUT(dbc, pp->lock);

	(void)__memp_fput(mpf, cp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, cp->lock);
	else
		(void)__TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 *
 * dbc is the cursor driving the split; rootp is the root page being
 * rebuilt, lp/rp the new left and right children.  Returns 0 or a
 * Berkeley DB error code.
 */
static int
__bam_broot(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT hdr, data;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
*/
	root_pgno = cp->root;
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	memset(&data, 0, sizeof(data));
	memset(&hdr, 0, sizeof(hdr));

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * internal btree page is never used, so it doesn't need to be filled
	 * in.  Set the record count if necessary.
	 */
	memset(&bi, 0, sizeof(bi));
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(cp, C_RECNUM)) {
		bi.nrecs = __bam_total(dbp, lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	if ((ret =
	    __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
		return (ret);

	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(dbp, rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(cp, C_RECNUM)) {
			bi.nrecs = __bam_total(dbp, rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LDUP:
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(dbp, rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(cp, C_RECNUM)) {
				bi.nrecs = __bam_total(dbp, rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			/*
			 * Off-page key: the on-page BOVERFLOW structure itself
			 * is copied as the item data.
			 */
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(cp, C_RECNUM)) {
				bi.nrecs = __bam_total(dbp, rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp->dbenv, rp->pgno));
		}
		break;
	default:
		return (__db_pgfmt(dbp->dbenv, rp->pgno));
	}
	return (0);
}

/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	root_pgno = dbc->internal->root;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &ri;
	hdr.size = RINTERNAL_SIZE;

	/* Insert the left and right keys, set the header information.
*/
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(dbp, lp);
	if ((ret = __db_pitem(dbc,
	    rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(dbp, rp);
	if ((ret = __db_pitem(dbc,
	    rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}

/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 *
 * parent is the EPG (page/index pair) for the parent level; lchild and
 * rchild are the two halves of the split.  When space_check is non-zero
 * the routine only verifies that the parent has room, performing no
 * modification.
 */
static int
__bam_pinsert(dbc, parent, lchild, rchild, space_check)
	DBC *dbc;
	EPG *parent;
	PAGE *lchild, *rchild;
	int space_check;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	size_t (*func) __P((DB *, const DBT *, const DBT *));
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbp->bt_internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = F_ISSET(cp, C_RECNUM) &&
	    !space_check ? __bam_total(dbp, rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(dbp, rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(dbp, ppage) < nbytes)
			return (DB_NEEDSPLIT);
		if (space_check)
			return (0);

		/* Add a new record for the right page. */
		memset(&bi, 0, sizeof(bi));
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LDUP:
	case P_LBTREE:
		child_bk = GET_BKEYDATA(dbp, rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			/*
			 * We set t->bt_prefix to NULL if we have a comparison
			 * callback but no prefix compression callback.  But,
			 * if we're splitting in an off-page duplicates tree,
			 * we still have to do some checking.  If using the
			 * default off-page duplicates comparison routine we
			 * can use the default prefix compression callback. If
			 * not using the default off-page duplicates comparison
			 * routine, we can't do any kind of prefix compression
			 * as there's no way for an application to specify a
			 * [NOTE(review): this comment and the rest of
			 * __bam_pinsert are truncated in this chunk; the
			 * remainder lies outside the visible view.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -