📄 btree_insert.c
字号:
else if (cmp<0) { slot++; bte=btree_node_get_key(db, node, slot); memmove(((char *)bte)+sizeof(int_key_t)-1+keysize, bte, (sizeof(int_key_t)-1+keysize)*(count-slot)); } /* * otherwise, the current slot is the first key, which is * bigger than the new key; this is where we insert the new key */ else {shift_elements: /* shift all keys one position to the right */ memmove(((char *)bte)+sizeof(int_key_t)-1+keysize, bte, (sizeof(int_key_t)-1+keysize)*(count-slot)); } } i=slot; if (i==count) bte=btree_node_get_key(db, node, count); /* * if we're in the leaf: insert the blob, and append the blob-id * in this node; otherwise just append the entry (rid) * * if the record's size is <= sizeof(ham_offset_t), we don't allocate * a blob but store the record data directly in the offset * * in an in-memory-database, we don't use the blob management, but * allocate a single chunk of memory, and store the memory address * in rid */ if (btree_node_is_leaf(node) && record->size>sizeof(ham_offset_t)) { ham_status_t st; /* * make sure that we only call blob_replace(), if there IS a blob * to replace! otherwise call blob_allocate() */ if (overwrite) { if (!((key_get_flags(bte)&KEY_BLOB_SIZE_TINY) || (key_get_flags(bte)&KEY_BLOB_SIZE_SMALL) || (key_get_flags(bte)&KEY_BLOB_SIZE_EMPTY))) { ham_offset_t blobid=key_get_extended_rid(db, bte); /* remove the cached extended key */ if (db_get_extkey_cache(db)) (void)extkey_cache_remove(db_get_extkey_cache(db), blobid); st=blob_replace(db, key_get_ptr(bte), record->data, record->size, 0, &rid); } else st=blob_allocate(db, record->data, record->size, 0, &rid); if (st) return (st); } else { if ((st=blob_allocate(db, record->data, record->size, 0, &rid))) return (st); } } /* * if the record's size is <= sizeof(ham_offset_t), store the data * directly in the offset * * if the record's size is < sizeof(ham_offset_t), the last byte * in &rid is the size of the data. if the record is empty, we just * set the "empty"-flag. * * before, reset the key-flags */ oldflags=key_get_flags(bte); key_set_flags(bte, 0); if (btree_node_is_leaf(node)) { /* * if a blob exists, free it */ if (overwrite && record->size<=sizeof(ham_offset_t)) { if (!((oldflags&KEY_BLOB_SIZE_TINY) || (oldflags&KEY_BLOB_SIZE_SMALL) || (oldflags&KEY_BLOB_SIZE_EMPTY))) { st=blob_free(db, key_get_ptr(bte), 0); if (st) return (st); } } /* * now set the new key flags */ if (record->size==0) { key_set_flags(bte, key_get_flags(bte)|KEY_BLOB_SIZE_EMPTY); } else if (record->size<=sizeof(ham_offset_t)) { memcpy(&rid, record->data, record->size); if (record->size<sizeof(ham_offset_t)) { char *p=(char *)&rid; p[sizeof(ham_offset_t)-1]=record->size; key_set_flags(bte, key_get_flags(bte)|KEY_BLOB_SIZE_TINY); } else { key_set_flags(bte, key_get_flags(bte)|KEY_BLOB_SIZE_SMALL); } } } key_set_ptr(bte, rid); page_set_dirty(page, 1); /* * set a flag if the key is extended, and does not fit into the * btree */ if (key->size>db_get_keysize(db)) { key_set_flags(bte, key_get_flags(bte)|KEY_IS_EXTENDED); key_set_size(bte, key->size); } else { key_set_size(bte, key->size); /*key_set_key(bte, key->data, key_get_size(bte));*/ } /* * if we've overwritten a key: no need to continue, we're done */ if (overwrite) return (0); /* * we insert the extended key, if necessary */ key_set_key(bte, key->data, db_get_keysize(db)<key->size?db_get_keysize(db):key->size); /* * if we need an extended key, allocate a blob and store * the blob-id in the key */ if (key->size>db_get_keysize(db)) { ham_offset_t blobid; key_set_key(bte, key->data, db_get_keysize(db)); blobid=key_insert_extended(db, page, key); if (!blobid) return (db_get_error(db)); key_set_extended_rid(db, bte, blobid); } btree_node_set_count(node, count+1); /* * if we have a cursor: couple it to the new key */ if (cursor) { ham_assert(!(bt_cursor_get_flags(cursor)&BT_CURSOR_FLAG_UNCOUPLED), ("coupling an uncoupled cursor, but need a nil-cursor")); ham_assert(!(bt_cursor_get_flags(cursor)&BT_CURSOR_FLAG_COUPLED), ("coupling a coupled cursor, but need a nil-cursor")); bt_cursor_set_flags(cursor, bt_cursor_get_flags(cursor)|BT_CURSOR_FLAG_COUPLED); bt_cursor_set_coupled_page(cursor, page); bt_cursor_set_coupled_index(cursor, slot); page_add_cursor(page, (ham_cursor_t *)cursor); } return (0);}static ham_status_tmy_insert_split(ham_page_t *page, ham_key_t *key, ham_offset_t rid, ham_u32_t flags, insert_scratchpad_t *scratchpad){ int cmp; ham_status_t st; ham_page_t *newpage, *oldsib; int_key_t *nbte, *obte; btree_node_t *nbtp, *obtp, *sbtp; ham_size_t count, keysize; ham_db_t *db=page_get_owner(page); ham_key_t pivotkey, oldkey; ham_offset_t pivotrid; ham_u16_t pivot; keysize=db_get_keysize(db); /* * uncouple all cursors */ st=db_uncouple_all_cursors(page); if (st) return (db_set_error(db, st)); /* * allocate a new page */ newpage=db_alloc_page(db, PAGE_TYPE_B_INDEX, 0); if (!newpage) return (db_get_error(db)); /* clear the node header */ memset(page_get_payload(newpage), 0, sizeof(btree_node_t)); /* * move half of the key/rid-tuples to the new page */ nbtp=ham_page_get_btree_node(newpage); nbte=btree_node_get_key(db, nbtp, 0); obtp=ham_page_get_btree_node(page); obte=btree_node_get_key(db, obtp, 0); count=btree_node_get_count(obtp); pivot=count/2; /* * if we split a leaf, we'll insert the pivot element in the leaf * page, too. in internal nodes, we don't insert it, but propagate * it to the parent node only. */ if (btree_node_is_leaf(obtp)) { memcpy((char *)nbte, ((char *)obte)+(sizeof(int_key_t)-1+keysize)*pivot, (sizeof(int_key_t)-1+keysize)*(count-pivot)); } else { memcpy((char *)nbte, ((char *)obte)+(sizeof(int_key_t)-1+keysize)*(pivot+1), (sizeof(int_key_t)-1+keysize)*(count-pivot-1)); } /* * store the pivot element, we'll need it later to propagate it * to the parent page */ nbte=btree_node_get_key(db, obtp, pivot); oldkey.data=key_get_key(nbte); oldkey.size=key_get_size(nbte); oldkey._flags=key_get_flags(nbte); if (!util_copy_key(db, &oldkey, &pivotkey)) { (void)db_free_page(newpage, DB_MOVE_TO_FREELIST); return (db_get_error(db)); } pivotrid=page_get_self(newpage); /* * adjust the page count */ if (btree_node_is_leaf(obtp)) { btree_node_set_count(obtp, pivot); btree_node_set_count(nbtp, count-pivot); } else { btree_node_set_count(obtp, pivot); btree_node_set_count(nbtp, count-pivot-1); } /* * if we're in an internal page: fix the ptr_left of the new page * (it points to the ptr of the pivot key) */ if (!btree_node_is_leaf(obtp)) { /* * nbte still contains the pivot key */ btree_node_set_ptr_left(nbtp, key_get_ptr(nbte)); } /* * insert the new element */ cmp=key_compare_int_to_pub(page, pivot, key); if (db_get_error(db)) return (db_get_error(db)); if (cmp<=0) st=my_insert_nosplit(newpage, key, rid, scratchpad->record, scratchpad->cursor, flags|NOFLUSH); else st=my_insert_nosplit(page, key, rid, scratchpad->record, scratchpad->cursor, flags|NOFLUSH); if (st) return (st); scratchpad->cursor=0; /* don't overwrite cursor if my_insert_nosplit is called again */ /* * fix the double-linked list of pages, and mark the pages as dirty */ if (btree_node_get_right(obtp)) oldsib=db_fetch_page(db, btree_node_get_right(obtp), 0); else oldsib=0; btree_node_set_left (nbtp, page_get_self(page)); btree_node_set_right(nbtp, btree_node_get_right(obtp)); btree_node_set_right(obtp, page_get_self(newpage)); if (oldsib) { sbtp=ham_page_get_btree_node(oldsib); btree_node_set_left(sbtp, page_get_self(newpage)); page_set_dirty(oldsib, 1); } page_set_dirty(newpage, 1); page_set_dirty(page, 1); /* * propagate the pivot key to the parent page */ if (scratchpad->key.data) ham_mem_free(db, scratchpad->key.data); scratchpad->key=pivotkey; scratchpad->rid=pivotrid; return (SPLIT);}#if HAM_DEBUGvoidpp(ham_page_t *page){ ham_size_t i, j, len, count; int cmp; int_key_t *bte=0; btree_node_t *node; ham_db_t *db=page_get_owner(page); printf("page 0x%llx\n", (unsigned long long)page_get_self(page)); node=ham_page_get_btree_node(page); count=btree_node_get_count(node); for (i=0; i<count; i++) { bte=btree_node_get_key(db, node, i); printf("%03d: ", i); if (key_get_flags(bte)&KEY_IS_EXTENDED) len=db_get_keysize(page_get_owner(page))-sizeof(ham_offset_t); else len=key_get_size(bte); for (j=0; j<len; j++) /*printf("%02x ", (unsigned char)(key_get_key(bte)[j]));*/ printf("%c", key_get_key(bte)[j]); printf("(%d bytes, 0x%x flags) -> rid 0x%llx\n", key_get_size(bte), key_get_flags(bte), (unsigned long long)key_get_ptr(bte)); } /* * verify page */ for (i=0; i<count-1; i++) { cmp=key_compare_int_to_int(page, i, i+1); ham_assert(db_get_error(db)==0, ("key compare failed")); if (cmp>=0) { ham_log(("integrity check failed in page 0x%llx: item #%d " "< item #%d\n", page_get_self(page), i, i+1)); return; } }}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -