ldlm_lockd.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,721 行 · 第 1/5 页
C
1,721 行
/* Don't move a pending lock onto the export if it has already
 * been evicted.  Cancel it now instead.  (bug 5683) */
        if (req->rq_export->exp_failed ||
            OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT)) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                /* A blocking AST has already gone out for this lock; tell the
                 * client via the reply flags. */
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /* Only cancel lock if it was granted, because it
                         * would be destroyed immediately and would never
                         * be granted in the future, causing timeouts on
                         * client.  Not granted lock will be cancelled
                         * immediately after sending completion AST. */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                /* Cancel requires the resource/lock unlocked;
                                 * re-take afterwards for the code below. */
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                        } else
                                ldlm_add_waiting_lock(lock);
                }
        }

        /* Make sure we never ever grant usual metadata locks to liblustre
         * clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                    !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
                        /* Diagnostic only: log the offending request/reply
                         * flag state, plus the intent opcode if one was
                         * packed in the request. */
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl %d\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                               lock->l_flags);
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;
                                it = lustre_msg_buf(req->rq_reqmsg,
                                                    DLM_INTENT_IT_OFF,
                                                    sizeof(*it));
                                if (it != NULL) {
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
                }
        }

        unlock_res_and_lock(lock);

        EXIT;
 out:
        req->rq_status = rc ?: err; /* return either error - bug 11190 */
        if (!req->rq_packed_final) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
                        rc = err;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code.
-phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);
                if (rc == 0) {
                        /* Success: copy the resource's LVB (if any) into the
                         * reply buffer under the resource lock. */
                        lock_res_and_lock(lock);
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        if (size[DLM_REPLY_REC_OFF] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                       DLM_REPLY_REC_OFF,
                                                       size[DLM_REPLY_REC_OFF]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);
                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[DLM_REPLY_REC_OFF]);
                        }
                        unlock_res_and_lock(lock);
                } else {
                        /* Failure: tear the lock down before replying. */
                        lock_res_and_lock(lock);
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                        unlock_res_and_lock(lock);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);

                LDLM_LOCK_PUT(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}

/*
 * Server-side handler for an LDLM_CONVERT request: unpack the request,
 * attempt to convert the referenced lock to the requested mode, and set
 * req->rq_status accordingly.
 */
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF] = sizeof(*dlm_rep) };
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN (-EFAULT);
        }

        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
                RETURN(rc);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
        if (!lock) {
                /* NOTE(review): positive errno in rq_status appears to be the
                 * convention for these handlers (cf. EDEADLOCK/ESTALE below). */
                req->rq_status = EINVAL;
        } else {
                void *res = NULL;

                LDLM_DEBUG(lock, "server-side convert handler START");

                do_gettimeofday(&lock->l_enqueued_time);
                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                        &dlm_rep->lock_flags);
                if (res) {
                        if (ldlm_del_waiting_lock(lock))
                                LDLM_DEBUG(lock, "converted waiting lock");
                        req->rq_status = 0;
                } else
{
                        /* Conversion failed (or is blocked). */
                        req->rq_status = EDEADLOCK;
                }
        }

        if (lock) {
                if (!req->rq_status)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}

/* Cancel all the locks whose handles are packed into ldlm_request.
 * Returns the number of locks actually cancelled (0 if all handles
 * were stale).  Cancellation starts at handle index 'first'. */
int ldlm_request_cancel(struct ptlrpc_request *req,
                        struct ldlm_request *dlm_req, int first)
{
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
        ENTRY;

        /* An old-format request carries one handle but no count. */
        count = dlm_req->lock_count ? dlm_req->lock_count : 1;
        if (first >= count)
                RETURN(0);

