ldlm_lock.c
From the Lustre 1.6.5 source code · C code · 1,767 lines total · page 1 of 5
C
1,767 lines
/* NOTE(review): this is the TAIL of ldlm_lock_match(); the function head
 * (and the declarations of err, lwi, rc, lock, old_lock, flags, mode,
 * type, res_id, policy) lies on an earlier page of this listing. */
                                                 LDLM_FL_WAIT_NOREPROC,
                                                 NULL);
                        if (err) {
                                /* Completion AST failed: drop whichever
                                 * reference we took (a bare lock ref for
                                 * TEST_LOCK, otherwise a mode ref) and
                                 * report "not matched". */
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_PUT(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                                goto out2;
                        }
                }

                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout), NULL,
                                       LWI_ON_SIGNAL_NOOP, NULL);

                /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                /* Block (interruptible, obd_timeout bounded) until the
                 * matched lock's LVB becomes usable. */
                l_wait_event(lock->l_waitq,
                             (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
        }
 }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                           res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                           res_id->name[3] : policy->l_extent.end);
        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0],
                                  res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                  res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                  res_id->name[3] : policy->l_extent.end);
        }

        /* old_lock is only set on the repeat-match path; release it now. */
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);
        /* TEST_LOCK callers get no reference back even on success. */
        if (flags & LDLM_FL_TEST_LOCK && rc)
                LDLM_LOCK_PUT(lock);

        /* Return the matched mode, or 0 for no match (continues on the
         * next page of this listing). */
        return rc ?
               mode : 0;  /* (cont.) completes "return rc ? mode : 0" of
                           * ldlm_lock_match() begun on the previous page */
}

/* Returns a referenced lock
 *
 * Allocate a new lock on resource @res_id (type @type) in namespace @ns,
 * taking and dropping a resource reference around the allocation, and
 * initialise its request mode, AST callbacks, opaque @data and pid.
 * For LDLM_EXTENT locks an interval-tree node is allocated up front;
 * if @lvb_len is non-zero an LVB buffer of that size is allocated too.
 * On any allocation failure everything is unwound and NULL is returned. */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   struct ldlm_res_id res_id, ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   ldlm_blocking_callback blocking,
                                   ldlm_completion_callback completion,
                                   ldlm_glimpse_callback glimpse,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        /* "1" asks ldlm_resource_get() to create the resource if absent
         * -- presumably; confirm against ldlm_resource.c */
        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);
        /* ldlm_lock_new() holds its own resource reference; ours can go. */
        ldlm_resource_putref(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_blocking_ast = blocking;
        lock->l_completion_ast = completion;
        lock->l_glimpse_ast = glimpse;
        lock->l_pid = cfs_curproc_pid();
        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        RETURN(lock);

out:
        /* Failure path: free whatever was attached and the lock itself. */
        if (lock->l_lvb_data)
                OBD_FREE(lock->l_lvb_data, lvb_len);
        ldlm_interval_free(ldlm_interval_detach(lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
        return NULL;
}

/* Enqueue lock *@lockp on its resource, possibly running the namespace
 * intent policy first; @flags carries LDLM_FL_* hints in and out.
 * May replace *@lockp when the policy returns ELDLM_LOCK_REPLACED. */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(res->lr_namespace);
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        do_gettimeofday(&lock->l_enqueued_time);
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.
                         * If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_PUT(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        /* Policy failed, or the caller only wanted the
                         * intent executed: the lock itself is not needed. */
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed, we
         * have to allocate the interval node early otherwise we can't regrant
         * this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO,
                               sizeof(*node));

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it. We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        /* Interval node allocation failed above; the lock
                         * cannot live on an extent resource without one. */
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }
                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL;   /* ownership transferred to the lock */
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below). In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues.
         */
        if (local) {
                /* Client side: obey whatever the server decided. */
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                /* Recovery replay: restore the lock to the queue recorded
                 * in the replayed flags rather than re-deciding. */
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        /* Server-side normal enqueue: let the per-type policy decide. */
        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
out:
        unlock_res_and_lock(lock);
        /* Free the pre-allocated interval node if it was never attached. */
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}

