📄 log0recv.c
字号:
recv_addr->state = RECV_NOT_PROCESSED; UT_LIST_INIT(recv_addr->rec_list); HASH_INSERT(recv_addr_t, addr_hash, recv_sys->addr_hash, recv_fold(space, page_no), recv_addr); recv_sys->n_addrs++; /* fprintf(stderr, "Inserting log rec for space %lu, page %lu\n", space, page_no); */ } UT_LIST_ADD_LAST(rec_list, recv_addr->rec_list, recv); prev_field = &(recv->data); /* Store the log record body in chunks of less than UNIV_PAGE_SIZE: recv_sys->heap grows into the buffer pool, and bigger chunks could not be allocated */ while (rec_end > body) { len = rec_end - body; if (len > RECV_DATA_BLOCK_SIZE) { len = RECV_DATA_BLOCK_SIZE; } recv_data = mem_heap_alloc(recv_sys->heap, sizeof(recv_data_t) + len); *prev_field = recv_data; ut_memcpy(((byte*)recv_data) + sizeof(recv_data_t), body, len); prev_field = &(recv_data->next); body += len; } *prev_field = NULL;}/*************************************************************************Copies the log record body from recv to buf. */staticvoidrecv_data_copy_to_buf(/*==================*/ byte* buf, /* in: buffer of length at least recv->len */ recv_t* recv) /* in: log record */{ recv_data_t* recv_data; ulint part_len; ulint len; len = recv->len; recv_data = recv->data; while (len > 0) { if (len > RECV_DATA_BLOCK_SIZE) { part_len = RECV_DATA_BLOCK_SIZE; } else { part_len = len; } ut_memcpy(buf, ((byte*)recv_data) + sizeof(recv_data_t), part_len); buf += part_len; len -= part_len; recv_data = recv_data->next; }}/****************************************************************************Applies the hashed log records to the page, if the page lsn is less than thelsn of a log record. This can be called when a buffer page has just beenread in, or also for a page already in the buffer pool. */voidrecv_recover_page(/*==============*/ ibool recover_backup, /* in: TRUE if we are recovering a backup page: then we do not acquire any latches since the page was read in outside the buffer pool */ ibool just_read_in, /* in: TRUE if the i/o-handler calls this for a freshly read page */ page_t* page, /* in: buffer page */ ulint space, /* in: space id */ ulint page_no) /* in: page number */{ buf_block_t* block = NULL; recv_addr_t* recv_addr; recv_t* recv; byte* buf; dulint start_lsn; dulint end_lsn; dulint page_lsn; dulint page_newest_lsn; ibool modification_to_page; ibool success; mtr_t mtr; mutex_enter(&(recv_sys->mutex)); if (recv_sys->apply_log_recs == FALSE) { /* Log records should not be applied now */ mutex_exit(&(recv_sys->mutex)); return; } recv_addr = recv_get_fil_addr_struct(space, page_no); if ((recv_addr == NULL) || (recv_addr->state == RECV_BEING_PROCESSED) || (recv_addr->state == RECV_PROCESSED)) { mutex_exit(&(recv_sys->mutex)); return; } /* fprintf(stderr, "Recovering space %lu, page %lu\n", space, page_no); */ recv_addr->state = RECV_BEING_PROCESSED; mutex_exit(&(recv_sys->mutex)); mtr_start(&mtr); mtr_set_log_mode(&mtr, MTR_LOG_NONE); if (!recover_backup) { block = buf_block_align(page); if (just_read_in) { /* Move the ownership of the x-latch on the page to this OS thread, so that we can acquire a second x-latch on it. This is needed for the operations to the page to pass the debug checks. */ rw_lock_x_lock_move_ownership(&(block->lock)); } success = buf_page_get_known_nowait(RW_X_LATCH, page, BUF_KEEP_OLD, __FILE__, __LINE__, &mtr); ut_a(success);#ifdef UNIV_SYNC_DEBUG buf_page_dbg_add_level(page, SYNC_NO_ORDER_CHECK);#endif /* UNIV_SYNC_DEBUG */ } /* Read the newest modification lsn from the page */ page_lsn = mach_read_from_8(page + FIL_PAGE_LSN); if (!recover_backup) { /* It may be that the page has been modified in the buffer pool: read the newest modification lsn there */ page_newest_lsn = buf_frame_get_newest_modification(page); if (!ut_dulint_is_zero(page_newest_lsn)) { page_lsn = page_newest_lsn; } } else { /* In recovery from a backup we do not really use the buffer pool */ page_newest_lsn = ut_dulint_zero; } modification_to_page = FALSE; start_lsn = end_lsn = ut_dulint_zero; recv = UT_LIST_GET_FIRST(recv_addr->rec_list); while (recv) { end_lsn = recv->end_lsn; if (recv->len > RECV_DATA_BLOCK_SIZE) { /* We have to copy the record body to a separate buffer */ buf = mem_alloc(recv->len); recv_data_copy_to_buf(buf, recv); } else { buf = ((byte*)(recv->data)) + sizeof(recv_data_t); } if (recv->type == MLOG_INIT_FILE_PAGE) { page_lsn = page_newest_lsn; mach_write_to_8(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, ut_dulint_zero); mach_write_to_8(page + FIL_PAGE_LSN, ut_dulint_zero); } if (ut_dulint_cmp(recv->start_lsn, page_lsn) >= 0) { if (!modification_to_page) { modification_to_page = TRUE; start_lsn = recv->start_lsn; }#ifdef UNIV_DEBUG if (log_debug_writes) { fprintf(stderr, "InnoDB: Applying log rec type %lu len %lu to space %lu page no %lu\n", (ulong) recv->type, (ulong) recv->len, (ulong) recv_addr->space, (ulong) recv_addr->page_no); }#endif /* UNIV_DEBUG */ recv_parse_or_apply_log_rec_body(recv->type, buf, buf + recv->len, page, &mtr); mach_write_to_8(page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM, ut_dulint_add(recv->start_lsn, recv->len)); mach_write_to_8(page + FIL_PAGE_LSN, ut_dulint_add(recv->start_lsn, recv->len)); } if (recv->len > RECV_DATA_BLOCK_SIZE) { mem_free(buf); } recv = UT_LIST_GET_NEXT(rec_list, recv); } mutex_enter(&(recv_sys->mutex)); if (ut_dulint_cmp(recv_max_page_lsn, page_lsn) < 0) { recv_max_page_lsn = page_lsn; } recv_addr->state = RECV_PROCESSED; ut_a(recv_sys->n_addrs); recv_sys->n_addrs--; mutex_exit(&(recv_sys->mutex)); if (!recover_backup && modification_to_page) { ut_a(block); buf_flush_recv_note_modification(block, start_lsn, end_lsn); } /* Make sure that committing mtr does not change the modification lsn values of page */ mtr.modifications = FALSE; mtr_commit(&mtr); }/***********************************************************************Reads in pages which have hashed log records, from an area around a givenpage number. */staticulintrecv_read_in_area(/*==============*/ /* out: number of pages found */ ulint space, /* in: space */ ulint page_no)/* in: page number */{ recv_addr_t* recv_addr; ulint page_nos[RECV_READ_AHEAD_AREA]; ulint low_limit; ulint n; low_limit = page_no - (page_no % RECV_READ_AHEAD_AREA); n = 0; for (page_no = low_limit; page_no < low_limit + RECV_READ_AHEAD_AREA; page_no++) { recv_addr = recv_get_fil_addr_struct(space, page_no); if (recv_addr && !buf_page_peek(space, page_no)) { mutex_enter(&(recv_sys->mutex)); if (recv_addr->state == RECV_NOT_PROCESSED) { recv_addr->state = RECV_BEING_READ; page_nos[n] = page_no; n++; } mutex_exit(&(recv_sys->mutex)); } } buf_read_recv_pages(FALSE, space, page_nos, n); /* fprintf(stderr, "Recv pages at %lu n %lu\n", page_nos[0], n); */ return(n);} /***********************************************************************Empties the hash table of stored log records, applying them to appropriatepages. */voidrecv_apply_hashed_log_recs(/*=======================*/ ibool allow_ibuf) /* in: if TRUE, also ibuf operations are allowed during the application; if FALSE, no ibuf operations are allowed, and after the application all file pages are flushed to disk and invalidated in buffer pool: this alternative means that no new log records can be generated during the application; the caller must in this case own the log mutex */{ recv_addr_t* recv_addr; page_t* page; ulint i; ulint space; ulint page_no; ulint n_pages; ibool has_printed = FALSE; mtr_t mtr;loop: mutex_enter(&(recv_sys->mutex)); if (recv_sys->apply_batch_on) { mutex_exit(&(recv_sys->mutex)); os_thread_sleep(500000); goto loop; }#ifdef UNIV_SYNC_DEBUG ut_ad(!allow_ibuf == mutex_own(&log_sys->mutex));#endif /* UNIV_SYNC_DEBUG */ if (!allow_ibuf) { recv_no_ibuf_operations = TRUE; } recv_sys->apply_log_recs = TRUE; recv_sys->apply_batch_on = TRUE; for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) { recv_addr = HASH_GET_FIRST(recv_sys->addr_hash, i); while (recv_addr) { space = recv_addr->space; page_no = recv_addr->page_no; if (recv_addr->state == RECV_NOT_PROCESSED) { if (!has_printed) { ut_print_timestamp(stderr); fputs( " InnoDB: Starting an apply batch of log records to the database...\n""InnoDB: Progress in percents: ",stderr); has_printed = TRUE; } mutex_exit(&(recv_sys->mutex)); if (buf_page_peek(space, page_no)) { mtr_start(&mtr); page = buf_page_get(space, page_no, RW_X_LATCH, &mtr);#ifdef UNIV_SYNC_DEBUG buf_page_dbg_add_level(page, SYNC_NO_ORDER_CHECK);#endif /* UNIV_SYNC_DEBUG */ recv_recover_page(FALSE, FALSE, page, space, page_no); mtr_commit(&mtr); } else { recv_read_in_area(space, page_no); } mutex_enter(&(recv_sys->mutex)); } recv_addr = HASH_GET_NEXT(addr_hash, recv_addr); } if (has_printed && (i * 100) / hash_get_n_cells(recv_sys->addr_hash) != ((i + 1) * 100) / hash_get_n_cells(recv_sys->addr_hash)) { fprintf(stderr, "%lu ", (ulong) ((i * 100) / hash_get_n_cells(recv_sys->addr_hash))); } } /* Wait until all the pages have been processed */ while (recv_sys->n_addrs != 0) { mutex_exit(&(recv_sys->mutex)); os_thread_sleep(500000); mutex_enter(&(recv_sys->mutex)); } if (has_printed) { fprintf(stderr, "\n"); } if (!allow_ibuf) { /* Flush all the file pages to disk and invalidate them in the buffer pool */ mutex_exit(&(recv_sys->mutex)); mutex_exit(&(log_sys->mutex)); n_pages = buf_flush_batch(BUF_FLUSH_LIST, ULINT_MAX, ut_dulint_max); ut_a(n_pages != ULINT_UNDEFINED); buf_flush_wait_batch_end(BUF_FLUSH_LIST); buf_pool_invalidate(); mutex_enter(&(log_sys->mutex)); mutex_enter(&(recv_sys->mutex)); recv_no_ibuf_operations = FALSE; } recv_sys->apply_log_recs = FALSE; recv_sys->apply_batch_on = FALSE; recv_sys_empty_hash(); if (has_printed) { fprintf(stderr, "InnoDB: Apply batch completed\n"); } mutex_exit(&(recv_sys->mutex));}/* This page is allocated from the buffer pool and used in the functionbelow */static page_t* recv_backup_application_page = NULL;/***********************************************************************Applies log records in the hash table to a backup. */voidrecv_apply_log_recs_for_backup(void)/*================================*/{ recv_addr_t* recv_addr; ulint n_hash_cells; byte* page; ulint actual_size; ibool success; ulint error; ulint i; recv_sys->apply_log_recs = TRUE; recv_sys->apply_batch_on = TRUE; if (recv_backup_application_page == NULL) { recv_backup_application_page = buf_frame_alloc(); } page = recv_backup_application_page; fputs("InnoDB: Starting an apply batch of log records to the database...\n""InnoDB: Progress in percents: ", stderr); n_hash_cells = hash_get_n_cells(recv_sys->addr_hash); for (i = 0; i < n_hash_cells; i++) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -