📄 btr0btr.c
字号:
/* Ok, there will be enough available space on the half page where the tuple is inserted */ return(TRUE); } rec = page_rec_get_next(rec); } return(FALSE);} /***********************************************************Inserts a data tuple to a tree on a non-leaf level. It is assumedthat mtr holds an x-latch on the tree. */voidbtr_insert_on_non_leaf_level(/*=========================*/ dict_tree_t* tree, /* in: tree */ ulint level, /* in: level, must be > 0 */ dtuple_t* tuple, /* in: the record to be inserted */ mtr_t* mtr) /* in: mtr */{ big_rec_t* dummy_big_rec; btr_cur_t cursor; ulint err; rec_t* rec; ut_ad(level > 0); /* In the following, choose just any index from the tree as the first parameter for btr_cur_search_to_nth_level. */ btr_cur_search_to_nth_level(UT_LIST_GET_FIRST(tree->tree_indexes), level, tuple, PAGE_CUR_LE, BTR_CONT_MODIFY_TREE, &cursor, 0, mtr); err = btr_cur_pessimistic_insert(BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG, &cursor, tuple, &rec, &dummy_big_rec, NULL, mtr); ut_a(err == DB_SUCCESS);}/******************************************************************Attaches the halves of an index page on the appropriate level in anindex tree. 
*/
/* Summary of the steps performed below:
(1) from 'direction', decide which of page / new_page becomes the lower
and which the upper half; for FSP_DOWN the existing father node pointer
of page is redirected to the new lower half;
(2) build a node pointer for the upper half from split_rec and insert it
on level + 1 with btr_insert_on_non_leaf_level();
(3) relink the prev/next pointers of the neighbouring pages and of both
halves, and write the same level to both halves. */
static
void
btr_attach_half_pages(
/*==================*/
	dict_tree_t*	tree,		/* in: the index tree */
	page_t*		page,		/* in: page to be split */
	rec_t*		split_rec,	/* in: first record on upper
					half page */
	page_t*		new_page,	/* in: the new half page */
	ulint		direction,	/* in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/* in: mtr */
{
	ulint		space;
	rec_t*		node_ptr;
	page_t*		prev_page;
	page_t*		next_page;
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
	dtuple_t*	node_ptr_upper;
	mem_heap_t*	heap;

	/* Debug checks: both pages must be x-fixed in this mtr */
	ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
				MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains(mtr, buf_block_align(new_page),
				MTR_MEMO_PAGE_X_FIX));
	/* Both halves must use the same record format */
	ut_a(page_is_comp(page) == page_is_comp(new_page));

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on split direction, decide upper and lower pages */
	if (direction == FSP_DOWN) {

		lower_page_no = buf_frame_get_page_no(new_page);
		upper_page_no = buf_frame_get_page_no(page);
		lower_page = new_page;
		upper_page = page;

		/* Look from the tree for the node pointer to page */
		node_ptr = btr_page_get_father_node_ptr(tree, page, mtr);

		/* Replace the address of the old child node (= page) with the
		address of the new lower half */

		btr_node_ptr_set_child_page_no(node_ptr,
			rec_get_offsets(node_ptr,
				UT_LIST_GET_FIRST(tree->tree_indexes),
				NULL, ULINT_UNDEFINED, &heap),
			lower_page_no, mtr);
		/* The offsets returned above are not referenced again, so
		the heap is emptied before it is reused for the node pointer
		tuple (presumably rec_get_offsets() allocated from it) */
		mem_heap_empty(heap);
	} else {
		/* Any direction other than FSP_DOWN: page stays the lower
		half and its existing father node pointer is left as is */
		lower_page_no = buf_frame_get_page_no(page);
		upper_page_no = buf_frame_get_page_no(new_page);
		lower_page = page;
		upper_page = new_page;
	}

	/* Get the level of the split pages */
	level = btr_page_get_level(page, mtr);

	/* Build the node pointer (= node key and page address) for the upper
	half */

	node_ptr_upper = dict_tree_build_node_ptr(tree, split_rec,
					upper_page_no, heap, level);

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */

	btr_insert_on_non_leaf_level(tree, level + 1, node_ptr_upper, mtr);

	/* Free the memory heap */
	mem_heap_free(heap);

	/* Get the previous and next pages of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = buf_frame_get_space_id(page);

	/* Update page links of the level */

	if (prev_page_no != FIL_NULL) {

		prev_page = btr_page_get(space, prev_page_no, RW_X_LATCH, mtr);
		ut_a(page_is_comp(prev_page) == page_is_comp(page));

		/* The old predecessor now points forward to the lower half */
		btr_page_set_next(prev_page, lower_page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {

		next_page = btr_page_get(space, next_page_no, RW_X_LATCH, mtr);
		ut_a(page_is_comp(next_page) == page_is_comp(page));

		/* The old successor now points back to the upper half */
		btr_page_set_prev(next_page, upper_page_no, mtr);
	}

	/* Resulting chain: prev <-> lower <-> upper <-> next */
	btr_page_set_prev(lower_page, prev_page_no, mtr);
	btr_page_set_next(lower_page, upper_page_no, mtr);
	btr_page_set_level(lower_page, level, mtr);

	btr_page_set_prev(upper_page, lower_page_no, mtr);
	btr_page_set_next(upper_page, next_page_no, mtr);
	btr_page_set_level(upper_page, level, mtr);
}

/*****************************************************************
Splits an index page to halves and inserts the tuple. It is assumed
that mtr holds an x-latch to the index tree. NOTE: the tree x-latch
is released within this function! (In the code below the release happens
only when the insert is predicted to fit and the page is on level 0.)
NOTE that the operation of this function must always succeed, we cannot
reverse it: therefore enough free disk space must be guaranteed to be
available before this function is called.

If the tuple still does not fit after the split and a page
reorganization, the function jumps back to func_start and splits once
more; on that second round the split record is chosen with
btr_page_get_sure_split_rec(). */

rec_t*
btr_page_split_and_insert(
/*======================*/
				/* out: inserted record; NOTE: the tree
				x-latch is released! NOTE: 2 free disk
				pages must be available! */
	btr_cur_t*	cursor,	/* in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	dtuple_t*	tuple,	/* in: tuple to insert */
	mtr_t*		mtr)	/* in: mtr */
{
	dict_tree_t*	tree;
	page_t*		page;
	ulint		page_no;
	byte		direction;
	ulint		hint_page_no;
	page_t*		new_page;
	rec_t*		split_rec;
	page_t*		left_page;
	page_t*		right_page;
	page_t*		insert_page;
	page_cur_t*	page_cursor;
	rec_t*		first_rec;
	byte*		buf = 0; /* remove warning */
	rec_t*		move_limit;
	ibool		insert_will_fit;
	ulint		n_iterations = 0;
	rec_t*		rec;
	mem_heap_t*	heap;
	ulint		n_uniq;
	ulint*		offsets;

	/* heap and n_uniq are set up once, before the retry label, and
	survive a second round */
	heap = mem_heap_create(1024);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
func_start:
	/* Each round starts with an empty heap and no cached offsets */
	mem_heap_empty(heap);
	offsets = NULL;
	tree = btr_cur_get_tree(cursor);

	ut_ad(mtr_memo_contains(mtr, dict_tree_get_lock(tree),
				MTR_MEMO_X_LOCK));
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(dict_tree_get_lock(tree), RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */

	page = btr_cur_get_page(cursor);

	ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
				MTR_MEMO_PAGE_X_FIX));
	ut_ad(page_get_n_recs(page) >= 2);

	page_no = buf_frame_get_page_no(page);

	/* 1. Decide the split record; split_rec == NULL means that the
	tuple to be inserted should be the first record on the upper
	half-page */

	/* hint_page_no is used only as the hint argument of
	btr_page_alloc() in step 2 */

	if (n_iterations > 0) {
		/* Second round after a failed insert */
		direction = FSP_UP;
		hint_page_no = page_no + 1;
		split_rec = btr_page_get_sure_split_rec(cursor, tuple);

	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
		direction = FSP_UP;
		hint_page_no = page_no + 1;

	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
		direction = FSP_DOWN;
		/* NOTE(review): assuming ulint is unsigned, this would wrap
		if page_no were 0 - confirm that cannot happen here */
		hint_page_no = page_no - 1;
	} else {
		/* Neither helper chose a split point: split in the middle */
		direction = FSP_UP;
		hint_page_no = page_no + 1;
		split_rec = page_get_middle_rec(page);
	}

	/* 2. Allocate a new page to the tree */
	new_page = btr_page_alloc(tree, hint_page_no, direction,
					btr_page_get_level(page, mtr), mtr);
	btr_page_create(new_page, tree, mtr);

	/* 3. Calculate the first record on the upper half-page, and the
	first record (move_limit) on original page which ends up on the
	upper half */

	if (split_rec != NULL) {
		first_rec = split_rec;
		move_limit = split_rec;
	} else {
		/* The tuple itself will be the first record of the upper
		half: convert it into a temporary record in buf so that
		step 4 can build a node pointer from it */
		buf = mem_alloc(rec_get_converted_size(cursor->index, tuple));

		first_rec = rec_convert_dtuple_to_rec(buf,
							cursor->index, tuple);
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	}

	/* 4. Do first the modifications in the tree structure */

	btr_attach_half_pages(tree, page, first_rec, new_page, direction, mtr);

	if (split_rec == NULL) {
		/* buf was allocated in step 3 only on this path. first_rec
		is not read again when split_rec == NULL (see step 6). */
		mem_free(buf);
	}

	/* If the split is made on the leaf level and the insert will fit
	on the appropriate half-page, we may release the tree x-latch.
	We can then move the records after releasing the tree latch,
	thus reducing the tree latch contention. */

	if (split_rec) {
		offsets = rec_get_offsets(split_rec, cursor->index, offsets,
							n_uniq, &heap);

		insert_will_fit = btr_page_insert_fits(cursor,
					split_rec, offsets, tuple, heap);
	} else {
		insert_will_fit = btr_page_insert_fits(cursor,
					NULL, NULL, tuple, heap);
	}

	if (insert_will_fit && (btr_page_get_level(page, mtr) == 0)) {

		mtr_memo_release(mtr, dict_tree_get_lock(tree),
							MTR_MEMO_X_LOCK);
	}

	/* 5. Move then the records to the new page */
	if (direction == FSP_DOWN) {
/*		fputs("Split left\n", stderr); */

		/* new_page becomes the left (lower) half */
		page_move_rec_list_start(new_page, page, move_limit,
							cursor->index, mtr);
		left_page = new_page;
		right_page = page;

		lock_update_split_left(right_page, left_page);
	} else {
/*		fputs("Split right\n", stderr); */

		/* new_page becomes the right (upper) half */
		page_move_rec_list_end(new_page, page, move_limit,
							cursor->index, mtr);
		left_page = page;
		right_page = new_page;

		lock_update_split_right(right_page, left_page);
	}

	/* 6. The split and the tree modification is now completed. Decide the
	page where the tuple should be inserted */

	if (split_rec == NULL) {
		insert_page = right_page;

	} else {
		/* Here first_rec == split_rec (set in step 3) */
		offsets = rec_get_offsets(first_rec, cursor->index,
						offsets, n_uniq, &heap);

		if (cmp_dtuple_rec(tuple, first_rec, offsets) >= 0) {

			insert_page = right_page;
		} else {
			insert_page = left_page;
		}
	}

	/* 7. Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

	page_cur_search(insert_page, cursor->index, tuple,
						PAGE_CUR_LE, page_cursor);

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, mtr);

	if (rec != NULL) {
		/* Insert fit on the page: update the free bits for the
		left and right pages in the same mtr */

		ibuf_update_free_bits_for_two_pages_low(cursor->index,
							left_page,
							right_page, mtr);
		/* fprintf(stderr, "Split and insert done %lu %lu\n",
				buf_frame_get_page_no(left_page),
				buf_frame_get_page_no(right_page)); */
		mem_heap_free(heap);
		return(rec);
	}

	/* 8. If insert did not fit, try page reorganization */

	btr_page_reorganize(insert_page, cursor->index, mtr);

	page_cur_search(insert_page, cursor->index, tuple,
						PAGE_CUR_LE, page_cursor);
	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, mtr);

	if (rec == NULL) {
		/* The insert did not fit on the page: loop back to the
		start of the function for a new split */

		/* We play safe and reset the free bits for new_page */
		ibuf_reset_free_bits(cursor->index, new_page);

		/* fprintf(stderr, "Split second round %lu\n",
					buf_frame_get_page_no(page)); */
		n_iterations++;
		/* Debug assertions: at most one retry is expected, and a
		retry is expected only when the fit prediction was negative */
		ut_ad(n_iterations < 2);
		ut_ad(!insert_will_fit);

		goto func_start;
	}

	/* Insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

	ibuf_update_free_bits_for_two_pages_low(cursor->index, left_page,
							right_page, mtr);
	/* fprintf(stderr, "Split and insert done %lu %lu\n",
				buf_frame_get_page_no(left_page),
				buf_frame_get_page_no(right_page)); */

	/* Debug validation of both halves; done only on this exit path,
	i.e. when the insert succeeded after the reorganization */
	ut_ad(page_validate(left_page, UT_LIST_GET_FIRST(tree->tree_indexes)));
	ut_ad(page_validate(right_page, UT_LIST_GET_FIRST(tree->tree_indexes)));

	mem_heap_free(heap);
	return(rec);
}

