filter.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,698 行 · 第 1/5 页

C
1,698
字号
        if (rc)
                CERROR("error unlinking objid %.*s: rc %d\n",
                       dchild->d_name.len, dchild->d_name.name, rc);
        return(rc);
}

/* Argument bundle threaded through the interval-tree iterator to
 * filter_intent_cb(). */
struct filter_intent_args {
        struct ldlm_lock **victim; /* out: best glimpse target found so far */
        __u64 size;                /* in: file size currently known from LVB */
        int *liblustre;            /* out: cleared once a non-liblustre lock
                                    * holder is seen */
};

/* Interval-tree iteration callback: among granted extent locks whose
 * interval extends beyond the currently-known file size, remember in
 * *arg->victim the lock with the greatest extent start, holding a
 * reference on it (LDLM_LOCK_GET); the reference on a displaced
 * candidate is dropped here, the final one by the caller.
 *
 * Invoked via interval_iterate_reverse() — presumably visiting
 * intervals in descending order, which is what makes the early
 * INTERVAL_ITER_STOP below safe; TODO confirm iteration order. */
static enum interval_iter filter_intent_cb(struct interval_node *n,
                                           void *args)
{
        struct ldlm_interval *node = (struct ldlm_interval *)n;
        struct filter_intent_args *arg = (struct filter_intent_args*)args;
        __u64 size = arg->size;
        struct ldlm_lock **v = arg->victim;
        struct ldlm_lock *lck;

        /* If the interval is lower than the current file size,
         * just break. */
        if (interval_high(n) <= size)
                return INTERVAL_ITER_STOP;

        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
                /* Don't send glimpse ASTs to liblustre clients.
                 * They aren't listening for them, and they do
                 * entirely synchronous I/O anyways. */
                if (lck->l_export == NULL ||
                    lck->l_export->exp_libclient == 1)
                        continue;

                /* Saw at least one lock held by a regular client. */
                if (*arg->liblustre)
                        *arg->liblustre = 0;

                /* Keep whichever candidate has the larger extent start. */
                if (*v == NULL) {
                        *v = LDLM_LOCK_GET(lck);
                } else if ((*v)->l_policy_data.l_extent.start <
                           lck->l_policy_data.l_extent.start) {
                        LDLM_LOCK_PUT(*v);
                        *v = LDLM_LOCK_GET(lck);
                }

                /* the same policy group - every lock has the
                 * same extent, so needn't do it any more */
                break;
        }
        return INTERVAL_ITER_CONT;
}

/* Intent policy for OST extent locks (glimpse handling).
 *
 * The incoming intent request is rewritten as a whole-file PR lock and
 * run through the namespace's processing policy.  If it meets no
 * resistance it is granted (ELDLM_LOCK_REPLACED), or aborted for
 * liblustre clients.  Otherwise no lock is granted: the holder of the
 * conflicting (non-PR) extent lock reaching furthest past the known
 * file size is sent a glimpse AST to refresh the LVB, and
 * ELDLM_LOCK_ABORTED is returned with the LVB packed in the reply.
 *
 * @ns, @lockp, @req_cookie (a struct ptlrpc_request *), @mode, @flags
 * and @data follow the ldlm intent-policy callback signature; @mode,
 * @flags and @data are unused here. */
static int filter_intent_policy(struct ldlm_namespace *ns,
                                struct ldlm_lock **lockp, void *req_cookie,
                                ldlm_mode_t mode, int flags, void *data)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ptlrpc_request *req = req_cookie;
        struct ldlm_lock *lock = *lockp, *l = NULL;
        struct ldlm_resource *res = lock->l_resource;
        ldlm_processing_policy policy;
        struct ost_lvb *res_lvb, *reply_lvb;
        struct ldlm_reply *rep;
        ldlm_error_t err;
        int idx, rc, tmpflags = 0, only_liblustre = 1;
        struct ldlm_interval_tree *tree;
        struct filter_intent_args arg;
        int repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                           [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
        ENTRY;

        policy = ldlm_get_processing_policy(res);
        LASSERT(policy != NULL);
        LASSERT(req != NULL);

        rc = lustre_pack_reply(req, 3, repsize, NULL);
        if (rc)
                RETURN(req->rq_status = rc);

        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
        LASSERT(rep != NULL);

        reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
                                   sizeof(*reply_lvb));
        LASSERT(reply_lvb != NULL);

        //fixup_handle_for_resent_req(req, lock, &lockh);

        /* If we grant any lock at all, it will be a whole-file read lock.
         * Call the extent policy function to see if our request can be
         * granted, or is blocked.
         * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse lock
         */
        lock->l_policy_data.l_extent.start = 0;
        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
        lock->l_req_mode = LCK_PR;

        LASSERT(ns == res->lr_namespace);
        lock_res(res);
        rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
        check_res_locked(res);

        /* FIXME: we should change the policy function slightly, to not make
         * this list at all, since we just turn around and free it */
        while (!list_empty(&rpc_list)) {
                struct ldlm_lock *wlock =
                        list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast);
                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                lock->l_flags &= ~LDLM_FL_CP_REQD;
                list_del_init(&wlock->l_cp_ast);
                LDLM_LOCK_PUT(wlock);
        }

        /* The lock met with no resistance; we're finished. */
        if (rc == LDLM_ITER_CONTINUE) {
                /* do not grant locks to the liblustre clients: they cannot
                 * handle ASTs robustly.  We need to do this while still
                 * holding ns_lock to avoid the lock remaining on the res_link
                 * list (and potentially being added to l_pending_list by an
                 * AST) when we are going to drop this lock ASAP. */
                if (lock->l_export->exp_libclient ||
                    OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
                        ldlm_resource_unlink_lock(lock);
                        err = ELDLM_LOCK_ABORTED;
                } else {
                        err = ELDLM_LOCK_REPLACED;
                }
                unlock_res(res);
                RETURN(err);
        }

        /* Do not grant any lock, but instead send GL callbacks.  The extent
         * policy nicely created a list of all PW locks for us.  We will choose
         * the highest of those which are larger than the size in the LVB, if
         * any, and perform a glimpse callback. */
        res_lvb = res->lr_lvb_data;
        LASSERT(res_lvb != NULL);
        *reply_lvb = *res_lvb;

        /*
         * ->ns_lock guarantees that no new locks are granted, and,
         * therefore, that res->lr_lvb_data cannot increase beyond the
         * end of already granted lock. As a result, it is safe to
         * check against "stale" reply_lvb->lvb_size value without
         * res->lr_lvb_sem.
         */
        arg.size = reply_lvb->lvb_size;
        arg.victim = &l;
        arg.liblustre = &only_liblustre;
        /* Scan every non-PR interval tree for the best glimpse target;
         * filter_intent_cb() leaves it (referenced) in l. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (tree->lit_mode == LCK_PR)
                        continue;

                interval_iterate_reverse(tree->lit_root,
                                         filter_intent_cb, &arg);
        }
        unlock_res(res);

        /* There were no PW locks beyond the size in the LVB; finished. */
        if (l == NULL) {
                if (only_liblustre) {
                        /* If we discovered a liblustre client with a PW lock,
                         * however, the LVB may be out of date!  The LVB is
                         * updated only on glimpse (which we don't do for
                         * liblustre clients) and cancel (which the client
                         * obviously has not yet done).  So if it has written
                         * data but kept the lock, the LVB is stale and needs
                         * to be updated from disk.
                         *
                         * Of course, this will all disappear when we switch to
                         * taking liblustre locks on the OST. */
                        ldlm_res_lvbo_update(res, NULL, 0, 1);
                }
                RETURN(ELDLM_LOCK_ABORTED);
        }

        /*
         * This check is for lock taken in filter_prepare_destroy() that does
         * not have l_glimpse_ast set. So the logic is: if there is a lock
         * with no l_glimpse_ast set, this object is being destroyed already.
         *
         * Hence, if you are grabbing DLM locks on the server, always set
         * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
         */
        if (l->l_glimpse_ast == NULL) {
                /* We are racing with unlink(); just return -ENOENT */
                rep->lock_policy_res1 = -ENOENT;
                goto out;
        }

        LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
        rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
        /* Update the LVB from disk if the AST failed (this is a legal race) */
        /*
         * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
         * sending ast is not handled. This can result in lost client writes.
         */
        if (rc != 0)
                ldlm_res_lvbo_update(res, NULL, 0, 1);

        lock_res(res);
        *reply_lvb = *res_lvb;
        unlock_res(res);

 out:
        /* Drop the reference taken on the victim by filter_intent_cb(). */
        LDLM_LOCK_PUT(l);
        RETURN(ELDLM_LOCK_ABORTED);
}

/*
 * per-obd_device iobuf pool.
 *
 * To avoid memory deadlocks in low-memory setups, amount of dynamic
 * allocations in write-path has to be minimized (see bug 5137).
 *
 * Pages, niobuf_local's and niobuf_remote's are pre-allocated and attached to
 * OST threads (see ost_thread_{init,done}()).
 *
 * "iobuf's" used by filter cannot be attached to OST thread, however, because
 * at the OST layer there are only (potentially) multiple obd_device of type
 * unknown at the time of OST thread creation.
 *
 * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool
 * field). This array has size OST_MAX_THREADS, so that each OST thread uses
 * it's very own iobuf.
 *
 * Functions below
 *
 *     filter_kiobuf_pool_init()
 *
 *     filter_kiobuf_pool_done()
 *
 *     filter_iobuf_get()
 *
 * operate on this array. They are "generic" in a sense that they don't depend
 * on actual type of iobuf's (the latter depending on Linux kernel version).
 */

/*
 * destroy pool created by filter_iobuf_pool_init: free every allocated
 * iobuf, release the pool array itself and clear ->fo_iobuf_pool.
 */
static void filter_iobuf_pool_done(struct filter_obd *filter)
{
        struct filter_iobuf **pool;
        int i;
        ENTRY;

        pool = filter->fo_iobuf_pool;
        if (pool != NULL) {
                /* Slots are filled lazily by filter_iobuf_get(), so some
                 * entries may still be NULL. */
                for (i = 0; i < filter->fo_iobuf_count; ++ i) {
                        if (pool[i] != NULL)
                                filter_free_iobuf(pool[i]);
                }
                OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]);
                filter->fo_iobuf_pool = NULL;
        }
        EXIT;
}

/*
 * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
 * Only the zeroed pointer array is allocated here; the iobuf's themselves
 * are created on first use (see filter_iobuf_get()).  Returns 0 or -ENOMEM.
 */
static int filter_iobuf_pool_init(struct filter_obd *filter)
{
        void **pool; /* declared only so sizeof(*pool) names the slot size */
        ENTRY;

        OBD_ALLOC_GFP(filter->fo_iobuf_pool, OSS_THREADS_MAX * sizeof(*pool),
                      GFP_KERNEL);
        if (filter->fo_iobuf_pool == NULL)
                RETURN(-ENOMEM);

        filter->fo_iobuf_count = OSS_THREADS_MAX;
        RETURN(0);
}

/* Return iobuf allocated for @thread_id.  We don't know in advance how
 * many threads there will be so we allocate a large empty array and only
 * fill in those slots that are actually in use.
 * If we haven't allocated a pool entry for this thread before, do so now.
 *
 * NOTE(review): with no oti (thread_id == -1) a fresh iobuf is returned
 * without being recorded in the pool; presumably the caller frees such
 * an unpooled buffer itself -- confirm against the call sites. */
void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
{
        int thread_id                    = oti ? oti->oti_thread_id : -1;
        struct filter_iobuf  *pool       = NULL;
        struct filter_iobuf **pool_place = NULL;

        if (thread_id >= 0) {
                LASSERT(thread_id < filter->fo_iobuf_count);
                pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]);
        }

        /* First use by this thread (or no thread slot): allocate a new
         * write iobuf and cache it in the slot when one exists. */
        if (unlikely(pool == NULL)) {
                pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
                                          PTLRPC_MAX_BRW_PAGES);
                if (pool_place != NULL)
                        *pool_place = pool;
        }

        return pool;
}

/* mount the file system (secretly).  lustre_cfg parameters are:
 * 1 = device
 * 2 = fstype
 * 3 = flags: failover=f, failout=n
 * 4 = mount options
 */
int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                        void *option)
{
        struct lustre_cfg* lcfg = buf;
        struct filter_obd *filter = &obd->u.filter;
        struct vfsmount *mnt;
        struct lustre_mount_info *lmi;
        struct obd_uuid uuid;
        __u8 *uuid_ptr;
        char *str, *label;
        char ns_name[48];
        int rc;
        ENTRY;

        /* Need at least device and fstype config buffers. */
        if (lcfg->lcfg_bufcount < 3 ||
            LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
            LUSTRE_CFG_BUFLEN(lcfg, 2) < 1)
                RETURN(-EINVAL);

        lmi = server_get_mount(obd->obd_name);
        if (lmi) {
                /* We already mounted in lustre_fill_super.
                   lcfg bufs 1, 2, 4 (devi

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?