ldlm_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,663 行 · 第 1/5 页
C
1,663 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software.  If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you.  See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifndef __KERNEL__
#include <signal.h>
#include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <obd.h>

#include "ldlm_internal.h"

/* Lower bound (seconds) for the adaptive lock-enqueue timeout computed in
 * ldlm_get_enq_timeout(); tunable at runtime via the module parameter. */
int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
                "lock enqueue timeout minimum");

/* Interrupt handler passed to LWI_INTR()/LWI_TIMEOUT_INTR() in
 * ldlm_completion_ast().  Intentionally empty: a signal simply wakes the
 * sleeper; l_wait_event() itself returns the error to the caller. */
static void interrupted_completion_wait(void *data)
{
}

/* Context handed to the expired/interrupted wait callbacks below. */
struct lock_wait_data {
        struct ldlm_lock *lwd_lock;     /* the lock being waited on */
        __u32             lwd_conn_cnt; /* import connect count sampled before
                                         * sleeping; used to fail the import
                                         * only if no reconnect happened */
};

struct ldlm_async_args {
        struct lustre_handle lock_handle;
};

/* Timeout callback passed to l_wait_event() (via LWI_TIMEOUT_INTR) when a
 * client-side enqueue has slept too long.
 *
 * For a local lock (no l_conn_export, i.e. no server connection) it only
 * complains and periodically (at most every 300s) dumps the namespace; it
 * cannot trigger recovery.  For a remote lock it kicks the import into
 * recovery with ptlrpc_fail_import(), using the connect count captured
 * before the sleep so an already-reconnected import is left alone.
 *
 * Always returns 0 so the outer wait continues sleeping after the timeout
 * has been handled. */
int ldlm_expired_completion_wait(void *data)
{
        struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;
        ENTRY;
        if (lock->l_conn_export == NULL) {
                /* static: dump throttling state is shared across all timed-out
                 * local locks; last_dump == 0 means this is the first dump
                 * ever, in which case the debug log is dumped too. */
                static cfs_time_t next_dump = 0, last_dump = 0;
                LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); "
                           "not entering recovery in server code, just going "
                           "back to sleep", lock->l_enqueued_time.tv_sec,
                           cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
                if (cfs_time_after(cfs_time_current(), next_dump)) {
                        last_dump = next_dump;
                        next_dump = cfs_time_shift(300);
                        ldlm_namespace_dump(D_DLMTRACE,
                                            lock->l_resource->lr_namespace);
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
                RETURN(0);
        }

        obd = lock->l_conn_export->exp_obd;
        imp = obd->u.cli.cl_import;
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
        /* NOTE(review): this message uses CURRENT_SECONDS while the local-lock
         * branch above uses cfs_time_current_sec() -- presumably equivalent
         * "seconds since epoch" accessors in this tree; confirm in libcfs. */
        LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago), entering "
                   "recovery for %s@%s", lock->l_enqueued_time.tv_sec,
                   CURRENT_SECONDS - lock->l_enqueued_time.tv_sec,
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);

        RETURN(0);
}

/* We use the same basis for both server side and client side functions
   from a single node. */
/* Compute the enqueue timeout (seconds) from the namespace's adaptive
 * estimate, padded to 150% and clamped below by ldlm_enqueue_min.  With
 * adaptive timeouts off (AT_OFF) it falls back to obd_timeout / 2. */
int ldlm_get_enq_timeout(struct ldlm_lock *lock)
{
        int timeout = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
        if (AT_OFF)
                return obd_timeout / 2;
        /* Since these are non-updating timeouts, we should be conservative.
           It would be nice to have some kind of "early reply" mechanism for
           lock callbacks too... */
        timeout = timeout + (timeout >> 1); /* 150% */
        return max(timeout, ldlm_enqueue_min);
}

/* Wait-event predicate for ldlm_completion_ast(): true once the lock has
 * been granted (requested mode reached and no completion AST still pending)
 * or has failed.  Checked under the resource lock for a consistent view of
 * l_granted_mode and l_flags. */
static int is_granted_or_cancelled(struct ldlm_lock *lock)
{
        int ret = 0;

        lock_res_and_lock(lock);
        if (((lock->l_req_mode == lock->l_granted_mode) &&
             !(lock->l_flags & LDLM_FL_CP_REQD)) ||
            (lock->l_flags & LDLM_FL_FAILED))
                ret = 1;
        unlock_res_and_lock(lock);

        return ret;
}

/* Client-side completion AST: block the caller until @lock is granted or
 * fails.
 *
 * @flags semantics:
 *   - LDLM_FL_WAIT_NOREPROC: skip the debug dump / reprocess step and go
 *     straight to sleeping on an already-pending lock;
 *   - none of LDLM_FL_BLOCK_{WAIT,GRANTED,CONV} set: the lock is not blocked,
 *     so just wake any waiter and return 0 immediately;
 *   - otherwise: reprocess the resource, then sleep.
 *
 * The sleep uses twice ldlm_get_enq_timeout() (the server may first have to
 * call back -- and possibly evict -- another client holding the lock), or
 * waits forever if LDLM_FL_NO_TIMEOUT is set.  On timeout
 * ldlm_expired_completion_wait() runs and the wait resumes.
 *
 * Returns 0 when granted (also feeding the observed wait time back into the
 * namespace's adaptive estimate), -EIO if the lock was destroyed or failed,
 * or the error from l_wait_event() (e.g. interruption). */
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        struct l_wait_info lwi;
        __u32 timeout;
        int rc = 0;
        ENTRY;

        if (flags == LDLM_FL_WAIT_NOREPROC) {
                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
                goto noreproc;
        }

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        ldlm_lock_dump(D_OTHER, lock, 0);
        ldlm_reprocess_all(lock->l_resource);

 noreproc:

        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, then there is no import */
        if (obd != NULL) {
                imp = obd->u.cli.cl_import;
        }

        /* Wait a long time for enqueue - server may have to callback a
           lock from another client.  Server will evict the other client if it
           doesn't respond reasonably, and then give us the lock. */
        timeout = ldlm_get_enq_timeout(lock) * 2;

        lwd.lwd_lock = lock;

        if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
        } else {
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
                                       ldlm_expired_completion_wait,
                                       interrupted_completion_wait, &lwd);
        }

        if (imp != NULL) {
                /* Sample the connect count under imp_lock so a concurrent
                 * reconnect can't be mistaken for a stale connection by the
                 * timeout handler. */
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
                spin_unlock(&imp->imp_lock);
        }

        /* Go to sleep until the lock is granted or cancelled. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue waking up: granted after %lds",
                   cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);

        /* Update our time estimate */
        at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
               cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
        RETURN(0);
}

/*
 * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
 *
 * On LDLM_CB_BLOCKING it marks the lock CBPENDING and, if the lock holds no
 * reader/writer references, cancels it immediately via ldlm_cli_cancel();
 * otherwise cancellation is deferred until the last reference drops.  On
 * LDLM_CB_CANCELING there is nothing to do.  Always returns 0.
 */
int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
{
        int do_ast;
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        lock_res_and_lock(lock);
        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
         * that ldlm_blocking_ast is called just before intent_policy method
         * takes the ns_lock, then by the time we get the lock, we might not
         * be the correct blocking function anymore.  So check, and return
         * early, if so. */
        if (lock->l_blocking_ast != ldlm_blocking_ast) {
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        lock->l_flags |= LDLM_FL_CBPENDING;
        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                struct lustre_handle lockh;
                int rc;

                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be "
                           "cancelled later");
        }
        RETURN(0);
}

/*
 * ->l_glimpse_ast() for DLM extent locks acquired on the server-side.  See
 * comment in filter_intent_policy() on why you may need this.
 */
int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
{
        /*
         * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
         * that is rather subtle: with OST-side locking, it may so happen that
         * _all_ extent locks are held by the OST. If client wants to obtain
         * current file size it calls ll{,u}_glimpse_size(), and (as locks are
         * on the server), dummy glimpse callback fires and does
         * nothing. Client still receives correct file size due to the
         * following fragment in filter_intent_policy():
         *
         * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
         * if (rc != 0 && res->lr_namespace->ns_lvbo &&
         *     res->lr_namespace->ns_lvbo->lvbo_update) {
         *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
         * }
         *
         * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
         * returns correct file size to the client.
         */
        return -ELDLM_NO_LOCK_DATA;
}

/* Enqueue a lock in a namespace local to this node (no server RPC).
 *
 * Creates the lock, takes a mode reference, marks it LDLM_FL_LOCAL (plus
 * LDLM_FL_ATOMIC_CB if requested via *flags), copies in @policy, and runs
 * ldlm_lock_enqueue().  On success *policy is updated from the granted lock,
 * *res_id is refreshed if LDLM_FL_LOCK_CHANGED was set, and the lock's
 * completion AST (if any) is invoked with the resulting flags.
 *
 * @lockh receives a handle to the new lock.  Returns ELDLM_OK on success,
 * -ENOMEM if the lock cannot be created, or the ldlm_lock_enqueue() error.
 * LBUG()s if called on a client (shadow) namespace; asserts that
 * LDLM_FL_REPLAY is not set. */
int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                           ldlm_type_t type, ldlm_policy_data_t *policy,
                           ldlm_mode_t mode, int *flags,
                           ldlm_blocking_callback blocking,
                           ldlm_completion_callback completion,
                           ldlm_glimpse_callback glimpse,
                           void *data, __u32 lvb_len, void *lvb_swabber,
                           struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;
        int err;
        ENTRY;

        LASSERT(!(*flags & LDLM_FL_REPLAY));
        if (ns_is_client(ns)) {
                CERROR("Trying to enqueue local lock in a shadow namespace\n");
                LBUG();
        }

        lock = ldlm_lock_create(ns, *res_id, type, mode, blocking,
                                completion, glimpse, data, lvb_len);
        if (!lock)
                GOTO(out_nolock, err = -ENOMEM);
        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");

        ldlm_lock_addref_internal(lock, mode);
        ldlm_lock2handle(lock, lockh);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LOCAL;
        if (*flags & LDLM_FL_ATOMIC_CB)
                lock->l_flags |= LDLM_FL_ATOMIC_CB;
        lock->l_lvb_swabber = lvb_swabber;
        unlock_res_and_lock(lock);
        /* NOTE(review): l_policy_data/l_req_extent are written after dropping
         * the resource lock -- presumably safe because the lock is not yet
         * enqueued/visible; confirm against ldlm_lock_enqueue(). */
        if (policy != NULL)
                lock->l_policy_data = *policy;
        if (type == LDLM_EXTENT)
                lock->l_req_extent = policy->l_extent;

        err = ldlm_lock_enqueue(ns, &lock, policy, flags);
        if (err != ELDLM_OK)
                GOTO(out, err);

        if (policy != NULL)
                *policy = lock->l_policy_data;
        if ((*flags) & LDLM_FL_LOCK_CHANGED)
                *res_id = lock->l_resource->lr_name;

        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
                          lock);

        if (lock->l_completion_ast)
                lock->l_completion_ast(lock, *flags, NULL);

        LDLM_DEBUG(lock, "client-side local enqueue END");
        EXIT;
 out:
        /* Drop the create reference; mode reference taken above keeps the
         * lock alive for the caller. */
        LDLM_LOCK_PUT(lock);
 out_nolock:
        return err;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?