ldlm_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,663 行 · 第 1/5 页
C
1,663 行
/* lock not sent to server yet */ if (reqp == NULL || *reqp == NULL) { req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); if (req == NULL) { failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode); LDLM_LOCK_PUT(lock); RETURN(-ENOMEM); } req_passed_in = 0; if (reqp) *reqp = req; } else { req = *reqp; LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >= sizeof(*body), "buflen[%d] = %d, not %d\n", DLM_LOCKREQ_OFF, lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF), (int)sizeof(*body)); } lock->l_conn_export = exp; lock->l_export = NULL; lock->l_blocking_ast = einfo->ei_cb_bl; /* Dump lock data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = *flags; body->lock_handle[0] = *lockh; /* Continue as normal. */ if (!req_passed_in) { size[DLM_LOCKREPLY_OFF] = sizeof(*reply); ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size); } /* * Liblustre client doesn't get extent locks, except for O_APPEND case * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where * [i_size, OBD_OBJECT_EOF] lock is taken. */ LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT || policy->l_extent.end == OBD_OBJECT_EOF)); if (async) { LASSERT(reqp != NULL); RETURN(0); } LDLM_DEBUG(lock, "sending request"); rc = ptlrpc_queue_wait(req); err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 
1 : 0, einfo->ei_mode, flags, lvb, lvb_len, lvb_swabber, lockh, rc); /* If ldlm_cli_enqueue_fini did not find the lock, we need to free * one reference that we took */ if (err == -ENOLCK) LDLM_LOCK_PUT(lock); else rc = err; if (!req_passed_in && req != NULL) { ptlrpc_req_finished(req); if (reqp) *reqp = NULL; } RETURN(rc);}static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, __u32 *flags){ struct ldlm_resource *res; int rc; ENTRY; if (ns_is_client(lock->l_resource->lr_namespace)) { CERROR("Trying to cancel local lock\n"); LBUG(); } LDLM_DEBUG(lock, "client-side local convert"); res = ldlm_lock_convert(lock, new_mode, flags); if (res) { ldlm_reprocess_all(res); rc = 0; } else { rc = EDEADLOCK; } LDLM_DEBUG(lock, "client-side local convert handler END"); LDLM_LOCK_PUT(lock); RETURN(rc);}/* FIXME: one of ldlm_cli_convert or the server side should reject attempted * conversion of locks which are on the waiting or converting queue *//* Caller of this code is supposed to take care of lock readers/writers accounting */int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags){ struct ldlm_request *body; struct ldlm_reply *reply; struct ldlm_lock *lock; struct ldlm_resource *res; struct ptlrpc_request *req; int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*body) }; int rc; ENTRY; lock = ldlm_handle2lock(lockh); if (!lock) { LBUG(); RETURN(-EINVAL); } *flags = 0; if (lock->l_conn_export == NULL) RETURN(ldlm_cli_convert_local(lock, new_mode, flags)); LDLM_DEBUG(lock, "client-side convert"); req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export), LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); body->lock_handle[0] = lock->l_remote_handle; body->lock_desc.l_req_mode = new_mode; body->lock_flags = *flags; size[DLM_LOCKREPLY_OFF] = sizeof(*reply); ptlrpc_req_set_repsize(req, 2, size); 
rc = ptlrpc_queue_wait(req); if (rc != ELDLM_OK) GOTO(out, rc); reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR ("Can't unpack ldlm_reply\n"); GOTO (out, rc = -EPROTO); } if (req->rq_status) GOTO(out, rc = req->rq_status); res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) { ldlm_reprocess_all(res); /* Go to sleep until the lock is granted. */ /* FIXME: or cancelled. */ if (lock->l_completion_ast) { rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL); if (rc) GOTO(out, rc); } } else { rc = EDEADLOCK; } EXIT; out: LDLM_LOCK_PUT(lock); ptlrpc_req_finished(req); return rc;}/* Cancel locks locally. * Returns: * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server; * LDLM_FL_CANCELING otherwise; * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. */static int ldlm_cli_cancel_local(struct ldlm_lock *lock){ int rc = LDLM_FL_LOCAL_ONLY; ENTRY; if (lock->l_conn_export) { int local_only; LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; local_only = (lock->l_flags & (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); rc = (lock->l_flags & LDLM_FL_BL_AST) ? LDLM_FL_BL_AST : LDLM_FL_CANCELING; unlock_res_and_lock(lock); if (local_only) { CDEBUG(D_DLMTRACE, "not sending request (at caller's " "instruction)\n"); rc = LDLM_FL_LOCAL_ONLY; } ldlm_lock_cancel(lock); } else { if (ns_is_client(lock->l_resource->lr_namespace)) { LDLM_ERROR(lock, "Trying to cancel local lock"); LBUG(); } LDLM_DEBUG(lock, "server-side local cancel"); ldlm_lock_cancel(lock); ldlm_reprocess_all(lock->l_resource); LDLM_DEBUG(lock, "server-side local cancel handler END"); } RETURN(rc);}/* Pack @count locks in @head into ldlm_request buffer at the offset @off, of the request @req. 
*/
/*
 * The handles are appended after whatever is already recorded in the
 * buffer's lock_count, which is advanced for every handle written.  The
 * buffer must exist and must have room for the existing handles plus
 * @count more; both conditions are asserted.  At most @count entries of
 * @head (linked through l_bl_ast) are consumed, and each of them must have
 * a connection export.
 */
static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
                             struct list_head *head, int count)
{
        struct ldlm_request *lockreq;
        struct ldlm_lock *cur;
        int room;
        int packed = 0;
        ENTRY;

        lockreq = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*lockreq));
        LASSERT(lockreq != NULL);

        /* Check the room in the request buffer: the handles that fit in
         * the space beyond the fixed structure, plus the ones embedded in
         * the structure itself. */
        room = lustre_msg_buflen(req->rq_reqmsg, off) -
                sizeof(struct ldlm_request);
        room /= sizeof(struct lustre_handle);
        room += LDLM_LOCKREQ_HANDLES;
        LASSERT(room >= lockreq->lock_count + count);

        /* XXX: it would be better to pack lock handles grouped by resource.
         * so that the server cancel would call filter_lvbo_update() less
         * frequently. */
        list_for_each_entry(cur, head, l_bl_ast) {
                /* Stop once the requested number of handles is packed. */
                if (count-- == 0)
                        break;

                LASSERT(cur->l_conn_export);
                /* Pack the lock handle to the given request buffer. */
                LDLM_DEBUG(cur, "packing");
                lockreq->lock_handle[lockreq->lock_count++] =
                        cur->l_remote_handle;
                packed++;
        }
        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
        EXIT;
}

/* Prepare and send a batched cancel rpc, it will include count lock handles
 * of locks given in @head.
*/
/*
 * @count is first clamped to the number of handles that
 * ldlm_req_handles_avail() reports for this export.
 *
 * Returns:
 *  - the (possibly clamped) @count when the RPC completed with ELDLM_OK or
 *    with ESTALE (which is logged and treated as success);
 *  - @count, without sending anything, when the import is missing or
 *    invalid, or when the OBD_FAIL_LDLM_CANCEL_RACE fail point fires (in
 *    that case before clamping);
 *  - -ENOMEM when the request cannot be allocated;
 *  - otherwise the non-zero result of ptlrpc_queue_wait().
 *
 * A request that ends with -ETIMEDOUT while the import generation is
 * unchanged is dropped and a fresh one is sent.
 */
int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                        int count)
{
        struct ptlrpc_request *req = NULL;
        struct ldlm_request *body;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        struct obd_import *imp;
        int avail;
        int sent = 0;
        int rc = 0;
        ENTRY;

        LASSERT(exp != NULL);
        LASSERT(count > 0);

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                RETURN(count);

        /* Never try to pack more handles than one request can carry. */
        avail = ldlm_req_handles_avail(exp, size, 2, 0);
        if (count > avail)
                count = avail;

        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
        for (;;) {
                imp = class_exp2cliimp(exp);
                if (imp == NULL || imp->imp_invalid) {
                        CDEBUG(D_DLMTRACE,
                               "skipping cancel on invalid import %p\n", imp);
                        RETURN(count);
                }

                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
                                      size, NULL);
                if (!req)
                        GOTO(out, rc = -ENOMEM);

                req->rq_no_resend = 1;
                req->rq_no_delay = 1;

                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);

                /* NOTE(review): the value stored in body is not read again
                 * in this function; ldlm_cancel_pack() looks the buffer up
                 * itself. */
                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
                                      sizeof(*body));
                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);

                ptlrpc_req_set_repsize(req, 1, NULL);
                rc = ptlrpc_queue_wait(req);

                if (rc == ESTALE) {
                        CDEBUG(D_DLMTRACE, "client/server (nid %s) "
                               "out of sync -- not fatal\n",
                               libcfs_nid2str(req->rq_import->
                                              imp_connection->c_peer.nid));
                        rc = 0;
                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                           req->rq_import_generation == imp->imp_generation) {
                        /* Same import generation: drop this request and
                         * go round again with a new one. */
                        ptlrpc_req_finished(req);
                        continue;
                } else if (rc != ELDLM_OK) {
                        CERROR("Got rc %d from cancel RPC: canceling "
                               "anyway\n", rc);
                        break;
                }
                sent = count;
                break;
        }

        ptlrpc_req_finished(req);
        EXIT;
out:
        return sent ? sent : rc;
}

static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?