📄 trx0trx.c
字号:
trx_commit_off_kernel(trx); ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); /* Remove all TRX_SIG_COMMIT signals from the signal queue and send reply messages to them */ sig = UT_LIST_GET_FIRST(trx->signals); while (sig != NULL) { next_sig = UT_LIST_GET_NEXT(signals, sig); if (sig->type == TRX_SIG_COMMIT) { trx_sig_reply(sig, next_thr); trx_sig_remove(trx, sig); } sig = next_sig; } trx->que_state = TRX_QUE_RUNNING;}/***************************************************************The transaction must be in the TRX_QUE_LOCK_WAIT state. Puts it tothe TRX_QUE_RUNNING state and releases query threads which werewaiting for a lock in the wait_thrs list. */voidtrx_end_lock_wait(/*==============*/ trx_t* trx) /* in: transaction */{ que_thr_t* thr;#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT); thr = UT_LIST_GET_FIRST(trx->wait_thrs); while (thr != NULL) { que_thr_end_wait_no_next_thr(thr); UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr); thr = UT_LIST_GET_FIRST(trx->wait_thrs); } trx->que_state = TRX_QUE_RUNNING;}/***************************************************************Moves the query threads in the lock wait list to the SUSPENDED state and putsthe transaction to the TRX_QUE_RUNNING state. */staticvoidtrx_lock_wait_to_suspended(/*=======================*/ trx_t* trx) /* in: transaction in the TRX_QUE_LOCK_WAIT state */{ que_thr_t* thr;#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT); thr = UT_LIST_GET_FIRST(trx->wait_thrs); while (thr != NULL) { thr->state = QUE_THR_SUSPENDED; UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr); thr = UT_LIST_GET_FIRST(trx->wait_thrs); } trx->que_state = TRX_QUE_RUNNING;}/***************************************************************Moves the query threads in the sig reply wait list of trx to the SUSPENDEDstate. */staticvoidtrx_sig_reply_wait_to_suspended(/*============================*/ trx_t* trx) /* in: transaction */{ trx_sig_t* sig; que_thr_t* thr;#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ sig = UT_LIST_GET_FIRST(trx->reply_signals); while (sig != NULL) { thr = sig->receiver; ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT); thr->state = QUE_THR_SUSPENDED; sig->receiver = NULL; UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig); sig = UT_LIST_GET_FIRST(trx->reply_signals); }}/*********************************************************************Checks the compatibility of a new signal with the other signals in thequeue. */staticibooltrx_sig_is_compatible(/*==================*/ /* out: TRUE if the signal can be queued */ trx_t* trx, /* in: trx handle */ ulint type, /* in: signal type */ ulint sender) /* in: TRX_SIG_SELF or TRX_SIG_OTHER_SESS */{ trx_sig_t* sig;#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ if (UT_LIST_GET_LEN(trx->signals) == 0) { return(TRUE); } if (sender == TRX_SIG_SELF) { if (type == TRX_SIG_ERROR_OCCURRED) { return(TRUE); } else if (type == TRX_SIG_BREAK_EXECUTION) { return(TRUE); } else { return(FALSE); } } ut_ad(sender == TRX_SIG_OTHER_SESS); sig = UT_LIST_GET_FIRST(trx->signals); if (type == TRX_SIG_COMMIT) { while (sig != NULL) { if (sig->type == TRX_SIG_TOTAL_ROLLBACK) { return(FALSE); } sig = UT_LIST_GET_NEXT(signals, sig); } return(TRUE); } else if (type == TRX_SIG_TOTAL_ROLLBACK) { while (sig != NULL) { if (sig->type == TRX_SIG_COMMIT) { return(FALSE); } sig = UT_LIST_GET_NEXT(signals, sig); } return(TRUE); } else if (type == TRX_SIG_BREAK_EXECUTION) { return(TRUE); } else { ut_error; return(FALSE); }}/********************************************************************Sends a signal to a trx object. */ibooltrx_sig_send(/*=========*/ /* out: TRUE if the signal was successfully delivered */ trx_t* trx, /* in: trx handle */ ulint type, /* in: signal type */ ulint sender, /* in: TRX_SIG_SELF or TRX_SIG_OTHER_SESS */ que_thr_t* receiver_thr, /* in: query thread which wants the reply, or NULL; if type is TRX_SIG_END_WAIT, this must be NULL */ trx_savept_t* savept, /* in: possible rollback savepoint, or NULL */ que_thr_t** next_thr) /* in/out: next query thread to run; if the value which is passed in is a pointer to a NULL pointer, then the calling function can start running a new query thread; if the parameter is NULL, it is ignored */{ trx_sig_t* sig; trx_t* receiver_trx; ut_ad(trx);#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ if (!trx_sig_is_compatible(trx, type, sender)) { /* The signal is not compatible with the other signals in the queue: do nothing */ ut_error; return(FALSE); } /* Queue the signal object */ if (UT_LIST_GET_LEN(trx->signals) == 0) { /* The signal list is empty: the 'sig' slot must be unused (we improve performance a bit by avoiding mem_alloc) */ sig = &(trx->sig); } else { /* It might be that the 'sig' slot is unused also in this case, but we choose the easy way of using mem_alloc */ sig = mem_alloc(sizeof(trx_sig_t)); } UT_LIST_ADD_LAST(signals, trx->signals, sig); sig->type = type; sig->state = TRX_SIG_WAITING; sig->sender = sender; sig->receiver = receiver_thr; if (savept) { sig->savept = *savept; } if (receiver_thr) { receiver_trx = thr_get_trx(receiver_thr); UT_LIST_ADD_LAST(reply_signals, receiver_trx->reply_signals, sig); } if (trx->sess->state == SESS_ERROR) { trx_sig_reply_wait_to_suspended(trx); } if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) { /* The following call will add a TRX_SIG_ERROR_OCCURRED signal to the end of the queue, if the session is not yet in the error state: */ ut_error; } /* If there were no other signals ahead in the queue, try to start handling of the signal */ if (UT_LIST_GET_FIRST(trx->signals) == sig) { trx_sig_start_handle(trx, next_thr); } return(TRUE);}/********************************************************************Ends signal handling. If the session is in the error state, andtrx->graph_before_signal_handling != NULL, then returns control to the errorhandling routine of the graph (currently just returns the control to thegraph root which then will send an error message to the client). */voidtrx_end_signal_handling(/*====================*/ trx_t* trx) /* in: trx */{#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ ut_ad(trx->handling_signals == TRUE); trx->handling_signals = FALSE; trx->graph = trx->graph_before_signal_handling; if (trx->graph && (trx->sess->state == SESS_ERROR)) { que_fork_error_handle(trx, trx->graph); }}/********************************************************************Starts handling of a trx signal. */voidtrx_sig_start_handle(/*=================*/ trx_t* trx, /* in: trx handle */ que_thr_t** next_thr) /* in/out: next query thread to run; if the value which is passed in is a pointer to a NULL pointer, then the calling function can start running a new query thread; if the parameter is NULL, it is ignored */{ trx_sig_t* sig; ulint type;loop: /* We loop in this function body as long as there are queued signals we can process immediately */ ut_ad(trx);#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ if (trx->handling_signals && (UT_LIST_GET_LEN(trx->signals) == 0)) { trx_end_signal_handling(trx); return; } if (trx->conc_state == TRX_NOT_STARTED) { trx_start_low(trx, ULINT_UNDEFINED); } /* If the trx is in a lock wait state, moves the waiting query threads to the suspended state */ if (trx->que_state == TRX_QUE_LOCK_WAIT) { trx_lock_wait_to_suspended(trx); } /* If the session is in the error state and this trx has threads waiting for reply from signals, moves these threads to the suspended state, canceling wait reservations; note that if the transaction has sent a commit or rollback signal to itself, and its session is not in the error state, then nothing is done here. */ if (trx->sess->state == SESS_ERROR) { trx_sig_reply_wait_to_suspended(trx); } /* If there are no running query threads, we can start processing of a signal, otherwise we have to wait until all query threads of this transaction are aware of the arrival of the signal. */ if (trx->n_active_thrs > 0) { return; } if (trx->handling_signals == FALSE) { trx->graph_before_signal_handling = trx->graph; trx->handling_signals = TRUE; } sig = UT_LIST_GET_FIRST(trx->signals); type = sig->type; if (type == TRX_SIG_COMMIT) { trx_handle_commit_sig_off_kernel(trx, next_thr); } else if ((type == TRX_SIG_TOTAL_ROLLBACK) || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) { trx_rollback(trx, sig, next_thr); /* No further signals can be handled until the rollback completes, therefore we return */ return; } else if (type == TRX_SIG_ERROR_OCCURRED) { trx_rollback(trx, sig, next_thr); /* No further signals can be handled until the rollback completes, therefore we return */ return; } else if (type == TRX_SIG_BREAK_EXECUTION) { trx_sig_reply(sig, next_thr); trx_sig_remove(trx, sig); } else { ut_error; } goto loop;} /********************************************************************Send the reply message when a signal in the queue of the trx has beenhandled. */voidtrx_sig_reply(/*==========*/ trx_sig_t* sig, /* in: signal */ que_thr_t** next_thr) /* in/out: next query thread to run; if the value which is passed in is a pointer to a NULL pointer, then the calling function can start running a new query thread */{ trx_t* receiver_trx; ut_ad(sig);#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ if (sig->receiver != NULL) { ut_ad((sig->receiver)->state == QUE_THR_SIG_REPLY_WAIT); receiver_trx = thr_get_trx(sig->receiver); UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals, sig); ut_ad(receiver_trx->sess->state != SESS_ERROR); que_thr_end_wait(sig->receiver, next_thr); sig->receiver = NULL; }}/********************************************************************Removes a signal object from the trx signal queue. */voidtrx_sig_remove(/*===========*/ trx_t* trx, /* in: trx handle */ trx_sig_t* sig) /* in, own: signal */{ ut_ad(trx && sig);#ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex));#endif /* UNIV_SYNC_DEBUG */ ut_ad(sig->receiver == NULL); UT_LIST_REMOVE(signals, trx->signals, sig); sig->type = 0; /* reset the field to catch possible bugs */ if (sig != &(trx->sig)) { mem_free(sig); }}/*************************************************************************Creates a commit command node struct. */commit_node_t*commit_node_create(/*===============*/ /* out, own: commit node struct */ mem_heap_t* heap) /* in: mem heap where created */{ commit_node_t* node; node = mem_heap_alloc(heap, sizeof(commit_node_t)); node->common.type = QUE_NODE_COMMIT; node->state = COMMIT_NODE_SEND; return(node);}/***************************************************************Performs an execution step for a commit type node in a query graph. */que_thr_t*trx_commit_step(/*============*/ /* out: query thread to run next, or NULL */ que_thr_t* thr) /* in: query thread */{ commit_node_t* node; que_thr_t* next_thr; ibool success;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -