📄 transport.c
字号:
goto out2; /* * Allocate pre-registered send and receive buffers for headers and * any inline data. Also specify any padding which will be provided * from a preregistered zero buffer. */ rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia, &new_xprt->rx_data); if (rc) goto out3; /* * Register a callback for connection events. This is necessary because * connection loss notification is async. We also catch connection loss * when reaping receives. */ INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker); new_ep->rep_func = rpcrdma_conn_func; new_ep->rep_xprt = xprt; xprt_rdma_format_addresses(xprt); if (!try_module_get(THIS_MODULE)) goto out4; return xprt;out4: xprt_rdma_free_addresses(xprt); rc = -EINVAL;out3: (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);out2: rpcrdma_ia_close(&new_xprt->rx_ia);out1: kfree(xprt->slot); kfree(xprt); return ERR_PTR(rc);}/* * Close a connection, during shutdown or timeout/reconnect */static voidxprt_rdma_close(struct rpc_xprt *xprt){ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); dprintk("RPC: %s: closing\n", __func__); xprt_disconnect(xprt); (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);}static voidxprt_rdma_set_port(struct rpc_xprt *xprt, u16 port){ struct sockaddr_in *sap; sap = (struct sockaddr_in *)&xprt->addr; sap->sin_port = htons(port); sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr; sap->sin_port = htons(port); dprintk("RPC: %s: %u\n", __func__, port);}static voidxprt_rdma_connect(struct rpc_task *task){ struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); if (!xprt_test_and_set_connecting(xprt)) { if (r_xprt->rx_ep.rep_connected != 0) { /* Reconnect */ schedule_delayed_work(&r_xprt->rdma_connect, xprt->reestablish_timeout); } else { schedule_delayed_work(&r_xprt->rdma_connect, 0); if (!RPC_IS_ASYNC(task)) flush_scheduled_work(); } }}static intxprt_rdma_reserve_xprt(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); int credits = atomic_read(&r_xprt->rx_buf.rb_credits); /* == RPC_CWNDSCALE @ init, but *after* setup */ if (r_xprt->rx_buf.rb_cwndscale == 0UL) { r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; dprintk("RPC: %s: cwndscale %lu\n", __func__, r_xprt->rx_buf.rb_cwndscale); BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); } xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; return xprt_reserve_xprt_cong(task);}/* * The RDMA allocate/free functions need the task structure as a place * to hide the struct rpcrdma_req, which is necessary for the actual send/recv * sequence. For this reason, the recv buffers are attached to send * buffers for portions of the RPC. Note that the RPC layer allocates * both send and receive buffers in the same call. We may register * the receive buffer portion when using reply chunks. */static void *xprt_rdma_allocate(struct rpc_task *task, size_t size){ struct rpc_xprt *xprt = task->tk_xprt; struct rpcrdma_req *req, *nreq; req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); BUG_ON(NULL == req); if (size > req->rl_size) { dprintk("RPC: %s: size %zd too large for buffer[%zd]: " "prog %d vers %d proc %d\n", __func__, size, req->rl_size, task->tk_client->cl_prog, task->tk_client->cl_vers, task->tk_msg.rpc_proc->p_proc); /* * Outgoing length shortage. Our inline write max must have * been configured to perform direct i/o. * * This is therefore a large metadata operation, and the * allocate call was made on the maximum possible message, * e.g. containing long filename(s) or symlink data. In * fact, while these metadata operations *might* carry * large outgoing payloads, they rarely *do*. However, we * have to commit to the request here, so reallocate and * register it now. The data path will never require this * reallocation. * * If the allocation or registration fails, the RPC framework * will (doggedly) retry. */ if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS) { /* forced to "pure inline" */ dprintk("RPC: %s: too much data (%zd) for inline " "(r/w max %d/%d)\n", __func__, size, rpcx_to_rdmad(xprt).inline_rsize, rpcx_to_rdmad(xprt).inline_wsize); size = req->rl_size; rpc_exit(task, -EIO); /* fail the operation */ rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; goto out; } if (task->tk_flags & RPC_TASK_SWAPPER) nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); else nreq = kmalloc(sizeof *req + size, GFP_NOFS); if (nreq == NULL) goto outfail; if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia, nreq->rl_base, size + sizeof(struct rpcrdma_req) - offsetof(struct rpcrdma_req, rl_base), &nreq->rl_handle, &nreq->rl_iov)) { kfree(nreq); goto outfail; } rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size; nreq->rl_size = size; nreq->rl_niovs = 0; nreq->rl_nchunks = 0; nreq->rl_buffer = (struct rpcrdma_buffer *)req; nreq->rl_reply = req->rl_reply; memcpy(nreq->rl_segments, req->rl_segments, sizeof nreq->rl_segments); /* flag the swap with an unused field */ nreq->rl_iov.length = 0; req->rl_reply = NULL; req = nreq; } dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);out: return req->rl_xdr_buf;outfail: rpcrdma_buffer_put(req); rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; return NULL;}/* * This function returns all RDMA resources to the pool. */static voidxprt_rdma_free(void *buffer){ struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; struct rpcrdma_rep *rep; int i; if (buffer == NULL) return; req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]); r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); rep = req->rl_reply; dprintk("RPC: %s: called on 0x%p%s\n", __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); /* * Finish the deregistration. When using mw bind, this was * begun in rpcrdma_reply_handler(). In all other modes, we * do it here, in thread context. The process is considered * complete when the rr_func vector becomes NULL - this * was put in place during rpcrdma_reply_handler() - the wait * call below will not block if the dereg is "done". If * interrupted, our framework will clean up. */ for (i = 0; req->rl_nchunks;) { --req->rl_nchunks; i += rpcrdma_deregister_external( &req->rl_segments[i], r_xprt, NULL); } if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { rep->rr_func = NULL; /* abandon the callback */ req->rl_reply = NULL; } if (req->rl_iov.length == 0) { /* see allocate above */ struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer; oreq->rl_reply = req->rl_reply; (void) rpcrdma_deregister_internal(&r_xprt->rx_ia, req->rl_handle, &req->rl_iov); kfree(req); req = oreq; } /* Put back request+reply buffers */ rpcrdma_buffer_put(req);}/* * send_request invokes the meat of RPC RDMA. It must do the following: * 1. Marshal the RPC request into an RPC RDMA request, which means * putting a header in front of data, and creating IOVs for RDMA * from those in the request. * 2. In marshaling, detect opportunities for RDMA, and use them. * 3. Post a recv message to set up asynch completion, then send * the request (rpcrdma_ep_post). * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). */static intxprt_rdma_send_request(struct rpc_task *task){ struct rpc_rqst *rqst = task->tk_rqstp; struct rpc_xprt *xprt = task->tk_xprt; struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); /* marshal the send itself */ if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { r_xprt->rx_stats.failed_marshal_count++; dprintk("RPC: %s: rpcrdma_marshal_req failed\n", __func__); return -EIO; } if (req->rl_reply == NULL) /* e.g. reconnection */ rpcrdma_recv_buffer_get(req); if (req->rl_reply) { req->rl_reply->rr_func = rpcrdma_reply_handler; /* this need only be done once, but... */ req->rl_reply->rr_xprt = xprt; } if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) { xprt_disconnect(xprt); return -ENOTCONN; /* implies disconnect */ } rqst->rq_bytes_sent = 0; return 0;}static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq){ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); long idle_time = 0; if (xprt_connected(xprt)) idle_time = (long)(jiffies - xprt->last_used) / HZ; seq_printf(seq, "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu " "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n", 0, /* need a local port? */ xprt->stat.bind_count, xprt->stat.connect_count, xprt->stat.connect_time, idle_time, xprt->stat.sends, xprt->stat.recvs, xprt->stat.bad_xids, xprt->stat.req_u, xprt->stat.bklog_u, r_xprt->rx_stats.read_chunk_count, r_xprt->rx_stats.write_chunk_count, r_xprt->rx_stats.reply_chunk_count, r_xprt->rx_stats.total_rdma_request, r_xprt->rx_stats.total_rdma_reply, r_xprt->rx_stats.pullup_copy_count, r_xprt->rx_stats.fixup_copy_count, r_xprt->rx_stats.hardway_register_count, r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count);}/* * Plumbing for rpc transport switch and kernel module */static struct rpc_xprt_ops xprt_rdma_procs = { .reserve_xprt = xprt_rdma_reserve_xprt, .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ .release_request = xprt_release_rqst_cong, /* ditto */ .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */ .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */ .set_port = xprt_rdma_set_port, .connect = xprt_rdma_connect, .buf_alloc = xprt_rdma_allocate, .buf_free = xprt_rdma_free, .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, .print_stats = xprt_rdma_print_stats};static struct xprt_class xprt_rdma = { .list = LIST_HEAD_INIT(xprt_rdma.list), .name = "rdma", .owner = THIS_MODULE, .ident = XPRT_TRANSPORT_RDMA, .setup = xprt_setup_rdma,};static void __exit xprt_rdma_cleanup(void){ int rc; dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");#ifdef RPC_DEBUG if (sunrpc_table_header) { unregister_sysctl_table(sunrpc_table_header); sunrpc_table_header = NULL; }#endif rc = xprt_unregister_transport(&xprt_rdma); if (rc) dprintk("RPC: %s: xprt_unregister returned %i\n", __func__, rc);}static int __init xprt_rdma_init(void){ int rc; rc = xprt_register_transport(&xprt_rdma); if (rc) return rc; dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n"); dprintk(KERN_INFO "Defaults:\n"); dprintk(KERN_INFO "\tSlots %d\n" "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n", xprt_rdma_slot_table_entries, xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n", xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);#ifdef RPC_DEBUG if (!sunrpc_table_header) sunrpc_table_header = register_sysctl_table(sunrpc_table);#endif return 0;}module_init(xprt_rdma_init);module_exit(xprt_rdma_cleanup);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -