recov_thread.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 645 行 · 第 1/2 页

C
645
字号
                /* We are the only one manipulating our local list - no lock */                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){                        int size[2] = { sizeof(struct ptlrpc_body),                                        llcd->llcd_cookiebytes };                        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };                        list_del(&llcd->llcd_list);                        if (llcd->llcd_cookiebytes == 0) {                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",                                       llcd, llcd->llcd_ctxt);                                llcd_put(llcd);                                continue;                        }                        mutex_down(&llcd->llcd_ctxt->loc_sem);                        if (llcd->llcd_ctxt->loc_imp == NULL) {                                mutex_up(&llcd->llcd_ctxt->loc_sem);                                CWARN("import will be destroyed, put "                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);                                llcd_put(llcd);                                continue;                        }                        mutex_up(&llcd->llcd_ctxt->loc_sem);                        if (!import || (import == LP_POISON) ||                            (import->imp_client == LP_POISON)) {                                CERROR("No import %p (llcd=%p, ctxt=%p)\n",                                       import, llcd, llcd->llcd_ctxt);                                llcd_put(llcd);                                continue;                        }                        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);                        request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,                                                  OBD_LOG_CANCEL, 2, size,bufs);                        if (request == NULL) {                                rc = -ENOMEM;                                CERROR("error preparing commit: rc %d\n", rc);                                spin_lock(&lcm->lcm_llcd_lock);                                list_splice(&lcd->lcd_llcd_list,                                            &lcm->lcm_llcd_resend);                                CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);                                spin_unlock(&lcm->lcm_llcd_lock);                                break;                        }                        /* bug 5515 */                        request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;                        request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;                        ptlrpc_at_set_req_timeout(request);                        ptlrpc_req_set_repsize(request, 1, NULL);                        mutex_down(&llcd->llcd_ctxt->loc_sem);                        if (llcd->llcd_ctxt->loc_imp == NULL) {                                mutex_up(&llcd->llcd_ctxt->loc_sem);                                CWARN("import will be destroyed, put "                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);                                llcd_put(llcd);                                ptlrpc_req_finished(request);                                continue;                        }                        mutex_up(&llcd->llcd_ctxt->loc_sem);                        rc = ptlrpc_queue_wait(request);                        ptlrpc_req_finished(request);                        /* If the RPC failed, we put this and the remaining                         * messages onto the resend list for another time. */                        if (rc == 0) {                                llcd_put(llcd);                                continue;                        }                        CERROR("commit %p:%p drop %d cookies: rc %d\n",                               llcd, llcd->llcd_ctxt,                               (int)(llcd->llcd_cookiebytes /                                     sizeof(*llcd->llcd_cookies)), rc);                        llcd_put(llcd);                }                if (rc == 0) {                        sending_list = &lcm->lcm_llcd_resend;                        if (!list_empty(sending_list))                                goto resend;                }        } while(1);        if (import)                class_import_put(import);        /* If we are force exiting, just drop all of the cookies. */        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {                spin_lock(&lcm->lcm_llcd_lock);                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);                spin_unlock(&lcm->lcm_llcd_lock);                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)                        llcd_put(llcd);        }        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());        spin_lock(&lcm->lcm_thread_lock);        list_del(&lcd->lcd_lcm_list);        spin_unlock(&lcm->lcm_thread_lock);        OBD_FREE_PTR(lcd);        llog_lcm_dec(lcm);        RETURN(0);}int llog_start_commit_thread(struct llog_commit_master *lcm){        struct llog_commit_daemon *lcd;        int rc, index;         ENTRY;        if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)                RETURN(0);        /* Check whether it will be cleanup llog commit thread first,         * If not, increate the lcm_thread_total count to prevent the          * lcm being freed when the log_commit_thread is started */        spin_lock(&lcm->lcm_thread_lock);        if (!lcm->lcm_flags & LLOG_LCM_FL_EXIT) {                 atomic_inc(&lcm->lcm_thread_total);                index = atomic_read(&lcm->lcm_thread_total);                spin_unlock(&lcm->lcm_thread_lock);        } else {                spin_unlock(&lcm->lcm_thread_lock);                RETURN(0);        }        OBD_ALLOC_PTR(lcd);        if (lcd == NULL)                GOTO(cleanup, rc = -ENOMEM);        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);        lcd->lcd_index = index;        lcd->lcd_lcm = lcm;        rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES);cleanup:        if (rc < 0) {                CERROR("error starting thread #%d: %d\n", lcd->lcd_index, rc);                llog_lcm_dec(lcm);                if (lcd)                         OBD_FREE_PTR(lcd);                RETURN(rc);        }        RETURN(0);}EXPORT_SYMBOL(llog_start_commit_thread);static struct llog_process_args {        struct semaphore         llpa_sem;        struct llog_ctxt        *llpa_ctxt;        void                    *llpa_cb;        void                    *llpa_arg;} llpa;int llog_init_commit_master(struct llog_commit_master *lcm){        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);        spin_lock_init(&lcm->lcm_thread_lock);        atomic_set(&lcm->lcm_thread_numidle, 0);        cfs_waitq_init(&lcm->lcm_waitq);        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);        spin_lock_init(&lcm->lcm_llcd_lock);        atomic_set(&lcm->lcm_llcd_numfree, 0);        lcm->lcm_llcd_minfree = 0;        lcm->lcm_thread_max = 5;        /* FIXME initialize semaphore for llog_process_args */        sema_init(&llpa.llpa_sem, 1);        return 0;}EXPORT_SYMBOL(llog_init_commit_master);int llog_cleanup_commit_master(struct llog_commit_master *lcm,                               int force){        spin_lock(&lcm->lcm_thread_lock);        lcm->lcm_flags |= LLOG_LCM_FL_EXIT;        if (force)                lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;                spin_unlock(&lcm->lcm_thread_lock);                cfs_waitq_signal(&lcm->lcm_waitq);        wait_event_interruptible(lcm->lcm_waitq,                                 atomic_read(&lcm->lcm_thread_total) == 0);        return 0;}EXPORT_SYMBOL(llog_cleanup_commit_master);static int log_process_thread(void *args){        struct llog_process_args *data = args;        struct llog_ctxt *ctxt = data->llpa_ctxt;        void   *cb = data->llpa_cb;        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);        struct llog_handle *llh = NULL;        int rc;        ENTRY;        mutex_up(&data->llpa_sem);        ptlrpc_daemonize("llog_process");     /* thread does IO to log files */        rc = llog_create(ctxt, &llh, &logid, NULL);        if (rc) {                CERROR("llog_create failed %d\n", rc);                GOTO(out, rc);        }        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);        if (rc) {                CERROR("llog_init_handle failed %d\n", rc);                GOTO(release_llh, rc);        }        if (cb) {                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);                if (rc != LLOG_PROC_BREAK)                        CERROR("llog_cat_process failed %d\n", rc);        } else {                CWARN("no callback function for recovery\n");        }        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",               ctxt->loc_llcd, ctxt);        llog_sync(ctxt, NULL);release_llh:        rc = llog_cat_put(llh);        if (rc)                CERROR("llog_cat_put failed %d\n", rc);out:        llog_ctxt_put(ctxt);        RETURN(rc);}static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg){        struct obd_device *obd = ctxt->loc_obd;        int rc;        ENTRY;        if (obd->obd_stopping)                RETURN(-ENODEV);        mutex_down(&llpa.llpa_sem);        llpa.llpa_cb = handle;        llpa.llpa_arg = arg;        llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx);        if (!llpa.llpa_ctxt) {                mutex_up(&llpa.llpa_sem);                RETURN(-ENODEV);        }        rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);        if (rc < 0)                CERROR("error starting log_process_thread: %d\n", rc);        else {                CDEBUG(D_HA, "log_process_thread: %d\n", rc);                rc = 0;        }        RETURN(rc);}int llog_repl_connect(struct llog_ctxt *ctxt, int count,                      struct llog_logid *logid, struct llog_gen *gen,                      struct obd_uuid *uuid){        struct llog_canceld_ctxt *llcd;        int rc;        ENTRY;        /* send back llcd before recovery from llog */        if (ctxt->loc_llcd != NULL) {                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);                llog_sync(ctxt, NULL);        }        mutex_down(&ctxt->loc_sem);        ctxt->loc_gen = *gen;        llcd = llcd_grab(ctxt->loc_lcm);        if (llcd == NULL) {                CERROR("couldn't get an llcd\n");                mutex_up(&ctxt->loc_sem);                RETURN(-ENOMEM);        }        llcd->llcd_ctxt = llog_ctxt_get(ctxt);        ctxt->loc_llcd = llcd;        mutex_up(&ctxt->loc_sem);        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);        if (rc != 0)                CERROR("error recovery process: %d\n", rc);        RETURN(rc);}EXPORT_SYMBOL(llog_repl_connect);#else /* !__KERNEL__ */int llog_obd_repl_cancel(struct llog_ctxt *ctxt,                         struct lov_stripe_md *lsm, int count,                         struct llog_cookie *cookies, int flags){        return 0;}#endif

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?