client.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,808 行 · 第 1/5 页

C
1,808
字号
        out:
        OBD_FREE(msgcpy, oldlen);
        RETURN(rc);
}

/*
 * Tear down a request pool.
 *
 * Every request still linked on pool->prp_req_list is unlinked and freed
 * together with its preallocated request message (freed with the pool's
 * prp_rq_size), then the pool structure itself is freed.  A NULL pool is
 * silently ignored.  The list is walked without taking pool->prp_lock.
 */
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
{
        struct list_head *l, *tmp;
        struct ptlrpc_request *req;

        if (!pool)
                return;

        /* _safe variant: the current entry is deleted inside the loop */
        list_for_each_safe(l, tmp, &pool->prp_req_list) {
                req = list_entry(l, struct ptlrpc_request, rq_list);
                list_del(&req->rq_list);
                LASSERT (req->rq_reqmsg);
                OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
                OBD_FREE(req, sizeof(*req));
        }
        OBD_FREE(pool, sizeof(*pool));
}

/*
 * Add up to num_rq preallocated requests to the pool.
 *
 * pool->prp_rq_size is first rounded up to the next power of two; the
 * assertion below fires if that rounding would change the size while the
 * pool already holds requests.  Each new request gets a message buffer of
 * the rounded size and is appended to pool->prp_req_list.
 *
 * If an allocation fails the function returns early, leaving in the pool
 * whatever requests were added so far; nothing is reported to the caller.
 */
void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
{
        int i;
        int size = 1;

        /* smallest power of two that is >= the configured message size */
        while (size < pool->prp_rq_size)
                size <<= 1;

        LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
                 "Trying to change pool size with nonempty pool "
                 "from %d to %d bytes\n", pool->prp_rq_size, size);

        spin_lock(&pool->prp_lock);
        pool->prp_rq_size = size;
        for (i = 0; i < num_rq; i++) {
                struct ptlrpc_request *req;
                struct lustre_msg *msg;

                /* the lock is dropped around the allocations and only
                 * retaken to link the new request into the list */
                spin_unlock(&pool->prp_lock);
                OBD_ALLOC(req, sizeof(struct ptlrpc_request));
                if (!req)
                        return; /* lock is not held at this point */
                OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
                if (!msg) {
                        OBD_FREE(req, sizeof(struct ptlrpc_request));
                        return; /* lock is not held at this point */
                }
                req->rq_reqmsg = msg;
                req->rq_pool = pool;
                spin_lock(&pool->prp_lock);
                list_add_tail(&req->rq_list, &pool->prp_req_list);
        }
        spin_unlock(&pool->prp_lock);
        return;
}

/*
 * Allocate and populate a request pool.
 *
 * num_rq        - number of requests handed to populate_pool
 * msgsize       - stored as the pool's prp_rq_size before populating
 * populate_pool - callback that fills the pool; it is also remembered in
 *                 pool->prp_populate.  It is called unconditionally, so it
 *                 must not be NULL.
 *
 * Returns the new pool, or NULL if the pool could not be allocated or if
 * populate_pool left the request list empty (the pool is freed in that case).
 */
struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
                                                void (*populate_pool)(struct ptlrpc_request_pool *, int))
{
        struct ptlrpc_request_pool *pool;

        OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
        if (!pool)
                return NULL;

        /* Request next power of two for the allocation, because internally
           kernel would do exactly this */
        /* NOTE(review): no rounding is done in this function; msgsize is
         * stored as given.  ptlrpc_add_rqs_to_pool() performs the rounding
         * when it is the populate_pool callback - confirm for others. */

        spin_lock_init(&pool->prp_lock);
        CFS_INIT_LIST_HEAD(&pool->prp_req_list);
        pool->prp_rq_size = msgsize;
        pool->prp_populate = populate_pool;
        populate_pool(pool, num_rq);

        if (list_empty(&pool->prp_req_list)) {
                /* have not allocated a single request for the pool */
                OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
                pool = NULL;
        }
        return pool;
}