        /* There is no lock on the server at the replay time,
         * skip lock cancelling to make replay tests to pass. */
        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                RETURN(0);

        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
                          count - first);
        for (i = first; i < count; i++) {
                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                if (!lock) {
                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
                                          "lock (cookie "LPU64")",
                                          dlm_req->lock_handle[i].cookie);
                        continue;
                }

                done++;
                res = lock->l_resource;
                /* Batch per-resource work: when the resource changes, finish
                 * (reprocess + putref) the previous one and take a ref plus
                 * an LVB update on the new one. */
                if (res != pres) {
                        if (pres != NULL) {
                                ldlm_reprocess_all(pres);
                                ldlm_resource_putref(pres);
                        }
                        if (res != NULL) {
                                ldlm_resource_getref(res);
                                ldlm_res_lvbo_update(res, NULL, 0, 1);
                        }
                        pres = res;
                }
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
        }
        /* Flush the last batched resource, if any. */
        if (pres != NULL) {
                ldlm_reprocess_all(pres);
                ldlm_resource_putref(pres);
        }
        LDLM_DEBUG_NOLOCK("server-side cancel handler END");
        RETURN(done);
}

/*
 * Server-side handler for an LDLM_CANCEL request: unpack the request and
 * cancel every lock handle it carries via ldlm_request_cancel().  The reply
 * is sent here (ptlrpc_reply below), so the caller must not reply again.
 */
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CANCEL - LDLM_FIRST_OPC);

        rc = lustre_pack_reply(req, 1, NULL, NULL);
        if (rc)
                RETURN(rc);

        /* No lock cancelled at all -> every handle was stale. */
        if
(!ldlm_request_cancel(req, dlm_req, 0))
                req->rq_status = ESTALE;

        if (ptlrpc_reply(req) != 0)
                LBUG();

        RETURN(0);
}

/*
 * Client-side handler for a blocking AST from the server: mark the lock
 * CBPENDING and, if it is already unused (no readers or writers), invoke
 * its blocking-AST callback right away; otherwise the cancel happens later
 * when the last reference is dropped.  Consumes one lock reference.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                lock->l_flags |= LDLM_FL_CANCEL;

        /* Only fire the AST now if nobody is actively using the lock;
         * the flag check and the decision must happen under the lock. */
        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL)
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        LDLM_LOCK_PUT(lock);
        EXIT;
}

/*
 * Client-side handler for a completion AST: the server has granted (or is
 * about to grant) the lock.  May race with the enqueue reply, so the lock's
 * mode/policy/resource may need to be updated from the AST's lock_desc.
 * Consumes one lock reference.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        CFS_LIST_HEAD(ast_list);
        ENTRY;

        LDLM_DEBUG(lock, "client completion callback handler START");

        /* Fault-injection point: stall briefly to widen the window for the
         * cancel-vs-blocking-callback race this OBD_FAIL simulates. */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                int to = cfs_time_seconds(1);
                while (to > 0) {
                        to = schedule_timeout(to);
                        if (lock->l_granted_mode == lock->l_req_mode ||
                            lock->l_destroyed)
                                break;
                }
        }

        lock_res_and_lock(lock);
        if (lock->l_destroyed ||
            lock->l_granted_mode == lock->l_req_mode) {
                /* bug 11300: the lock has already been granted */
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "Double grant race happened");
                LDLM_LOCK_PUT(lock);
                EXIT;
                return;
        }

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents.
*/
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        /* Non-PLAIN locks carry policy data (extents, inode bits, ...);
         * take the server's authoritative copy. */
        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                /* The server granted the lock on a different resource than
                 * requested; move it there (requires dropping the lock). */
                unlock_res_and_lock(lock);
                ldlm_lock_change_resource(ns, lock,
                                         dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
                CERROR("change resource!\n");
                lock_res_and_lock(lock);
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                /* BL_AST locks are not needed in lru.
                 * let ldlm_cancel_lru() be fast. */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len) {
                /* The AST should carry an LVB of the size this lock expects;
                 * unpack (and byte-swap if needed) it from the request. */
                void *lvb;
                lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
                                         lock->l_lvb_swabber);
                if (lvb == NULL) {
                        LDLM_ERROR(lock, "completion AST did not contain "
                                   "expected LVB!");
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?