ldlm_lockd.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,721 行 · 第 1/5 页
C
1,721 行
* Called with namespace lock held. */int __ldlm_del_waiting_lock(struct ldlm_lock *lock){ struct list_head *list_next; if (list_empty(&lock->l_pending_chain)) return 0; list_next = lock->l_pending_chain.next; if (lock->l_pending_chain.prev == &waiting_locks_list) { /* Removing the head of the list, adjust timer. */ if (list_next == &waiting_locks_list) { /* No more, just cancel. */ cfs_timer_disarm(&waiting_locks_timer); } else { struct ldlm_lock *next; next = list_entry(list_next, struct ldlm_lock, l_pending_chain); cfs_timer_arm(&waiting_locks_timer, round_timeout(next->l_callback_timeout)); } } list_del_init(&lock->l_pending_chain); return 1;}int ldlm_del_waiting_lock(struct ldlm_lock *lock){ int ret; if (lock->l_export == NULL) { /* We don't have a "waiting locks list" on clients. */ LDLM_DEBUG(lock, "client lock: no-op"); return 0; } spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); spin_unlock_bh(&waiting_locks_spinlock); LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); return ret;}/* * Prolong the lock * * Called with namespace lock held. */int ldlm_refresh_waiting_lock(struct ldlm_lock *lock){ if (lock->l_export == NULL) { /* We don't have a "waiting locks list" on clients. */ LDLM_DEBUG(lock, "client lock: no-op"); return 0; } spin_lock_bh(&waiting_locks_spinlock); if (list_empty(&lock->l_pending_chain)) { spin_unlock_bh(&waiting_locks_spinlock); LDLM_DEBUG(lock, "wasn't waiting"); return 0; } __ldlm_del_waiting_lock(lock); __ldlm_add_waiting_lock(lock); spin_unlock_bh(&waiting_locks_spinlock); LDLM_DEBUG(lock, "refreshed"); return 1;}#else /* !__KERNEL__ */static int ldlm_add_waiting_lock(struct ldlm_lock *lock){ LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); RETURN(1);}int ldlm_del_waiting_lock(struct ldlm_lock *lock){ RETURN(0);}int ldlm_refresh_waiting_lock(struct ldlm_lock *lock){ RETURN(0);}#endif /* __KERNEL__ */static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, const char *ast_type){ struct ptlrpc_connection *conn = lock->l_export->exp_connection; char *str = libcfs_nid2str(conn->c_peer.nid); LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due " "to a lock %s callback to %s timed out: rc %d\n", lock->l_export->exp_obd->obd_name, str, ast_type, obd_export_nid2str(lock->l_export), rc); if (obd_dump_on_timeout) libcfs_debug_dumplog(); class_fail_export(lock->l_export);}static int ldlm_handle_ast_error(struct ldlm_lock *lock, struct ptlrpc_request *req, int rc, const char *ast_type){ lnet_process_id_t peer = req->rq_import->imp_connection->c_peer; if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) { LASSERT(lock->l_export); if (lock->l_export->exp_libclient) { LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)" " timeout, just cancelling lock", ast_type, libcfs_nid2str(peer.nid)); ldlm_lock_cancel(lock); rc = -ERESTART; } else if (lock->l_flags & LDLM_FL_CANCEL) { LDLM_DEBUG(lock, "%s AST timeout from nid %s, but " "cancel was received (AST reply lost?)", ast_type, libcfs_nid2str(peer.nid)); ldlm_lock_cancel(lock); rc = -ERESTART; } else { ldlm_del_waiting_lock(lock); ldlm_failed_ast(lock, rc, ast_type); } } else if (rc) { if (rc == -EINVAL) LDLM_DEBUG(lock, "client (nid %s) returned %d" " from %s AST - normal race", libcfs_nid2str(peer.nid), lustre_msg_get_status(req->rq_repmsg), ast_type); else LDLM_ERROR(lock, "client (nid %s) returned %d " "from %s AST", libcfs_nid2str(peer.nid), (req->rq_repmsg != NULL) ? lustre_msg_get_status(req->rq_repmsg) : 0, ast_type); ldlm_lock_cancel(lock); /* Server-side AST functions are called from ldlm_reprocess_all, * which needs to be told to please restart its reprocessing. */ rc = -ERESTART; } return rc;}static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc){ struct ldlm_cb_set_arg *arg; struct ldlm_lock *lock; ENTRY; LASSERT(data != NULL); arg = req->rq_async_args.pointer_arg[0]; lock = req->rq_async_args.pointer_arg[1]; LASSERT(lock != NULL); if (rc != 0) { /* If client canceled the lock but the cancel has not * been recieved yet, we need to update lvbo to have the * proper attributes cached. */ if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK) ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1); rc = ldlm_handle_ast_error(lock, req, rc, arg->type == LDLM_BL_CALLBACK ? "blocking" : "completion"); } LDLM_LOCK_PUT(lock); if (rc == -ERESTART) atomic_set(&arg->restart, 1); RETURN(0);}static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, struct ldlm_cb_set_arg *arg, struct ldlm_lock *lock, int instant_cancel){ int rc = 0; ENTRY; if (unlikely(instant_cancel)) { rc = ptl_send_rpc(req, 1); ptlrpc_req_finished(req); if (rc == 0) /* If we cancelled the lock, we need to restart * ldlm_reprocess_queue */ atomic_set(&arg->restart, 1); } else { LDLM_LOCK_GET(lock); ptlrpc_set_add_req(arg->set, req); } RETURN(rc);}/* * ->l_blocking_ast() method for server-side locks. This is invoked when newly * enqueued server lock conflicts with given one. * * Sends blocking ast rpc to the client owning that lock; arms timeout timer * to wait for client response. */int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag){ struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*body) }; int instant_cancel = 0, rc; ENTRY; if (flag == LDLM_CB_CANCELING) { /* Don't need to do anything here. */ RETURN(0); } LASSERT(lock); LASSERT(data != NULL); req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size, NULL); if (req == NULL) RETURN(-ENOMEM); req->rq_async_args.pointer_arg[0] = arg; req->rq_async_args.pointer_arg[1] = lock; req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; lock_res(lock->l_resource); if (lock->l_granted_mode != lock->l_req_mode) { /* this blocking AST will be communicated as part of the * completion AST instead */ unlock_res(lock->l_resource); ptlrpc_req_finished(req); LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); RETURN(0); } if (lock->l_destroyed) { /* What's the point? */ unlock_res(lock->l_resource); ptlrpc_req_finished(req); RETURN(0); }#if 0 if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30){ unlock_res(lock->l_resource); ptlrpc_req_finished(req); ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking"); RETURN(-ETIMEDOUT); }#endif if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) instant_cancel = 1; body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); body->lock_handle[0] = lock->l_remote_handle; body->lock_desc = *desc; body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS); LDLM_DEBUG(lock, "server preparing blocking AST"); ptlrpc_req_set_repsize(req, 1, NULL); if (instant_cancel) { unlock_res(lock->l_resource); ldlm_lock_cancel(lock); } else { LASSERT(lock->l_granted_mode == lock->l_req_mode); ldlm_add_waiting_lock(lock); unlock_res(lock->l_resource); } req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_prep_req already set timeout */ if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); if (lock->l_export && lock->l_export->exp_ldlm_stats) lprocfs_counter_incr(lock->l_export->exp_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); RETURN(rc);}int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data){ struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; struct timeval granted_time; long total_enqueue_wait; int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*body) }; int rc, buffers = 2, instant_cancel = 0; ENTRY; LASSERT(lock != NULL); LASSERT(data != NULL); do_gettimeofday(&granted_time); total_enqueue_wait = cfs_timeval_sub(&granted_time, &lock->l_enqueued_time, NULL); if (total_enqueue_wait / ONE_MILLION > obd_timeout) /* non-fatal with AT - change to LDLM_DEBUG? */ LDLM_ERROR(lock, "enqueue wait took %luus from %lu", total_enqueue_wait, lock->l_enqueued_time.tv_sec); lock_res_and_lock(lock); if (lock->l_resource->lr_lvb_len) { size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len; buffers = 3; } unlock_res_and_lock(lock); req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers, size, NULL); if (req == NULL) RETURN(-ENOMEM);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?