/*****************************************************************
Removes a page from the level list of pages. Only the neighbours are
modified: the previous page's next pointer and the next page's prev
pointer are made to bypass page. The prev/next fields of page itself
are not written here. */
static
void
btr_level_list_remove(
/*==================*/
	dict_tree_t*	tree __attribute__((unused)), /* in: index tree;
					referenced only in the ut_ad()
					check below */
	page_t*		page,	/* in: page to remove */
	mtr_t*		mtr)	/* in: mtr */
{
	ulint	space;
	ulint	prev_page_no;
	page_t*	prev_page;
	ulint	next_page_no;
	page_t*	next_page;

	ut_ad(tree && page && mtr);
	ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
				MTR_MEMO_PAGE_X_FIX));
	/* Get the previous and next page numbers of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = buf_frame_get_space_id(page);

	/* Update page links of the level */

	if (prev_page_no != FIL_NULL) {

		prev_page = btr_page_get(space, prev_page_no, RW_X_LATCH, mtr);
		ut_a(page_is_comp(prev_page) == page_is_comp(page));

		btr_page_set_next(prev_page, next_page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {

		next_page = btr_page_get(space, next_page_no, RW_X_LATCH, mtr);
		ut_a(page_is_comp(next_page) == page_is_comp(page));

		btr_page_set_prev(next_page, prev_page_no, mtr);
	}
}

/********************************************************************
Writes the redo log record for setting an index record as the predefined
minimum record. The log record consists of the initial log record for
rec followed by a 2-byte value. */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
	rec_t*	rec,	/* in: record */
	ulint	comp,	/* nonzero=compact record format */
	mtr_t*	mtr)	/* in: mtr */
{
	/* The log record type depends on the record format */
	mlog_write_initial_log_record(rec,
			comp ? MLOG_COMP_REC_MIN_MARK : MLOG_REC_MIN_MARK, mtr);

	/* Write rec offset as a 2-byte ulint */
	mlog_catenate_ulint(mtr, ut_align_offset(rec, UNIV_PAGE_SIZE),
								MLOG_2BYTES);
}

/********************************************************************
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -