recov_thread.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 645 行 · 第 1/2 页
C
645 行
/* We are the only one manipulating our local list - no lock */ list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ int size[2] = { sizeof(struct ptlrpc_body), llcd->llcd_cookiebytes }; char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; list_del(&llcd->llcd_list); if (llcd->llcd_cookiebytes == 0) { CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } mutex_down(&llcd->llcd_ctxt->loc_sem); if (llcd->llcd_ctxt->loc_imp == NULL) { mutex_up(&llcd->llcd_ctxt->loc_sem); CWARN("import will be destroyed, put " "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } mutex_up(&llcd->llcd_ctxt->loc_sem); if (!import || (import == LP_POISON) || (import->imp_client == LP_POISON)) { CERROR("No import %p (llcd=%p, ctxt=%p)\n", import, llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION, OBD_LOG_CANCEL, 2, size,bufs); if (request == NULL) { rc = -ENOMEM; CERROR("error preparing commit: rc %d\n", rc); spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); spin_unlock(&lcm->lcm_llcd_lock); break; } /* bug 5515 */ request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; ptlrpc_at_set_req_timeout(request); ptlrpc_req_set_repsize(request, 1, NULL); mutex_down(&llcd->llcd_ctxt->loc_sem); if (llcd->llcd_ctxt->loc_imp == NULL) { mutex_up(&llcd->llcd_ctxt->loc_sem); CWARN("import will be destroyed, put " "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); ptlrpc_req_finished(request); continue; } mutex_up(&llcd->llcd_ctxt->loc_sem); rc = ptlrpc_queue_wait(request); ptlrpc_req_finished(request); /* If the RPC failed, we put this and the remaining * messages onto the resend list for another time. */ if (rc == 0) { llcd_put(llcd); continue; } CERROR("commit %p:%p drop %d cookies: rc %d\n", llcd, llcd->llcd_ctxt, (int)(llcd->llcd_cookiebytes / sizeof(*llcd->llcd_cookies)), rc); llcd_put(llcd); } if (rc == 0) { sending_list = &lcm->lcm_llcd_resend; if (!list_empty(sending_list)) goto resend; } } while(1); if (import) class_import_put(import); /* If we are force exiting, just drop all of the cookies. */ if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) { spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list); list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list); list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list); spin_unlock(&lcm->lcm_llcd_lock); list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list) llcd_put(llcd); } CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm()); spin_lock(&lcm->lcm_thread_lock); list_del(&lcd->lcd_lcm_list); spin_unlock(&lcm->lcm_thread_lock); OBD_FREE_PTR(lcd); llog_lcm_dec(lcm); RETURN(0);}int llog_start_commit_thread(struct llog_commit_master *lcm){ struct llog_commit_daemon *lcd; int rc, index; ENTRY; if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max) RETURN(0); /* Check whether it will be cleanup llog commit thread first, * If not, increate the lcm_thread_total count to prevent the * lcm being freed when the log_commit_thread is started */ spin_lock(&lcm->lcm_thread_lock); if (!lcm->lcm_flags & LLOG_LCM_FL_EXIT) { atomic_inc(&lcm->lcm_thread_total); index = atomic_read(&lcm->lcm_thread_total); spin_unlock(&lcm->lcm_thread_lock); } else { spin_unlock(&lcm->lcm_thread_lock); RETURN(0); } OBD_ALLOC_PTR(lcd); if (lcd == NULL) GOTO(cleanup, rc = -ENOMEM); CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list); CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); lcd->lcd_index = index; lcd->lcd_lcm = lcm; rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES);cleanup: if (rc < 0) { CERROR("error starting thread #%d: %d\n", lcd->lcd_index, rc); llog_lcm_dec(lcm); if (lcd) OBD_FREE_PTR(lcd); RETURN(rc); } RETURN(0);}EXPORT_SYMBOL(llog_start_commit_thread);static struct llog_process_args { struct semaphore llpa_sem; struct llog_ctxt *llpa_ctxt; void *llpa_cb; void *llpa_arg;} llpa;int llog_init_commit_master(struct llog_commit_master *lcm){ CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy); CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle); spin_lock_init(&lcm->lcm_thread_lock); atomic_set(&lcm->lcm_thread_numidle, 0); cfs_waitq_init(&lcm->lcm_waitq); CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending); CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend); CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free); spin_lock_init(&lcm->lcm_llcd_lock); atomic_set(&lcm->lcm_llcd_numfree, 0); lcm->lcm_llcd_minfree = 0; lcm->lcm_thread_max = 5; /* FIXME initialize semaphore for llog_process_args */ sema_init(&llpa.llpa_sem, 1); return 0;}EXPORT_SYMBOL(llog_init_commit_master);int llog_cleanup_commit_master(struct llog_commit_master *lcm, int force){ spin_lock(&lcm->lcm_thread_lock); lcm->lcm_flags |= LLOG_LCM_FL_EXIT; if (force) lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE; spin_unlock(&lcm->lcm_thread_lock); cfs_waitq_signal(&lcm->lcm_waitq); wait_event_interruptible(lcm->lcm_waitq, atomic_read(&lcm->lcm_thread_total) == 0); return 0;}EXPORT_SYMBOL(llog_cleanup_commit_master);static int log_process_thread(void *args){ struct llog_process_args *data = args; struct llog_ctxt *ctxt = data->llpa_ctxt; void *cb = data->llpa_cb; struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg); struct llog_handle *llh = NULL; int rc; ENTRY; mutex_up(&data->llpa_sem); ptlrpc_daemonize("llog_process"); /* thread does IO to log files */ rc = llog_create(ctxt, &llh, &logid, NULL); if (rc) { CERROR("llog_create failed %d\n", rc); GOTO(out, rc); } rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); if (rc) { CERROR("llog_init_handle failed %d\n", rc); GOTO(release_llh, rc); } if (cb) { rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); if (rc != LLOG_PROC_BREAK) CERROR("llog_cat_process failed %d\n", rc); } else { CWARN("no callback function for recovery\n"); } CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n", ctxt->loc_llcd, ctxt); llog_sync(ctxt, NULL);release_llh: rc = llog_cat_put(llh); if (rc) CERROR("llog_cat_put failed %d\n", rc);out: llog_ctxt_put(ctxt); RETURN(rc);}static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg){ struct obd_device *obd = ctxt->loc_obd; int rc; ENTRY; if (obd->obd_stopping) RETURN(-ENODEV); mutex_down(&llpa.llpa_sem); llpa.llpa_cb = handle; llpa.llpa_arg = arg; llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx); if (!llpa.llpa_ctxt) { mutex_up(&llpa.llpa_sem); RETURN(-ENODEV); } rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES); if (rc < 0) CERROR("error starting log_process_thread: %d\n", rc); else { CDEBUG(D_HA, "log_process_thread: %d\n", rc); rc = 0; } RETURN(rc);}int llog_repl_connect(struct llog_ctxt *ctxt, int count, struct llog_logid *logid, struct llog_gen *gen, struct obd_uuid *uuid){ struct llog_canceld_ctxt *llcd; int rc; ENTRY; /* send back llcd before recovery from llog */ if (ctxt->loc_llcd != NULL) { CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt); llog_sync(ctxt, NULL); } mutex_down(&ctxt->loc_sem); ctxt->loc_gen = *gen; llcd = llcd_grab(ctxt->loc_lcm); if (llcd == NULL) { CERROR("couldn't get an llcd\n"); mutex_up(&ctxt->loc_sem); RETURN(-ENOMEM); } llcd->llcd_ctxt = llog_ctxt_get(ctxt); ctxt->loc_llcd = llcd; mutex_up(&ctxt->loc_sem); rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); if (rc != 0) CERROR("error recovery process: %d\n", rc); RETURN(rc);}EXPORT_SYMBOL(llog_repl_connect);#else /* !__KERNEL__ */int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags){ return 0;}#endif
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?