/*
 * ldlm_flock.c — extracted from the Lustre 1.6.5 source code.
 * C source, 580 lines in total; this extraction is page 1 of 2 and
 * begins part-way through ldlm_process_flock_lock().
 */
} if (added) { ldlm_flock_destroy(lock, mode, *flags); } else { new = lock; added = 1; } continue; } if (new->l_policy_data.l_flock.start > lock->l_policy_data.l_flock.end) continue; if (new->l_policy_data.l_flock.end < lock->l_policy_data.l_flock.start) break; ++overlaps; if (new->l_policy_data.l_flock.start <= lock->l_policy_data.l_flock.start) { if (new->l_policy_data.l_flock.end < lock->l_policy_data.l_flock.end) { lock->l_policy_data.l_flock.start = new->l_policy_data.l_flock.end + 1; break; } ldlm_flock_destroy(lock, lock->l_req_mode, *flags); continue; } if (new->l_policy_data.l_flock.end >= lock->l_policy_data.l_flock.end) { lock->l_policy_data.l_flock.end = new->l_policy_data.l_flock.start - 1; continue; } /* split the existing lock into two locks */ /* if this is an F_UNLCK operation then we could avoid * allocating a new lock and use the req lock passed in * with the request but this would complicate the reply * processing since updates to req get reflected in the * reply. The client side replays the lock request so * it must see the original lock data in the reply. */ /* XXX - if ldlm_lock_new() can sleep we should * release the ns_lock, allocate the new lock, * and restart processing this lock. 
*/ if (!new2) { unlock_res_and_lock(req); new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); lock_res_and_lock(req); if (!new2) { ldlm_flock_destroy(req, lock->l_granted_mode, *flags); *err = -ENOLCK; RETURN(LDLM_ITER_STOP); } goto reprocess; } splitted = 1; new2->l_granted_mode = lock->l_granted_mode; new2->l_policy_data.l_flock.pid = new->l_policy_data.l_flock.pid; new2->l_policy_data.l_flock.start = lock->l_policy_data.l_flock.start; new2->l_policy_data.l_flock.end = new->l_policy_data.l_flock.start - 1; lock->l_policy_data.l_flock.start = new->l_policy_data.l_flock.end + 1; new2->l_conn_export = lock->l_conn_export; if (lock->l_export != NULL) { new2->l_export = class_export_get(lock->l_export); spin_lock(&new2->l_export->exp_ldlm_data.led_lock); list_add(&new2->l_export_chain, &new2->l_export->exp_ldlm_data.led_held_locks); spin_unlock(&new2->l_export->exp_ldlm_data.led_lock); } if (*flags == LDLM_FL_WAIT_NOREPROC) { ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); } /* insert new2 at lock */ ldlm_resource_add_lock(res, ownlocks, new2); LDLM_LOCK_PUT(new2); break; } /* if new2 is created but never used, destroy it*/ if (splitted == 0 && new2 != NULL) ldlm_lock_destroy_nolock(new2); /* At this point we're granting the lock request. */ req->l_granted_mode = req->l_req_mode; /* Add req to the granted queue before calling ldlm_reprocess_all(). */ if (!added) { list_del_init(&req->l_res_link); /* insert new lock before ownlocks in list. */ ldlm_resource_add_lock(res, ownlocks, req); } if (*flags != LDLM_FL_WAIT_NOREPROC) { if (first_enq) { /* If this is an unlock, reprocess the waitq and * send completions ASTs for locks that can now be * granted. The only problem with doing this * reprocessing here is that the completion ASTs for * newly granted locks will be sent before the unlock * completion is sent. It shouldn't be an issue. 
Also * note that ldlm_process_flock_lock() will recurse, * but only once because first_enq will be false from * ldlm_reprocess_queue. */ if ((mode == LCK_NL) && overlaps) { struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list); int rc;restart: ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); unlock_res_and_lock(req); rc = ldlm_run_cp_ast_work(&rpc_list); lock_res_and_lock(req); if (rc == -ERESTART) GOTO(restart, -ERESTART); } } else { LASSERT(req->l_completion_ast); ldlm_add_ast_work_item(req, NULL, work_list); } } /* In case we're reprocessing the requested lock we can't destroy * it until after calling ldlm_ast_work_item() above so that lawi() * can bump the reference count on req. Otherwise req could be freed * before the completion AST can be sent. */ if (added) ldlm_flock_destroy(req, mode, *flags); ldlm_resource_dump(D_OTHER, res); RETURN(LDLM_ITER_CONTINUE);}struct ldlm_flock_wait_data { struct ldlm_lock *fwd_lock; int fwd_generation;};static voidldlm_flock_interrupted_wait(void *data){ struct ldlm_lock *lock; struct lustre_handle lockh; int rc; ENTRY; lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock; /* take lock off the deadlock detection waitq. 
*/ list_del_init(&lock->l_flock_waitq); /* client side - set flag to prevent lock from being put on lru list */ lock->l_flags |= LDLM_FL_CBPENDING; ldlm_lock_decref_internal(lock, lock->l_req_mode); ldlm_lock2handle(lock, &lockh); rc = ldlm_cli_cancel(&lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel: %d\n", rc); EXIT;}intldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data){ struct ldlm_namespace *ns; cfs_flock_t *getlk = lock->l_ast_data; struct ldlm_flock_wait_data fwd; struct obd_device *obd; struct obd_import *imp = NULL; ldlm_error_t err; int rc = 0; struct l_wait_info lwi; ENTRY; CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n", flags, data, getlk); LASSERT(flags != LDLM_FL_WAIT_NOREPROC); if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV))) goto granted; LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, " "sleeping"); fwd.fwd_lock = lock; obd = class_exp2obd(lock->l_conn_export); /* if this is a local lock, then there is no import */ if (obd != NULL) imp = obd->u.cli.cl_import; if (imp != NULL) { spin_lock(&imp->imp_lock); fwd.fwd_generation = imp->imp_generation; spin_unlock(&imp->imp_lock); } lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd); /* Go to sleep until the lock is granted. */ rc = l_wait_event(lock->l_waitq, ((lock->l_req_mode == lock->l_granted_mode) || lock->l_destroyed), &lwi); LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc); RETURN(rc); granted: LDLM_DEBUG(lock, "client-side enqueue granted"); ns = lock->l_resource->lr_namespace; lock_res_and_lock(lock); /* take lock off the deadlock detection waitq. */ list_del_init(&lock->l_flock_waitq); /* ldlm_lock_enqueue() has already placed lock on the granted list. */ list_del_init(&lock->l_res_link); if (flags & LDLM_FL_TEST_LOCK) { /* fcntl(F_GETLK) request */ /* The old mode was saved in getlk->fl_type so that if the mode * in the lock changes we can decref the approprate refcount. 
*/ ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC); switch (lock->l_granted_mode) { case LCK_PR: cfs_flock_set_type(getlk, F_RDLCK); break; case LCK_PW: cfs_flock_set_type(getlk, F_WRLCK); break; default: cfs_flock_set_type(getlk, F_UNLCK); } cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid); cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start); cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end); } else { int noreproc = LDLM_FL_WAIT_NOREPROC; /* We need to reprocess the lock to do merges or splits * with existing locks owned by this process. */ ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL); if (flags == 0) cfs_waitq_signal(&lock->l_waitq); } unlock_res_and_lock(lock); RETURN(0);}EXPORT_SYMBOL(ldlm_flock_completion_ast);int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag){ struct ldlm_namespace *ns; ENTRY; LASSERT(lock); LASSERT(flag == LDLM_CB_CANCELING); ns = lock->l_resource->lr_namespace; /* take lock off the deadlock detection waitq. */ lock_res_and_lock(lock); list_del_init(&lock->l_flock_waitq); unlock_res_and_lock(lock); RETURN(0);}
/* (extraction residue: code-viewer hotkey help — copy Ctrl+C, search Ctrl+F,
 * fullscreen F11, font size Ctrl+= / Ctrl+-, show shortcuts "?" — was pasted
 * here by the viewer and is not part of the original source file.) */