recov_thread.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 645 行 · 第 1/2 页

C
645
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * *  Copyright (C) 2003 Cluster File Systems, Inc. *   Author: Andreas Dilger <adilger@clusterfs.com> * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. * * OST<->MDS recovery logging thread. * * Invariants in implementation: * - we do not share logs among different OST<->MDS connections, so that *   if an OST or MDS fails it need only look at log(s) relevant to itself */#define DEBUG_SUBSYSTEM S_LOG#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#ifdef __KERNEL__# include <libcfs/libcfs.h>#else# include <libcfs/list.h># include <liblustre.h>#endif#include <libcfs/kp30.h>#include <obd_class.h>#include <lustre_commit_confd.h>#include <obd_support.h>#include <obd_class.h>#include <lustre_net.h>#include <lnet/types.h>#include <libcfs/list.h>#include <lustre_log.h>#include "ptlrpc_internal.h"#ifdef __KERNEL__/* Allocate new commit structs in case we do not have enough. * Make the llcd size small enough that it fits into a single page when we * are sending/receiving it. */static int llcd_alloc(struct llog_commit_master *lcm){        struct llog_canceld_ctxt *llcd;        int llcd_size;        /* payload of lustre_msg V2 is bigger */        llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);        OBD_ALLOC(llcd,                  llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));        if (llcd == NULL)                return -ENOMEM;        llcd->llcd_size = llcd_size;        llcd->llcd_lcm = lcm;        spin_lock(&lcm->lcm_llcd_lock);        list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);        atomic_inc(&lcm->lcm_llcd_numfree);        spin_unlock(&lcm->lcm_llcd_lock);        return 0;}/* Get a free cookie struct from the list */static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm){        struct llog_canceld_ctxt *llcd;repeat:        spin_lock(&lcm->lcm_llcd_lock);        if (list_empty(&lcm->lcm_llcd_free)) {                spin_unlock(&lcm->lcm_llcd_lock);                if (llcd_alloc(lcm) < 0) {                        CERROR("unable to allocate log commit data!\n");                        return NULL;                }                /* check new llcd wasn't grabbed while lock dropped, b=7407 */                goto repeat;        }        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);        list_del(&llcd->llcd_list);        atomic_dec(&lcm->lcm_llcd_numfree);        spin_unlock(&lcm->lcm_llcd_lock);        llcd->llcd_cookiebytes = 0;        return llcd;}static void llcd_put(struct llog_canceld_ctxt *llcd){        struct llog_commit_master *lcm = llcd->llcd_lcm;        llog_ctxt_put(llcd->llcd_ctxt);        if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {                int llcd_size = llcd->llcd_size +                         offsetof(struct llog_canceld_ctxt, llcd_cookies);                OBD_FREE(llcd, llcd_size);        } else {                spin_lock(&lcm->lcm_llcd_lock);                list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);                atomic_inc(&lcm->lcm_llcd_numfree);                spin_unlock(&lcm->lcm_llcd_lock);        }}/* Send some cookies to the appropriate target */static void llcd_send(struct llog_canceld_ctxt *llcd){        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);                list_add_tail(&llcd->llcd_list,                              &llcd->llcd_lcm->lcm_llcd_pending);                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);        }        cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);}/* deleted objects have a commit callback that cancels the MDS * log record for the deletion.  The commit callback calls this * function */int llog_obd_repl_cancel(struct llog_ctxt *ctxt,                         struct lov_stripe_md *lsm, int count,                         struct llog_cookie *cookies, int flags){        struct llog_canceld_ctxt *llcd;        int rc = 0;        ENTRY;        LASSERT(ctxt);        mutex_down(&ctxt->loc_sem);        if (ctxt->loc_imp == NULL) {                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);                GOTO(out, rc = 0);        }        llcd = ctxt->loc_llcd;        if (count > 0 && cookies != NULL) {                if (llcd == NULL) {                        llcd = llcd_grab(ctxt->loc_lcm);                        if (llcd == NULL) {                                CERROR("couldn't get an llcd - dropped "LPX64                                       ":%x+%u\n",                                       cookies->lgc_lgl.lgl_oid,                                       cookies->lgc_lgl.lgl_ogen,                                       cookies->lgc_index);                                GOTO(out, rc = -ENOMEM);                        }                        llcd->llcd_ctxt = llog_ctxt_get(ctxt);                        ctxt->loc_llcd = llcd;                }                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,                       cookies, sizeof(*cookies));                llcd->llcd_cookiebytes += sizeof(*cookies);        } else {                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))                        GOTO(out, rc);        }        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||            (flags & OBD_LLOG_FL_SENDNOW)) {                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);                ctxt->loc_llcd = NULL;                llcd_send(llcd);        }out:        mutex_up(&ctxt->loc_sem);        return rc;}EXPORT_SYMBOL(llog_obd_repl_cancel);int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp){        int rc = 0;        ENTRY;        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {                CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",                       ctxt->loc_llcd, ctxt);                mutex_down(&ctxt->loc_sem);                if (ctxt->loc_llcd != NULL) {                        llcd_put(ctxt->loc_llcd);                        ctxt->loc_llcd = NULL;                }                ctxt->loc_imp = NULL;                mutex_up(&ctxt->loc_sem);        } else {                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);        }        RETURN(rc);}EXPORT_SYMBOL(llog_obd_repl_sync);static void llog_lcm_dec(struct llog_commit_master *lcm){        atomic_dec(&lcm->lcm_thread_total);        cfs_waitq_signal(&lcm->lcm_waitq);}static int log_commit_thread(void *arg){        struct llog_commit_daemon *lcd = arg;        struct llog_commit_master *lcm = lcd->lcd_lcm;        struct llog_canceld_ctxt *llcd, *n;        struct obd_import *import = NULL;        ENTRY;        THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,                    "ll_log_comt_%02d", lcd->lcd_index);                ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */        CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());                do {                struct ptlrpc_request *request;                struct list_head *sending_list;                int rc = 0;                if (import)                        class_import_put(import);                import = NULL;                /* If we do not have enough pages available, allocate some */                while (atomic_read(&lcm->lcm_llcd_numfree) <                       lcm->lcm_llcd_minfree) {                        if (llcd_alloc(lcm) < 0)                                break;                }                spin_lock(&lcm->lcm_thread_lock);                atomic_inc(&lcm->lcm_thread_numidle);                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);                spin_unlock(&lcm->lcm_thread_lock);                wait_event_interruptible(lcm->lcm_waitq,                                         !list_empty(&lcm->lcm_llcd_pending) ||                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);                /* If we are the last available thread, start a new one in case                 * we get blocked on an RPC (nobody else will start a new one)*/                spin_lock(&lcm->lcm_thread_lock);                atomic_dec(&lcm->lcm_thread_numidle);                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);                spin_unlock(&lcm->lcm_thread_lock);                sending_list = &lcm->lcm_llcd_pending;        resend:                if (import)                        class_import_put(import);                import = NULL;                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {                        lcm->lcm_llcd_maxfree = 0;                        lcm->lcm_llcd_minfree = 0;                        lcm->lcm_thread_max = 0;                        if (list_empty(&lcm->lcm_llcd_pending) ||                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)                                break;                }                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {                        rc = llog_start_commit_thread(lcm);                        if (rc < 0)                                CERROR("error starting thread: rc %d\n", rc);                }                /* Move all of the pending cancels from the same OST off of                 * the list, so we don't get multiple threads blocked and/or                 * doing upcalls on the same OST in case of failure. */                spin_lock(&lcm->lcm_llcd_lock);                if (!list_empty(sending_list)) {                        list_move_tail(sending_list->next,                                       &lcd->lcd_llcd_list);                        llcd = list_entry(lcd->lcd_llcd_list.next,                                          typeof(*llcd), llcd_list);                        LASSERT(llcd->llcd_lcm == lcm);                        import = llcd->llcd_ctxt->loc_imp;                        if (import)                                class_import_get(import);                }                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {                        LASSERT(llcd->llcd_lcm == lcm);                        if (import == llcd->llcd_ctxt->loc_imp)                                list_move_tail(&llcd->llcd_list,                                               &lcd->lcd_llcd_list);                }                if (sending_list != &lcm->lcm_llcd_resend) {                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,                                                 llcd_list) {                                LASSERT(llcd->llcd_lcm == lcm);                                if (import == llcd->llcd_ctxt->loc_imp)                                        list_move_tail(&llcd->llcd_list,                                                       &lcd->lcd_llcd_list);                        }                }                spin_unlock(&lcm->lcm_llcd_lock);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?