/* Must be called with namespace taken: queue is waiting or converting.
 *
 * Re-run the resource's processing policy over every lock on @queue,
 * collecting ASTs to send into @work_list; stops early when the policy
 * returns anything other than LDLM_ITER_CONTINUE. */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);
        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        /* _safe iteration: the policy may unlink the current entry. */
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}

/* Helper function for pair ldlm_run_{bl,cp}_ast_work().
 *
 * Send an existing rpc set specified by @arg->set and then
 * destroy it. Create new one if @do_create flag is set.
 */
static void
ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
{
        int rc;

        rc = ptlrpc_set_wait(arg->set);
        /* Fault-injection point for glimpse testing on the BL path. */
        if (arg->type == LDLM_BL_CALLBACK)
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
        ptlrpc_set_destroy(arg->set);

        if (do_create)
                arg->set = ptlrpc_prep_set();
}

/* Fire the blocking AST for every lock on @rpc_list, batching the RPCs
 * into sets of at most PARALLEL_AST_LIMIT; returns -ERESTART if any
 * callback asked for the reprocess to be restarted, else 0. */
int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
        struct ldlm_cb_set_arg arg;
        struct list_head *tmp, *pos;
        struct ldlm_lock_desc d;
        int ast_count;
        int rc = 0;
        ENTRY;

        arg.set = ptlrpc_prep_set();
        atomic_set(&arg.restart, 0);
        arg.type = LDLM_BL_CALLBACK;

        ast_count = 0;
        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_bl_ast);

                /* nobody should touch l_bl_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_bl_ast);

                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                lock->l_bl_ast_run++;
                unlock_res_and_lock(lock);

                /* Describe the conflicting lock for the callback, then
                 * drop our reference on it. */
                ldlm_lock2desc(lock->l_blocking_lock, &d);

                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
                                          LDLM_CB_BLOCKING);
                LDLM_LOCK_PUT(lock);
                ast_count++;

                /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
                 * and create a new set for requests that remained in
                 * @rpc_list */
                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
                        ldlm_send_and_maybe_create_set(&arg, 1);
                        ast_count = 0;
                }
        }

        if (ast_count > 0)
                ldlm_send_and_maybe_create_set(&arg, 0);
        else
                /* In case when number of ASTs is multiply of
                 * PARALLEL_AST_LIMIT or @rpc_list was initially empty,
                 * @arg.set must be destroyed here, otherwise we get
                 * write memory leaking. */
                ptlrpc_set_destroy(arg.set);

        /* (expression continues on the next page of this listing) */
        RETURN(atomic_read(&arg.restart) ?
               -ERESTART : 0);  /* (cont.) completes the RETURN(...) of
                                 * ldlm_run_bl_ast_work() from the previous
                                 * page */
}

/* Fire the completion AST for every lock on @rpc_list, batched the same
 * way as ldlm_run_bl_ast_work().
 * NOTE(review): this function is truncated here -- the remainder is on
 * the next page of this listing. */
int ldlm_run_cp_ast_work(struct list_head *rpc_list)
{
        struct ldlm_cb_set_arg arg;
        struct list_head *tmp, *pos;
        int ast_count;
        int rc = 0;
        ENTRY;

        arg.set = ptlrpc_prep_set();
        atomic_set(&arg.restart, 0);
        arg.type = LDLM_CP_CALLBACK;

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        ast_count = 0;
        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_cp_ast);
                ldlm_completion_callback completion_callback;

                /* nobody should touch l_cp_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_cp_ast);

                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                /* save l_completion_ast since it can be changed by
                 * mds_intent_policy(), see bug 14225 */
                completion_callback = lock->l_completion_ast;
⌨️ Keyboard shortcuts
Copy code: Ctrl + C
Search code: Ctrl + F
Full-screen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?