recov_thread.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 645 行 · 第 1/2 页
C
645 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2003 Cluster File Systems, Inc. * Author: Andreas Dilger <adilger@clusterfs.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. * * OST<->MDS recovery logging thread. * * Invariants in implementation: * - we do not share logs among different OST<->MDS connections, so that * if an OST or MDS fails it need only look at log(s) relevant to itself */#define DEBUG_SUBSYSTEM S_LOG#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#ifdef __KERNEL__# include <libcfs/libcfs.h>#else# include <libcfs/list.h># include <liblustre.h>#endif#include <libcfs/kp30.h>#include <obd_class.h>#include <lustre_commit_confd.h>#include <obd_support.h>#include <obd_class.h>#include <lustre_net.h>#include <lnet/types.h>#include <libcfs/list.h>#include <lustre_log.h>#include "ptlrpc_internal.h"#ifdef __KERNEL__/* Allocate new commit structs in case we do not have enough. * Make the llcd size small enough that it fits into a single page when we * are sending/receiving it. */static int llcd_alloc(struct llog_commit_master *lcm){ struct llog_canceld_ctxt *llcd; int llcd_size; /* payload of lustre_msg V2 is bigger */ llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); OBD_ALLOC(llcd, llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies)); if (llcd == NULL) return -ENOMEM; llcd->llcd_size = llcd_size; llcd->llcd_lcm = lcm; spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); atomic_inc(&lcm->lcm_llcd_numfree); spin_unlock(&lcm->lcm_llcd_lock); return 0;}/* Get a free cookie struct from the list */static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm){ struct llog_canceld_ctxt *llcd;repeat: spin_lock(&lcm->lcm_llcd_lock); if (list_empty(&lcm->lcm_llcd_free)) { spin_unlock(&lcm->lcm_llcd_lock); if (llcd_alloc(lcm) < 0) { CERROR("unable to allocate log commit data!\n"); return NULL; } /* check new llcd wasn't grabbed while lock dropped, b=7407 */ goto repeat; } llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list); list_del(&llcd->llcd_list); atomic_dec(&lcm->lcm_llcd_numfree); spin_unlock(&lcm->lcm_llcd_lock); llcd->llcd_cookiebytes = 0; return llcd;}static void llcd_put(struct llog_canceld_ctxt *llcd){ struct llog_commit_master *lcm = llcd->llcd_lcm; llog_ctxt_put(llcd->llcd_ctxt); if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { int llcd_size = llcd->llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies); OBD_FREE(llcd, llcd_size); } else { spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); atomic_inc(&lcm->lcm_llcd_numfree); spin_unlock(&lcm->lcm_llcd_lock); }}/* Send some cookies to the appropriate target */static void llcd_send(struct llog_canceld_ctxt *llcd){ if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) { spin_lock(&llcd->llcd_lcm->lcm_llcd_lock); list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending); spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock); } cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);}/* deleted objects have a commit callback that cancels the MDS * log record for the deletion. The commit callback calls this * function */int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags){ struct llog_canceld_ctxt *llcd; int rc = 0; ENTRY; LASSERT(ctxt); mutex_down(&ctxt->loc_sem); if (ctxt->loc_imp == NULL) { CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt); GOTO(out, rc = 0); } llcd = ctxt->loc_llcd; if (count > 0 && cookies != NULL) { if (llcd == NULL) { llcd = llcd_grab(ctxt->loc_lcm); if (llcd == NULL) { CERROR("couldn't get an llcd - dropped "LPX64 ":%x+%u\n", cookies->lgc_lgl.lgl_oid, cookies->lgc_lgl.lgl_ogen, cookies->lgc_index); GOTO(out, rc = -ENOMEM); } llcd->llcd_ctxt = llog_ctxt_get(ctxt); ctxt->loc_llcd = llcd; } memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies, sizeof(*cookies)); llcd->llcd_cookiebytes += sizeof(*cookies); } else { if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW)) GOTO(out, rc); } if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) || (flags & OBD_LLOG_FL_SENDNOW)) { CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); ctxt->loc_llcd = NULL; llcd_send(llcd); }out: mutex_up(&ctxt->loc_sem); return rc;}EXPORT_SYMBOL(llog_obd_repl_cancel);int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp){ int rc = 0; ENTRY; if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n", ctxt->loc_llcd, ctxt); mutex_down(&ctxt->loc_sem); if (ctxt->loc_llcd != NULL) { llcd_put(ctxt->loc_llcd); ctxt->loc_llcd = NULL; } ctxt->loc_imp = NULL; mutex_up(&ctxt->loc_sem); } else { rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); } RETURN(rc);}EXPORT_SYMBOL(llog_obd_repl_sync);static void llog_lcm_dec(struct llog_commit_master *lcm){ atomic_dec(&lcm->lcm_thread_total); cfs_waitq_signal(&lcm->lcm_waitq);}static int log_commit_thread(void *arg){ struct llog_commit_daemon *lcd = arg; struct llog_commit_master *lcm = lcd->lcd_lcm; struct llog_canceld_ctxt *llcd, *n; struct obd_import *import = NULL; ENTRY; THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1, "ll_log_comt_%02d", lcd->lcd_index); ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */ CDEBUG(D_HA, "%s started\n", cfs_curproc_comm()); do { struct ptlrpc_request *request; struct list_head *sending_list; int rc = 0; if (import) class_import_put(import); import = NULL; /* If we do not have enough pages available, allocate some */ while (atomic_read(&lcm->lcm_llcd_numfree) < lcm->lcm_llcd_minfree) { if (llcd_alloc(lcm) < 0) break; } spin_lock(&lcm->lcm_thread_lock); atomic_inc(&lcm->lcm_thread_numidle); list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle); spin_unlock(&lcm->lcm_thread_lock); wait_event_interruptible(lcm->lcm_waitq, !list_empty(&lcm->lcm_llcd_pending) || lcm->lcm_flags & LLOG_LCM_FL_EXIT); /* If we are the last available thread, start a new one in case * we get blocked on an RPC (nobody else will start a new one)*/ spin_lock(&lcm->lcm_thread_lock); atomic_dec(&lcm->lcm_thread_numidle); list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy); spin_unlock(&lcm->lcm_thread_lock); sending_list = &lcm->lcm_llcd_pending; resend: if (import) class_import_put(import); import = NULL; if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) { lcm->lcm_llcd_maxfree = 0; lcm->lcm_llcd_minfree = 0; lcm->lcm_thread_max = 0; if (list_empty(&lcm->lcm_llcd_pending) || lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) break; } if (atomic_read(&lcm->lcm_thread_numidle) <= 1 && atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) { rc = llog_start_commit_thread(lcm); if (rc < 0) CERROR("error starting thread: rc %d\n", rc); } /* Move all of the pending cancels from the same OST off of * the list, so we don't get multiple threads blocked and/or * doing upcalls on the same OST in case of failure. */ spin_lock(&lcm->lcm_llcd_lock); if (!list_empty(sending_list)) { list_move_tail(sending_list->next, &lcd->lcd_llcd_list); llcd = list_entry(lcd->lcd_llcd_list.next, typeof(*llcd), llcd_list); LASSERT(llcd->llcd_lcm == lcm); import = llcd->llcd_ctxt->loc_imp; if (import) class_import_get(import); } list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } if (sending_list != &lcm->lcm_llcd_resend) { list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } } spin_unlock(&lcm->lcm_llcd_lock);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?