client.c
From "lustre 1.6.5 source code" · C code · 1,808 lines total · page 1 of 5
C
1,808 lines
/* Tail of a function whose beginning is outside this view — kept verbatim. */
out:
        OBD_FREE(msgcpy, oldlen);
        RETURN(rc);
}

/*
 * Tear down a request pool: free every pre-allocated request (and its
 * message buffer) still on the pool's free list, then the pool itself.
 *
 * NOTE(review): prp_lock is not taken here — presumably callers guarantee
 * exclusive access at destruction time; confirm at call sites.
 */
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
{
        struct list_head *l, *tmp;
        struct ptlrpc_request *req;

        if (!pool)
                return;

        list_for_each_safe(l, tmp, &pool->prp_req_list) {
                req = list_entry(l, struct ptlrpc_request, rq_list);
                list_del(&req->rq_list);
                LASSERT (req->rq_reqmsg);
                /* Message buffers on the free list were sized to prp_rq_size
                 * by ptlrpc_add_rqs_to_pool(). */
                OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
                OBD_FREE(req, sizeof(*req));
        }
        OBD_FREE(pool, sizeof(*pool));
}

/*
 * Grow the pool by up to num_rq pre-allocated requests, each with a
 * pre-allocated message buffer.  The message size is rounded up to the
 * next power of two before any allocation (the kernel allocator would
 * round internally anyway), and the pool size may only change while the
 * pool is still empty (enforced by the LASSERTF below).
 *
 * Stops early (without reporting an error) as soon as an allocation
 * fails; whatever was added so far stays in the pool.
 */
void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
{
        int i;
        int size = 1;

        /* Round the message size up to the next power of two. */
        while (size < pool->prp_rq_size)
                size <<= 1;

        LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
                 "Trying to change pool size with nonempty pool "
                 "from %d to %d bytes\n", pool->prp_rq_size, size);

        spin_lock(&pool->prp_lock);
        pool->prp_rq_size = size;
        for (i = 0; i < num_rq; i++) {
                struct ptlrpc_request *req;
                struct lustre_msg *msg;

                /* Drop the spinlock across the (potentially blocking)
                 * allocations; it is re-taken only to link the new request
                 * onto the free list. */
                spin_unlock(&pool->prp_lock);
                OBD_ALLOC(req, sizeof(struct ptlrpc_request));
                if (!req)
                        return;
                OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
                if (!msg) {
                        OBD_FREE(req, sizeof(struct ptlrpc_request));
                        return;
                }
                req->rq_reqmsg = msg;
                req->rq_pool = pool;
                spin_lock(&pool->prp_lock);
                list_add_tail(&req->rq_list, &pool->prp_req_list);
        }
        spin_unlock(&pool->prp_lock);
        return;
}

/*
 * Allocate and initialize a request pool, then ask populate_pool (also
 * remembered in prp_populate for later refills) to pre-allocate num_rq
 * requests of msgsize bytes each.
 *
 * Returns the new pool, or NULL if allocation failed or populate_pool
 * could not add even a single request.
 */
struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
                                                void (*populate_pool)(struct ptlrpc_request_pool *, int))
{
        struct ptlrpc_request_pool *pool;

        OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
        if (!pool)
                return NULL;

        /* Request next power of two for the allocation, because internally
         * kernel would do exactly this (the rounding itself happens in
         * ptlrpc_add_rqs_to_pool()). */
        spin_lock_init(&pool->prp_lock);
        CFS_INIT_LIST_HEAD(&pool->prp_req_list);
        pool->prp_rq_size = msgsize;
        pool->prp_populate = populate_pool;

        populate_pool(pool, num_rq);

        if (list_empty(&pool->prp_req_list)) {
                /* have not allocated a single request for the pool */
                OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
                pool = NULL;
        }
        return pool;
}

/* Declaration continues on the next source line (ptlrpc_prep_req_from_pool). */
static struct ptlrpc_request
/*
 * Take one pre-allocated request off the pool's free list, reset it to a
 * clean state (preserving only the pre-allocated message buffer and the
 * pool back-pointer), and return it.  Returns NULL if pool is NULL or the
 * free list is empty.
 * (Return type "static struct ptlrpc_request" is on the preceding line.)
 */
*ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
{
        struct ptlrpc_request *request;
        struct lustre_msg *reqmsg;

        if (!pool)
                return NULL;

        spin_lock(&pool->prp_lock);

        /* See if we have anything in a pool, and bail out if nothing,
         * in writeout path, where this matters, this is safe to do, because
         * nothing is lost in this case, and when some in-flight requests
         * complete, this code will be called again. */
        if (unlikely(list_empty(&pool->prp_req_list))) {
                spin_unlock(&pool->prp_lock);
                return NULL;
        }

        request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
                             rq_list);
        list_del(&request->rq_list);
        spin_unlock(&pool->prp_lock);

        LASSERT(request->rq_reqmsg);
        LASSERT(request->rq_pool);

        /* Wipe the request but keep the buffer it owns. */
        reqmsg = request->rq_reqmsg;
        memset(request, 0, sizeof(*request));
        request->rq_reqmsg = reqmsg;
        request->rq_pool = pool;
        request->rq_reqlen = pool->prp_rq_size;
        return request;
}

/*
 * Allocate and initialize a new RPC request for import imp: opcode plus
 * count request buffers of the given lengths (optionally pre-filled from
 * bufs).  If pool is non-NULL a pre-allocated request is preferred; a
 * regular allocation is the fallback.  Returns the request with one
 * reference held, or NULL on failure.
 */
struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp,
                                            __u32 version, int opcode,
                                            int count, int *lengths,
                                            char **bufs,
                                            struct ptlrpc_request_pool *pool)
{
        struct ptlrpc_request *request = NULL;
        int rc;
        ENTRY;

        /* The obd disconnected */
        if (imp == NULL)
                return NULL;

        LASSERT(imp != LP_POISON);
        LASSERT((unsigned long)imp->imp_client > 0x1000);
        LASSERT(imp->imp_client != LP_POISON);

        if (pool)
                request = ptlrpc_prep_req_from_pool(pool);

        if (!request)
                OBD_ALLOC(request, sizeof(*request));

        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_request(request, imp->imp_msg_magic, count, lengths,
                                 bufs);
        if (rc) {
                /* NOTE(review): asserts the failed request did NOT come from
                 * the pool — packing is apparently expected to always succeed
                 * for pool requests (pre-sized buffer); confirm. */
                LASSERT(!request->rq_pool);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        lustre_msg_add_version(request->rq_reqmsg, version);

        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;
        /* Takes a reference on the import for the lifetime of the request. */
        request->rq_import = class_import_get(imp);
        request->rq_export = NULL;

        request->rq_req_cbid.cbid_fn = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        ptlrpc_at_set_req_timeout(request);

        spin_lock_init(&request->rq_lock);
        CFS_INIT_LIST_HEAD(&request->rq_list);
        CFS_INIT_LIST_HEAD(&request->rq_replay_list);
        CFS_INIT_LIST_HEAD(&request->rq_set_chain);
        CFS_INIT_LIST_HEAD(&request->rq_history_list);
        cfs_waitq_init(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        /* Caller owns the initial reference. */
        atomic_set(&request->rq_refcount, 1);

        lustre_msg_set_opc(request->rq_reqmsg, opcode);

        RETURN(request);
}

/*
 * Convenience wrapper: prepare a request without a pool.
 * See ptlrpc_prep_req_pool() for the parameter semantics.
 */
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths,
                                    bufs, NULL);
}

/*
 * Allocate and initialize an empty request set.
 * Returns the set, or NULL if allocation failed.
 */
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;
        ENTRY;

        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        CFS_INIT_LIST_HEAD(&set->set_requests);
        cfs_waitq_init(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        CFS_INIT_LIST_HEAD(&set->set_new_requests);
        CFS_INIT_LIST_HEAD(&set->set_cblist);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int expected_phase;
        int n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;

        /* First pass: sanity-check the phase of every request and count them. */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        /* Second pass: detach each request from the set and drop the set's
         * reference on it. */
        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

/*
 * Register a completion callback (fn, data) on the set; it is queued on
 * set_cblist.  Returns 0 on success or -ENOMEM if the callback descriptor
 * could not be allocated.
 */
int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, set_interpreter_func fn,
                      void *data)
{
        struct ptlrpc_set_cbdata *cbdata;

        OBD_SLAB_ALLOC(cbdata, ptlrpc_cbdata_slab, CFS_ALLOC_STD,
                       sizeof(*cbdata));
        if (cbdata == NULL)
                RETURN(-ENOMEM);

        cbdata->psc_interpret = fn;
        cbdata->psc_data = data;
        list_add_tail(&cbdata->psc_item, &set->set_cblist);

        RETURN(0);
}

/*
 * Add a request to the set and bump both the set's remaining count and
 * the import's in-flight count.
 *
 * NOTE(review): unlike ptlrpc_set_add_new_req() below, no lock is taken —
 * presumably only the set's owner calls this; confirm.
 */
void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;

        atomic_inc(&req->rq_import->imp_inflight);
}

/* lock so many callers can add things, the context that owns the set
 * is supposed to notice these and move them into the set proper.
 */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        spin_lock(&set->set_new_req_lock);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock(&set->set_new_req_lock);
}

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed. If false, and
 * *status is set, then the request can not be sent and *status is the
 * error code.  If false and status is 0, then request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                /* Sending on an uninitialized import is a programming error. */
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                   imp->imp_state == LUSTRE_IMP_CONNECTING) {
                /* allow CONNECT even if import is invalid */ ;
                /* (stray empty statement above kept from the original) */
                if (atomic_read(&imp->imp_inval_count) != 0) {
                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
                        *status = -EIO;
                }
        } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
                   imp->imp_obd->obd_no_recov) {
                /* If the import has been invalidated (such as by an OST
                 * failure), and if the import(MGC) tried all of its connection
                 * list (Bug 13464), the request must fail with -ESHUTDOWN.
                 * This indicates the requests should be discarded; an -EIO
                 * may result in a resend of the request. */
                if (!imp->imp_deactive)
                        DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -ESHUTDOWN; /* bz 12940 */
        } else if (req->rq_import_generation != imp->imp_generation) {
                /* Request was prepared against an older connection epoch. */
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        } else if (req->rq_send_state != imp->imp_state) {
                /* invalidate in progress - any requests should be drop */
                if (atomic_read(&imp->imp_inval_count) != 0) {
                        DEBUG_REQ(D_ERROR, req, "invalidate in flight");
⌨️ Keyboard shortcuts
Copy code: Ctrl + C
Search in code: Ctrl + F
Full-screen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?