📄 tsrecv.c
字号:
mtr_t mtr; upd_node_t* node; ulint err; ulint thr_no; ulint tuple_no; printf("-------------------------------------------------\n"); printf("TEST 4. MULTITHREADED UPDATES\n"); thr_no = *((ulint*)arg); heap = mem_heap_create(512); com_endpoint = (com_endpoint_t*)heap; /* This is a dummy non-NULL value */ mutex_enter(&kernel_mutex); sess = sess_open(ut_dulint_zero, com_endpoint, (byte*)"user1", 6); trx = sess->trx; mutex_exit(&kernel_mutex);loop: /*-------------------------------------*/ ut_a(trx_start(trx, ULINT_UNDEFINED)); fork = que_fork_create(NULL, NULL, QUE_FORK_UPDATE, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); table = dict_table_get("TS_TABLE1", trx); update = upd_create(1, heap); node = upd_node_create(fork, thr, table, &pcur, update, heap); thr->child = node; node->cmpl_info = 0; mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; mutex_exit(&kernel_mutex); rnd = 87607651; entry = dtuple_create(heap, 1); oldtm = ut_clock(); thr_no = *((ulint*)arg); ut_a(DB_SUCCESS == lock_table(0, table, LOCK_IX, thr)); for (i = 0; i < 300; i++) { rnd += 874681; tuple_no = (rnd % 40) * 5 + thr_no; dtuple_gen_search_tuple3(entry, tuple_no, buf); index = dict_table_get_first_index(table); tree = dict_index_get_tree(index); btr_pcur_set_mtr(&pcur, &mtr); mtr_start(&mtr); btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_SEARCH_LEAF, &pcur, &mtr); btr_pcur_store_position(&pcur, &mtr);/* printf("Thread %lu to update row %lu\n", thr_no, tuple_no); */ err = lock_clust_rec_read_check_and_lock(0, btr_pcur_get_rec(&pcur), index, LOCK_X, thr); ut_a(err == DB_SUCCESS); btr_pcur_commit(&pcur); ufield = upd_get_nth_field(update, 0); upd_field_set_col_no(ufield, 2, table); dfield_set_data(&(ufield->new_val), buf2, rnd % 3000); mutex_enter(&kernel_mutex); ut_a( thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); que_run_threads(thr); } /* for (i = ... */ tm = ut_clock(); printf("Wall time for %lu updates %lu milliseconds\n", i, tm - oldtm);/* dict_table_print_by_name("TS_TABLE1"); */ printf("Validating tree\n"); btr_validate_tree(tree); printf("Validated\n"); lock_validate(); /* lock_print_info(); */ /* mem_print_info(); */ mem_pool_print_info(mem_comm_pool); /*-------------------------------------*/ /* ROLLBACK */ fork = que_fork_create(NULL, NULL, QUE_FORK_EXECUTE, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); thr->child = roll_node_create(fork, thr, heap); mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; ut_a(thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); oldtm = ut_clock(); que_run_threads(thr); tm = ut_clock(); printf("Wall time for rollback of %lu updates %lu milliseconds\n", i, tm - oldtm); os_thread_sleep(2000000); btr_validate_tree(tree); ut_a(trx->conc_state != TRX_ACTIVE); ut_a(UT_LIST_GET_LEN(trx->trx_locks) == 0); count++; if (count < 2) { goto loop; } return(0);}/*********************************************************************Test for join. */ulinttest6(/*==*/ void* arg){ ulint tm, oldtm; sess_t* sess; com_endpoint_t* com_endpoint; mem_heap_t* heap; que_fork_t* fork; dict_table_t* table; que_thr_t* thr; trx_t* trx; ulint i; byte buf[100]; ulint count = 0; dtuple_t* entry; dict_index_t* index; dict_tree_t* tree; btr_pcur_t pcur; btr_pcur_t pcur2; mtr_t mtr; mtr_t mtr2; ulint rnd; ulint latch_mode; printf("-------------------------------------------------\n"); printf("TEST 6. MASSIVE EQUIJOIN\n"); heap = mem_heap_create(512); com_endpoint = (com_endpoint_t*)heap; /* This is a dummy non-NULL value */ mutex_enter(&kernel_mutex); sess = sess_open(ut_dulint_zero, com_endpoint, (byte*)"user1", 6); trx = sess->trx; mutex_exit(&kernel_mutex);loop: /*--------------*/ fork = que_fork_create(NULL, NULL, QUE_FORK_EXECUTE, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); thr->child = commit_node_create(fork, thr, heap); /*--------------*/ ut_a(trx_start(trx, ULINT_UNDEFINED)); /* Check inserted entries */ entry = dtuple_create(heap, 1); dtuple_gen_search_tuple3(entry, 0, buf); mtr_start(&mtr); table = dict_table_get("TS_TABLE1", trx); index = dict_table_get_first_index(table); tree = dict_index_get_tree(index); oldtm = ut_clock(); btr_pcur_open(index, entry, PAGE_CUR_L, BTR_SEARCH_LEAF, &pcur, &mtr); ut_a(DB_SUCCESS == lock_table(0, table, LOCK_IS, thr)); rnd = 98651; for (i = 0; i < *((ulint*)arg); i++) { ut_a(btr_pcur_move_to_next(&pcur, &mtr)); btr_pcur_store_position(&pcur, &mtr); ut_a(DB_SUCCESS == lock_clust_rec_cons_read_check( btr_pcur_get_rec(&pcur), index)); btr_pcur_commit_specify_mtr(&pcur, &mtr); if (i % 1211 == 0) { dummy++; } rnd = 55321; dtuple_gen_search_tuple3(entry, rnd % *((ulint*)arg), buf);/* if (i == 0) { */ latch_mode = BTR_SEARCH_LEAF;/* } else { latch_mode = BTR_SEARCH_LEAF | BTR_GUESS_LATCH; } */ mtr_start(&mtr2); btr_pcur_open(index, entry, PAGE_CUR_LE, latch_mode, &pcur2, &mtr2); ut_a(DB_SUCCESS == lock_clust_rec_cons_read_check( btr_pcur_get_rec(&pcur2), index)); ut_a(0 == cmp_dtuple_rec(entry, btr_pcur_get_rec(&pcur2))); mtr_commit(&mtr2); mtr_start(&mtr); btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr); } ut_a(!btr_pcur_move_to_next(&pcur, &mtr)); ut_a(btr_pcur_is_after_last_in_tree(&pcur, &mtr)); btr_pcur_close(&pcur); mtr_commit(&mtr); tm = ut_clock(); printf("Wall time for join of %lu rows %lu milliseconds\n", i, tm - oldtm); btr_search_index_print_info(index); /*-------------------------------------*/ /* COMMIT */ mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; ut_a(thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); oldtm = ut_clock(); que_run_threads(thr); tm = ut_clock();/* printf("Wall time for commit %lu milliseconds\n", tm - oldtm); */ /*-------------------------------------*/ count++;/* btr_validate_tree(tree); */ if (count < 3) { goto loop; } mem_heap_free(heap); return(0);}/*********************************************************************Test for lock wait. Requires Test 4.1 first. */ulinttest7(/*==*/ void* arg){ ulint tm, oldtm; sess_t* sess; com_endpoint_t* com_endpoint; mem_heap_t* heap; que_fork_t* fork; dict_table_t* table; que_thr_t* thr; trx_t* trx; trx_t* trx2; ulint rnd; dtuple_t* entry; dtuple_t* row; byte buf[100]; byte buf2[4000]; ulint count = 0; btr_pcur_t pcur; upd_t* update; upd_field_t* ufield; dict_tree_t* tree; dict_index_t* index; mtr_t mtr; upd_node_t* node; ulint err; ulint thr_no; ulint tuple_no; printf("-------------------------------------------------\n"); printf("TEST 7. LOCK WAIT\n"); thr_no = *((ulint*)arg); heap = mem_heap_create(512); com_endpoint = (com_endpoint_t*)heap; /* This is a dummy non-NULL value */ mutex_enter(&kernel_mutex); sess = sess_open(ut_dulint_zero, com_endpoint, (byte*)"user1", 6); trx = sess->trx; sess = sess_open(ut_dulint_zero, com_endpoint, (byte*)"user1", 6); trx2 = sess->trx; mutex_exit(&kernel_mutex); /*-------------------------------------*/ /* UPDATE by trx */ ut_a(trx_start(trx, ULINT_UNDEFINED)); ut_a(trx_start(trx2, ULINT_UNDEFINED)); fork = que_fork_create(NULL, NULL, QUE_FORK_UPDATE, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); table = dict_table_get("TS_TABLE1", trx); update = upd_create(1, heap); node = upd_node_create(fork, thr, table, &pcur, update, heap); thr->child = node; node->cmpl_info = 0; mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; mutex_exit(&kernel_mutex); rnd = 87607651; entry = dtuple_create(heap, 2); oldtm = ut_clock(); thr_no = *((ulint*)arg); ut_a(DB_SUCCESS == lock_table(0, table, LOCK_IX, thr)); rnd += 874681; tuple_no = 3; dtuple_gen_search_tuple3(entry, tuple_no, buf); index = dict_table_get_first_index(table); tree = dict_index_get_tree(index); btr_pcur_set_mtr(&pcur, &mtr); mtr_start(&mtr); btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_SEARCH_LEAF, &pcur, &mtr); btr_pcur_store_position(&pcur, &mtr); err = lock_clust_rec_read_check_and_lock(0, btr_pcur_get_rec(&pcur), index, LOCK_X, thr); ut_a(err == DB_SUCCESS); btr_pcur_commit(&pcur); ufield = upd_get_nth_field(update, 0); upd_field_set_col_no(ufield, 2, table); dfield_set_data(&(ufield->new_val), buf2, rnd % 1500); mutex_enter(&kernel_mutex); ut_a( thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); que_run_threads(thr); tm = ut_clock(); /* dict_table_print_by_name("TS_TABLE1"); */ printf("Validating tree\n"); btr_validate_tree(tree); printf("Validated\n"); lock_validate(); lock_print_info(); /*-------------------------------------*/ /* INSERT by trx2 */ fork = que_fork_create(NULL, NULL, QUE_FORK_INSERT, heap); fork->trx = trx2; thr = que_thr_create(fork, fork, heap); table = dict_table_get("TS_TABLE1", trx2); row = dtuple_create(heap, 3 + DATA_N_SYS_COLS); dict_table_copy_types(row, table); thr->child = ins_node_create(fork, thr, row, table, heap); mutex_enter(&kernel_mutex); que_graph_publish(fork, trx2->sess); trx2->graph = fork; mutex_exit(&kernel_mutex); rnd = 0; oldtm = ut_clock(); dtuple_gen_test_tuple3(row, 2, DTUPLE_TEST_FIXED30, buf); mutex_enter(&kernel_mutex); ut_a(thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); /* Insert should be left to wait until trx releases the row lock */ que_run_threads(thr); tm = ut_clock(); lock_validate(); lock_print_info(); /*-------------------------------------*/ /* COMMIT of trx */ fork = que_fork_create(NULL, NULL, QUE_FORK_EXECUTE, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); thr->child = commit_node_create(fork, thr, heap); mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; ut_a(thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); mutex_exit(&kernel_mutex); oldtm = ut_clock(); que_run_threads(thr); tm = ut_clock(); /*-------------------------------------*/ os_thread_sleep(1000000); printf( "trx2 can now continue to do the insert, after trx committed.\n"); printf("Validating tree\n"); btr_validate_tree(tree); printf("Validated\n"); lock_validate(); lock_print_info(); dict_table_print_by_name("TS_TABLE1"); return(0);}/*********************************************************************Inserts for TPC-A. */ulinttest8A(/*===*/ void* arg){ ulint tm, oldtm; sess_t* sess; com_endpoint_t* com_endpoint; mem_heap_t* heap; que_fork_t* fork; dict_table_t* table; dict_index_t* index; dict_tree_t* tree; que_thr_t* thr; trx_t* trx; ulint i; ulint rnd; dtuple_t* row; dtuple_t* entry; byte buf[100]; ulint count = 0; ins_node_t* node; btr_pcur_t pcur; mtr_t mtr; UT_NOT_USED(arg); printf("-------------------------------------------------\n"); printf("TEST 8A. 1000 INSERTS FOR TPC-A\n"); heap = mem_heap_create(512); com_endpoint = (com_endpoint_t*)heap; /* This is a dummy non-NULL value */ mutex_enter(&kernel_mutex); sess = sess_open(ut_dulint_zero, com_endpoint, (byte*)"user1", 6); trx = sess->trx; mutex_exit(&kernel_mutex);loop: ut_a(trx_start(trx, ULINT_UNDEFINED)); btr_search_print_info(); /*-------------------------------------*/ fork = que_fork_create(NULL, NULL, QUE_FORK_INSERT, heap); fork->trx = trx; thr = que_thr_create(fork, fork, heap); table = dict_table_get("TS_TABLE2", trx); row = dtuple_create(heap, 3 + DATA_N_SYS_COLS); dict_table_copy_types(row, table); node = ins_node_create(fork, thr, row, table, heap); thr->child = node; row_ins_init_sys_fields_at_sql_compile(node->row, node->table, heap); row_ins_init_sys_fields_at_sql_prepare(node->row, node->table, trx); node->init_all_sys_fields = FALSE; mutex_enter(&kernel_mutex); que_graph_publish(fork, trx->sess); trx->graph = fork; mutex_ex
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -