filter.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,698 行 · 第 1/5 页
C
1,698 行
        if (rc)
                CERROR("error unlinking objid %.*s: rc %d\n",
                       dchild->d_name.len, dchild->d_name.name, rc);
        return(rc);
}

/* Iteration state shared between filter_intent_policy() and its interval
 * tree callback filter_intent_cb(). */
struct filter_intent_args {
        struct ldlm_lock **victim;  /* out: best glimpse target found so far */
        __u64 size;                 /* in: current file size from the LVB */
        int *liblustre;             /* in/out: stays 1 while only liblustre
                                     * clients hold candidate locks */
};

/* Interval tree callback, called for each granted-lock interval during the
 * reverse (highest-extent-first) walk in filter_intent_policy().  Picks the
 * lock with the highest extent start among locks extending past the known
 * file size, taking a reference on it via LDLM_LOCK_GET(); the previous
 * candidate's reference is dropped.  Caller owns the final reference in
 * *arg->victim. */
static enum interval_iter filter_intent_cb(struct interval_node *n, void *args)
{
        struct ldlm_interval *node = (struct ldlm_interval *)n;
        struct filter_intent_args *arg = (struct filter_intent_args*)args;
        __u64 size = arg->size;
        struct ldlm_lock **v = arg->victim;
        struct ldlm_lock *lck;

        /* If the interval is lower than the current file size,
         * just break.  (Iteration is reverse, so every remaining interval
         * is lower still.) */
        if (interval_high(n) <= size)
                return INTERVAL_ITER_STOP;

        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
                /* Don't send glimpse ASTs to liblustre clients.  They aren't
                 * listening for them, and they do entirely synchronous I/O
                 * anyways. */
                if (lck->l_export == NULL ||
                    lck->l_export->exp_libclient == 1)
                        continue;

                if (*arg->liblustre)
                        *arg->liblustre = 0;

                if (*v == NULL) {
                        *v = LDLM_LOCK_GET(lck);
                } else if ((*v)->l_policy_data.l_extent.start <
                           lck->l_policy_data.l_extent.start) {
                        LDLM_LOCK_PUT(*v);
                        *v = LDLM_LOCK_GET(lck);
                }

                /* the same policy group - every lock has the
                 * same extent, so needn't do it any more */
                break;
        }
        return INTERVAL_ITER_CONT;
}

/* Intent policy for OST extent locks (glimpse handling).
 *
 * Tries to grant the request as a whole-file PR lock.  If conflicting PW
 * locks exist, no lock is granted; instead a glimpse AST is sent to the
 * holder of the highest PW lock extending past the size recorded in the
 * LVB, so the reply LVB carries an up-to-date size.
 *
 * Returns ELDLM_LOCK_REPLACED when the (converted) lock was granted,
 * ELDLM_LOCK_ABORTED when no lock is granted (the reply LVB alone is
 * returned to the client), or a negative rc packed into req->rq_status on
 * reply-packing failure. */
static int filter_intent_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock **lockp,
                                void *req_cookie, ldlm_mode_t mode,
                                int flags, void *data)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ptlrpc_request *req = req_cookie;
        struct ldlm_lock *lock = *lockp, *l = NULL;
        struct ldlm_resource *res = lock->l_resource;
        ldlm_processing_policy policy;
        struct ost_lvb *res_lvb, *reply_lvb;
        struct ldlm_reply *rep;
        ldlm_error_t err;
        int idx, rc, tmpflags = 0, only_liblustre = 1;
        struct ldlm_interval_tree *tree;
        struct filter_intent_args arg;
        int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                           [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
        ENTRY;

        policy = ldlm_get_processing_policy(res);
        LASSERT(policy != NULL);
        LASSERT(req != NULL);

        rc = lustre_pack_reply(req, 3, repsize, NULL);
        if (rc)
                RETURN(req->rq_status = rc);

        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
        LASSERT(rep != NULL);

        reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
                                   sizeof(*reply_lvb));
        LASSERT(reply_lvb != NULL);

        //fixup_handle_for_resent_req(req, lock, &lockh);

        /* If we grant any lock at all, it will be a whole-file read lock.
         * Call the extent policy function to see if our request can be
         * granted, or is blocked.
         * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse lock
         */
        lock->l_policy_data.l_extent.start = 0;
        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
        lock->l_req_mode = LCK_PR;

        LASSERT(ns == res->lr_namespace);
        lock_res(res);
        rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
        check_res_locked(res);

        /* FIXME: we should change the policy function slightly, to not make
         * this list at all, since we just turn around and free it */
        /* NOTE(review): the asserts and the flag clear below operate on
         * `lock` (the request lock) while the list entry is `wlock` — looks
         * like it may have been intended as wlock->l_flags; confirm against
         * upstream before changing. */
        while (!list_empty(&rpc_list)) {
                struct ldlm_lock *wlock =
                        list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast);
                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                lock->l_flags &= ~LDLM_FL_CP_REQD;
                list_del_init(&wlock->l_cp_ast);
                LDLM_LOCK_PUT(wlock);
        }

        /* The lock met with no resistance; we're finished. */
        if (rc == LDLM_ITER_CONTINUE) {
                /* do not grant locks to the liblustre clients: they cannot
                 * handle ASTs robustly.  We need to do this while still
                 * holding ns_lock to avoid the lock remaining on the res_link
                 * list (and potentially being added to l_pending_list by an
                 * AST) when we are going to drop this lock ASAP. */
                if (lock->l_export->exp_libclient ||
                    OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
                        ldlm_resource_unlink_lock(lock);
                        err = ELDLM_LOCK_ABORTED;
                } else {
                        err = ELDLM_LOCK_REPLACED;
                }
                unlock_res(res);
                RETURN(err);
        }

        /* Do not grant any lock, but instead send GL callbacks.  The extent
         * policy nicely created a list of all PW locks for us.  We will
         * choose the highest of those which are larger than the size in the
         * LVB, if any, and perform a glimpse callback. */
        res_lvb = res->lr_lvb_data;
        LASSERT(res_lvb != NULL);
        *reply_lvb = *res_lvb;

        /*
         * ->ns_lock guarantees that no new locks are granted, and,
         * therefore, that res->lr_lvb_data cannot increase beyond the
         * end of already granted lock.  As a result, it is safe to
         * check against "stale" reply_lvb->lvb_size value without
         * res->lr_lvb_sem.
         */
        arg.size = reply_lvb->lvb_size;
        arg.victim = &l;
        arg.liblustre = &only_liblustre;
        /* Walk every non-PR interval tree (PR locks cannot extend the file)
         * looking for the glimpse victim. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (tree->lit_mode == LCK_PR)
                        continue;

                interval_iterate_reverse(tree->lit_root,
                                         filter_intent_cb, &arg);
        }
        unlock_res(res);

        /* There were no PW locks beyond the size in the LVB; finished. */
        if (l == NULL) {
                if (only_liblustre) {
                        /* If we discovered a liblustre client with a PW lock,
                         * however, the LVB may be out of date!  The LVB is
                         * updated only on glimpse (which we don't do for
                         * liblustre clients) and cancel (which the client
                         * obviously has not yet done).  So if it has written
                         * data but kept the lock, the LVB is stale and needs
                         * to be updated from disk.
                         *
                         * Of course, this will all disappear when we switch
                         * to taking liblustre locks on the OST. */
                        ldlm_res_lvbo_update(res, NULL, 0, 1);
                }
                RETURN(ELDLM_LOCK_ABORTED);
        }

        /*
         * This check is for lock taken in filter_prepare_destroy() that does
         * not have l_glimpse_ast set.  So the logic is: if there is a lock
         * with no l_glimpse_ast set, this object is being destroyed already.
         *
         * Hence, if you are grabbing DLM locks on the server, always set
         * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
         */
        if (l->l_glimpse_ast == NULL) {
                /* We are racing with unlink(); just return -ENOENT */
                rep->lock_policy_res1 = -ENOENT;
                goto out;
        }

        LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
        rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
        /* Update the LVB from disk if the AST failed (this is a legal race) */
        /*
         * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
         * sending ast is not handled.  This can result in lost client writes.
         */
        if (rc != 0)
                ldlm_res_lvbo_update(res, NULL, 0, 1);

        lock_res(res);
        *reply_lvb = *res_lvb;
        unlock_res(res);

 out:
        LDLM_LOCK_PUT(l);
        RETURN(ELDLM_LOCK_ABORTED);
}

/*
 * per-obd_device iobuf pool.
 *
 * To avoid memory deadlocks in low-memory setups, amount of dynamic
 * allocations in write-path has to be minimized (see bug 5137).
 *
 * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
 * OST threads (see ost_thread_{init,done}()).
 *
 * "iobuf's" used by filter cannot be attached to OST thread, however, because
 * at the OST layer there are only (potentially) multiple obd_device of type
 * unknown at the time of OST thread creation.
 *
 * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool
 * field).  This array has size OST_MAX_THREADS, so that each OST thread uses
 * it's very own iobuf.
 *
 * Functions below
 *
 *     filter_kiobuf_pool_init()
 *
 *     filter_kiobuf_pool_done()
 *
 *     filter_iobuf_get()
 *
 * operate on this array.  They are "generic" in a sense that they don't
 * depend on actual type of iobuf's (the latter depending on Linux kernel
 * version).
*//* * destroy pool created by filter_iobuf_pool_init */static void filter_iobuf_pool_done(struct filter_obd *filter){ struct filter_iobuf **pool; int i; ENTRY; pool = filter->fo_iobuf_pool; if (pool != NULL) { for (i = 0; i < filter->fo_iobuf_count; ++ i) { if (pool[i] != NULL) filter_free_iobuf(pool[i]); } OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]); filter->fo_iobuf_pool = NULL; } EXIT;}/* * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write(). */static int filter_iobuf_pool_init(struct filter_obd *filter){ void **pool; ENTRY; OBD_ALLOC_GFP(filter->fo_iobuf_pool, OSS_THREADS_MAX * sizeof(*pool), GFP_KERNEL); if (filter->fo_iobuf_pool == NULL) RETURN(-ENOMEM); filter->fo_iobuf_count = OSS_THREADS_MAX; RETURN(0);}/* Return iobuf allocated for @thread_id. We don't know in advance how * many threads there will be so we allocate a large empty array and only * fill in those slots that are actually in use. * If we haven't allocated a pool entry for this thread before, do so now. */void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti){ int thread_id = oti ? oti->oti_thread_id : -1; struct filter_iobuf *pool = NULL; struct filter_iobuf **pool_place = NULL; if (thread_id >= 0) { LASSERT(thread_id < filter->fo_iobuf_count); pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]); } if (unlikely(pool == NULL)) { pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, PTLRPC_MAX_BRW_PAGES); if (pool_place != NULL) *pool_place = pool; } return pool;}/* mount the file system (secretly). 
lustre_cfg parameters are: * 1 = device * 2 = fstype * 3 = flags: failover=f, failout=n * 4 = mount options */int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, void *option){ struct lustre_cfg* lcfg = buf; struct filter_obd *filter = &obd->u.filter; struct vfsmount *mnt; struct lustre_mount_info *lmi; struct obd_uuid uuid; __u8 *uuid_ptr; char *str, *label; char ns_name[48]; int rc; ENTRY; if (lcfg->lcfg_bufcount < 3 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 || LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) RETURN(-EINVAL); lmi = server_get_mount(obd->obd_name); if (lmi) { /* We already mounted in lustre_fill_super. lcfg bufs 1, 2, 4 (devi
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?