📄 rxrpc.c
字号:
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include <rxrpc/packet.h>
#include "internal.h"
#include "afs_cm.h"

static struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
/* balance counters; both are asserted to be zero in afs_close_socket() */
static atomic_t afs_outstanding_calls;
static atomic_t afs_outstanding_skbs;

static void afs_wake_up_call_waiter(struct afs_call *);
static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct afs_call *);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);

/* synchronous call management */
const struct afs_wait_mode afs_sync_call = {
	.rx_wakeup	= afs_wake_up_call_waiter,
	.wait		= afs_wait_for_call_to_complete,
};

/* asynchronous call management */
const struct afs_wait_mode afs_async_call = {
	.rx_wakeup	= afs_wake_up_async_call,
	.wait		= afs_dont_wait_for_call_to_complete,
};

/* asynchronous incoming call management (no .wait operation is provided) */
static const struct afs_wait_mode afs_async_incoming_call = {
	.rx_wakeup	= afs_wake_up_async_call,
};

/* asynchronous incoming call initial processing (no .destructor is set) */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
	.abort_to_error	= afs_abort_to_error,
};

static void afs_collect_incoming_call(struct work_struct *);

/* skbs for incoming calls, queued here by afs_rx_interceptor() */
static struct sk_buff_head afs_incoming_calls;
static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 * - returns 0 on success or a negative error code; on failure everything
 *   that was set up here (workqueue, socket) is torn down again
 */
int afs_open_socket(void)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	skb_queue_head_init(&afs_incoming_calls);

	afs_async_calls = create_singlethread_workqueue("kafsd");
	if (!afs_async_calls) {
		_leave(" = -ENOMEM [wq]");
		return -ENOMEM;
	}

	ret = sock_create_kern(AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
	if (ret < 0) {
		destroy_workqueue(afs_async_calls);
		_leave(" = %d [socket]", ret);
		return ret;
	}

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket;
	 * the address is zeroed first (as afs_make_call() does) so that no
	 * uninitialised stack bytes are handed to kernel_bind() - this also
	 * leaves sin_addr as the all-zeroes address
	 */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin);
	srx.transport.sin.sin_family	= AF_INET;
	srx.transport.sin.sin_port	= htons(AFS_CM_PORT);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0) {
		sock_release(socket);
		/* don't leak the workqueue created above */
		destroy_workqueue(afs_async_calls);
		_leave(" = %d [bind]", ret);
		return ret;
	}

	rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);

	afs_socket = socket;
	_leave(" = 0");
	return 0;
}

/*
 * close the RxRPC socket AFS was using
 * - releases the socket, destroys the async call workqueue and then asserts
 *   that no socket buffers or calls are still accounted as outstanding
 */
void afs_close_socket(void)
{
	_enter("");

	sock_release(afs_socket);

	_debug("dework");
	destroy_workqueue(afs_async_calls);

	ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
	ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
	_leave("");
}

/*
 * note that the data in a socket buffer is now delivered and that the buffer
 * should be freed
 * - a NULL skb is reported with a stack dump rather than dereferenced
 * - drops the skb from the outstanding count; BUG()s if that count underflows
 */