/*
 * Take one preallocated request out of the pool.
 *
 * Returns NULL if pool is NULL or its list is empty.  Otherwise the first
 * request on the list is unlinked under pool->prp_lock, zeroed, and handed
 * back with only rq_reqmsg (the preallocated buffer), rq_pool and rq_reqlen
 * (the pool's prp_rq_size) filled in.
 */
static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
{
        struct ptlrpc_request *request;
        struct lustre_msg *reqmsg;

        if (!pool)
                return NULL;

        spin_lock(&pool->prp_lock);

        /* See if we have anything in a pool, and bail out if nothing,
         * in writeout path, where this matters, this is safe to do, because
         * nothing is lost in this case, and when some in-flight requests
         * complete, this code will be called again. */
        if (unlikely(list_empty(&pool->prp_req_list))) {
                spin_unlock(&pool->prp_lock);
                return NULL;
        }

        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
                             rq_list);
        list_del(&request->rq_list);
        spin_unlock(&pool->prp_lock);

        LASSERT(request->rq_reqmsg);
        LASSERT(request->rq_pool);

        /* save the message pointer across the memset, which wipes the
         * whole request structure */
        reqmsg = request->rq_reqmsg;
        memset(request, 0, sizeof(*request));
        request->rq_reqmsg = reqmsg;
        request->rq_pool = pool;
        request->rq_reqlen = pool->prp_rq_size;
        return request;
}

/*
 * Create and initialise a new request on import imp.
 *
 * If pool is non-NULL a preallocated request is tried first; when the pool
 * yields nothing (or no pool was given) the request is allocated with
 * OBD_ALLOC.  The request message is packed from count/lengths/bufs with
 * the import's message magic, then version and opcode are stored in it.
 * The request takes an import reference via class_import_get() and starts
 * with a reference count of 1 in phase RQ_PHASE_NEW.
 *
 * Returns the request, or NULL if imp is NULL, the allocation fails, or
 * lustre_pack_request() returns non-zero.
 *
 * NOTE(review): the imp == NULL path uses a plain return after ENTRY,
 * unlike the other exits which use RETURN().
 */
struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
                     int count, int *lengths, char **bufs,
                     struct ptlrpc_request_pool *pool)
{
        struct ptlrpc_request *request = NULL;
        int rc;
        ENTRY;

        /* The obd disconnected */
        if (imp == NULL)
                return NULL;

        LASSERT(imp != LP_POISON);
        LASSERT((unsigned long)imp->imp_client > 0x1000);
        LASSERT(imp->imp_client != LP_POISON);

        if (pool)
                request = ptlrpc_prep_req_from_pool(pool);

        /* pool absent or exhausted: fall back to a regular allocation */
        if (!request)
                OBD_ALLOC(request, sizeof(*request));

        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_request(request, imp->imp_msg_magic, count, lengths,
                                 bufs);
        if (rc) {
                /* a pack failure is asserted never to happen for a request
                 * that came from a pool; only non-pool requests are freed */
                LASSERT(!request->rq_pool);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        lustre_msg_add_version(request->rq_reqmsg, version);
        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;
        request->rq_import = class_import_get(imp);
        request->rq_export = NULL;

        /* both callbacks receive the request itself as their argument */
        request->rq_req_cbid.cbid_fn  = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        ptlrpc_at_set_req_timeout(request);

        spin_lock_init(&request->rq_lock);
        CFS_INIT_LIST_HEAD(&request->rq_list);
        CFS_INIT_LIST_HEAD(&request->rq_replay_list);
        CFS_INIT_LIST_HEAD(&request->rq_set_chain);
        CFS_INIT_LIST_HEAD(&request->rq_history_list);
        cfs_waitq_init(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        atomic_set(&request->rq_refcount, 1);

        lustre_msg_set_opc(request->rq_reqmsg, opcode);

        RETURN(request);
}

/*
 * Same as ptlrpc_prep_req_pool() with no request pool (pool == NULL).
 */
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
                int *lengths, char **bufs)
{
        return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
                                    NULL);
}

/*
 * Allocate an empty request set: all three lists empty, set_remaining 0,
 * wait queue and new-request lock initialised.
 *
 * Returns the set, or NULL if the allocation fails.
 */
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;

        ENTRY;
        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        CFS_INIT_LIST_HEAD(&set->set_requests);
        cfs_waitq_init(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        CFS_INIT_LIST_HEAD(&set->set_new_requests);
        CFS_INIT_LIST_HEAD(&set->set_cblist);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set.
 *
 * Asserts that every request on set->set_requests is in the same phase:
 * RQ_PHASE_COMPLETE when set_remaining is 0, RQ_PHASE_NEW otherwise (and
 * then set_remaining must equal the number of requests).  Each request is
 * unlinked from the set and released with ptlrpc_req_finished(); requests
 * still in RQ_PHASE_NEW first have their interpret callback, if any, run
 * with status -EBADR.  Finally the set itself is freed. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int               expected_phase;
        int               n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
        /* first pass: validate phases and count the requests */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        /* second pass: detach and release every request */
        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

/*
 * Register a callback on the set.
 *
 * A ptlrpc_set_cbdata record is allocated from ptlrpc_cbdata_slab, filled
 * with fn and data, and appended to set->set_cblist.  No lock is taken
 * here.
 *
 * Returns 0 on success or -ENOMEM if the record cannot be allocated.
 *
 * NOTE(review): RETURN() is used without a matching ENTRY in this function.
 */
int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
                      set_interpreter_func fn, void *data)
{
        struct ptlrpc_set_cbdata *cbdata;

        OBD_SLAB_ALLOC(cbdata, ptlrpc_cbdata_slab,
                        CFS_ALLOC_STD, sizeof(*cbdata));
        if (cbdata == NULL)
                RETURN(-ENOMEM);

        cbdata->psc_interpret = fn;
        cbdata->psc_data = data;
        list_add_tail(&cbdata->psc_item, &set->set_cblist);

        RETURN(0);
}

/*
 * Append req to set->set_requests, point req->rq_set at the set, and bump
 * both set->set_remaining and the import's imp_inflight counter.  No lock
 * is taken here.
 */
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;
        atomic_inc(&req->rq_import->imp_inflight);
}

/* lock so many callers can add things, the context that owns the set
 * is supposed to notice these and move them into the set proper.
 *
 * The request is queued on set->set_new_requests under set_new_req_lock;
 * unlike ptlrpc_set_add_req(), set_remaining and imp_inflight are not
 * touched here. */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        spin_lock(&set->set_new_req_lock);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock(&set->set_new_req_lock);
}

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed. If false, and
 * *status is set, then the request can not be sent and *status is the
 * error code.  If false and status is 0, then request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
                /* allow CONNECT even if import is invalid */ ;
                if (atomic_read(&imp->imp_inval_count) != 0) {
                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
                        *status = -EIO;
                }
        } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
                                         imp->imp_obd->obd_no_recov) {
                /* If the import has been invalidated (such as by an OST
                 * failure), and if the import(MGC) tried all of its connection
                 * list (Bug 13464), the request must fail with -ESHUTDOWN.
                 * This indicates the requests should be discarded; an -EIO
                 * may result in a resend of the request. */
                if (!imp->imp_deactive)
                        DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -ESHUTDOWN; /* bz 12940 */
        } else if (req->rq_import_generation != imp->imp_generation) {
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        } else if (req->rq_send_state != imp->imp_state) {
                /* invalidate in progress - any requests should be drop */
                if (atomic_read(&imp->imp_inval_count) != 0) {
                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?