static void afs_data_delivered(struct sk_buff *skb)
{
	if (!skb) {
		_debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
		dump_stack();
	} else {
		_debug("DLVR %p{%u} [%d]",
		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
			BUG();
		rxrpc_kernel_data_delivered(skb);
	}
}

/*
 * free a socket buffer
 * - a NULL skb is reported with a stack dump rather than dereferenced
 * - drops the skb from the outstanding count; BUG()s if that count underflows
 */
static void afs_free_skb(struct sk_buff *skb)
{
	if (!skb) {
		_debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
		dump_stack();
	} else {
		_debug("FREE %p{%u} [%d]",
		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
			BUG();
		rxrpc_kernel_free_skb(skb);
	}
}

/*
 * free a call
 * - the call must already be detached from RxRPC (rxcall == NULL), must have
 *   no async work pending and must have an empty Rx queue; these are asserted
 * - frees the request buffer and the call itself; the reply buffer is not
 *   freed here (afs_flat_call_destructor() releases it for flat calls)
 */
static void afs_free_call(struct afs_call *call)
{
	_debug("DONE %p{%s} [%d]",
	       call, call->type->name, atomic_read(&afs_outstanding_calls));
	if (atomic_dec_return(&afs_outstanding_calls) == -1)
		BUG();

	ASSERTCMP(call->rxcall, ==, NULL);
	ASSERT(!work_pending(&call->async_work));
	ASSERT(skb_queue_empty(&call->rx_queue));
	ASSERT(call->type->name != NULL);

	kfree(call->request);
	kfree(call);
}

/*
 * allocate a call with flat request and reply buffers
 * - request_size and reply_size may each be 0, in which case the
 *   corresponding buffer is not allocated
 * - returns the new call, or NULL if any allocation failed
 */
struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
				     size_t request_size, size_t reply_size)
{
	struct afs_call *call;

	call = kzalloc(sizeof(*call), GFP_NOFS);
	if (!call)
		goto nomem_call;

	_debug("CALL %p{%s} [%d]",
	       call, type->name, atomic_read(&afs_outstanding_calls));
	atomic_inc(&afs_outstanding_calls);

	call->type = type;
	call->request_size = request_size;
	call->reply_max = reply_size;

	/* these must be set up before anything can jump to nomem_free:
	 * afs_free_call() asserts skb_queue_empty(&call->rx_queue), which
	 * needs an initialised queue head rather than the zeroed memory
	 * kzalloc() returned
	 */
	init_waitqueue_head(&call->waitq);
	skb_queue_head_init(&call->rx_queue);

	if (request_size) {
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_size) {
		call->buffer = kmalloc(reply_size, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	return call;

nomem_free:
	/* call->buffer is still NULL on both paths that get here, so
	 * afs_free_call() releasing only the request buffer is sufficient
	 */
	afs_free_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 * - frees the request and reply buffers and clears the pointers so that a
 *   later kfree() of them (e.g. in afs_free_call()) is harmless
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

/*
 * attach the data from a bunch of pages on an inode to a call
 * - sends pages call->first to call->last of call->mapping, at most
 *   ARRAY_SIZE(pages) at a time
 * - the first page is sent from call->first_offset and the last page up to
 *   call->last_to; all other pages are sent whole
 * - returns the result of the last rxrpc_kernel_send_data() call
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
			  struct kvec *iov)
{
	struct page *pages[8];
	unsigned count, n, loop, offset, to, bytes;
	pgoff_t first = call->first, last = call->last;
	int ret;

	_enter("");

	offset = call->first_offset;
	call->first_offset = 0;

	do {
		_debug("attach %lx-%lx", first, last);

		count = last - first + 1;
		if (count > ARRAY_SIZE(pages))
			count = ARRAY_SIZE(pages);
		n = find_get_pages_contig(call->mapping, first, count, pages);
		ASSERTCMP(n, ==, count);

		loop = 0;
		do {
			msg->msg_flags = 0;
			to = PAGE_SIZE;
			if (first + loop >= last)
				to = call->last_to;
			else
				msg->msg_flags = MSG_MORE;

			/* work out the length once, whilst offset still holds
			 * the start position within this page, so that the
			 * kvec length and the length given to RxRPC agree
			 */
			bytes = to - offset;
			iov->iov_base = kmap(pages[loop]) + offset;
			iov->iov_len = bytes;

			_debug("- range %u-%u%s",
			       offset, to, msg->msg_flags ? " [more]" : "");

			/* only the very first page starts part-way through */
			offset = 0;

			msg->msg_iov = (struct iovec *) iov;
			msg->msg_iovlen = 1;

			/* have to change the state *before* sending the last
			 * packet as RxRPC might give us the reply before it
			 * returns from sending the request */
			if (first + loop >= last)
				call->state = AFS_CALL_AWAIT_REPLY;
			ret = rxrpc_kernel_send_data(call->rxcall, msg, bytes);
			kunmap(pages[loop]);
			if (ret < 0)
				break;
		} while (++loop < count);
		first += count;

		/* drop the refs on every page obtained for this batch, even
		 * if sending stopped early */
		for (loop = 0; loop < count; loop++)
			put_page(pages[loop]);
		if (ret < 0)
			break;
	} while (first <= last);

	_leave(" = %d", ret);
	return ret;
}

/*
 * initiate a call
 * - addr/call->port/call->service_id give the peer to contact
 * - call->key is passed to RxRPC and then cleared from the call
 * - on success the result of wait_mode->wait() is returned
 * - on failure the call is aborted (if it was begun), destroyed and freed,
 *   and a negative error code is returned; the caller must not touch the
 *   call afterwards
 */
int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
		  const struct afs_wait_mode *wait_mode)
{
	struct sockaddr_rxrpc srx;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	int ret;

	_enter("%x,{%d},", addr->s_addr, ntohs(call->port));

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&afs_outstanding_calls));

	call->wait_mode = wait_mode;
	INIT_WORK(&call->async_work, afs_process_async_call);

	memset(&srx, 0, sizeof(srx));
	srx.srx_family = AF_RXRPC;
	srx.srx_service = call->service_id;
	srx.transport_type = SOCK_DGRAM;
	srx.transport_len = sizeof(srx.transport.sin);
	srx.transport.sin.sin_family = AF_INET;
	srx.transport.sin.sin_port = call->port;
	memcpy(&srx.transport.sin.sin_addr, addr,
	       sizeof(srx.transport.sin.sin_addr));

	/* create a call; the call pointer doubles as the user call ID that
	 * afs_rx_interceptor() casts back to a struct afs_call */
	rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
					 (unsigned long) call, gfp);
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	msg.msg_iov		= (struct iovec *) iov;
	msg.msg_iovlen		= 1;
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= (call->send_pages ? MSG_MORE : 0);

	/* have to change the state *before* sending the last packet as RxRPC
	 * might give us the reply before it returns from sending the
	 * request */
	if (!call->send_pages)
		call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg, iov);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	return wait_mode->wait(call);

error_do_abort:
	rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
	rxrpc_kernel_end_call(rxcall);
	call->rxcall = NULL;
error_kill_call:
	/* not every call type supplies a destructor */
	if (call->type->destructor)
		call->type->destructor(call);
	afs_free_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * handles intercepted messages that were arriving in the socket's Rx queue
 * - called with the socket receive queue lock held to ensure message ordering
 * - called with softirqs disabled
 * - a zero user_call_ID marks a message for a new incoming call; otherwise it
 *   is the struct afs_call pointer that was given to RxRPC in afs_make_call()
 */
static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
			       struct sk_buff *skb)
{
	struct afs_call *call = (struct afs_call *) user_call_ID;

	_enter("%p,,%u", call, skb->mark);

	_debug("ICPT %p{%u} [%d]",
	       skb, skb->mark, atomic_read(&afs_outstanding_skbs));

	ASSERTCMP(sk, ==, afs_socket->sk);
	atomic_inc(&afs_outstanding_skbs);

	if (!call) {
		/* its an incoming call for our callback service */
		skb_queue_tail(&afs_incoming_calls, skb);
		schedule_work(&afs_collect_incoming_call_work);
	} else {
		/* route the messages directly to the appropriate call */
		skb_queue_tail(&call->rx_queue, skb);
		call->wait_mode->rx_wakeup(call);
	}

	_leave("");
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	struct sk_buff *skb;
	bool last;
	u32 abort_code;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -