📄 rxrpc.c
字号:
int ret; _enter(""); while ((call->state == AFS_CALL_AWAIT_REPLY || call->state == AFS_CALL_AWAIT_OP_ID || call->state == AFS_CALL_AWAIT_REQUEST || call->state == AFS_CALL_AWAIT_ACK) && (skb = skb_dequeue(&call->rx_queue))) { switch (skb->mark) { case RXRPC_SKB_MARK_DATA: _debug("Rcv DATA"); last = rxrpc_kernel_is_data_last(skb); ret = call->type->deliver(call, skb, last); switch (ret) { case 0: if (last && call->state == AFS_CALL_AWAIT_REPLY) call->state = AFS_CALL_COMPLETE; break; case -ENOTCONN: abort_code = RX_CALL_DEAD; goto do_abort; case -ENOTSUPP: abort_code = RX_INVALID_OPERATION; goto do_abort; default: abort_code = RXGEN_CC_UNMARSHAL; if (call->state != AFS_CALL_AWAIT_REPLY) abort_code = RXGEN_SS_UNMARSHAL; do_abort: rxrpc_kernel_abort_call(call->rxcall, abort_code); call->error = ret; call->state = AFS_CALL_ERROR; break; } afs_data_delivered(skb); skb = NULL; continue; case RXRPC_SKB_MARK_FINAL_ACK: _debug("Rcv ACK"); call->state = AFS_CALL_COMPLETE; break; case RXRPC_SKB_MARK_BUSY: _debug("Rcv BUSY"); call->error = -EBUSY; call->state = AFS_CALL_BUSY; break; case RXRPC_SKB_MARK_REMOTE_ABORT: abort_code = rxrpc_kernel_get_abort_code(skb); call->error = call->type->abort_to_error(abort_code); call->state = AFS_CALL_ABORTED; _debug("Rcv ABORT %u -> %d", abort_code, call->error); break; case RXRPC_SKB_MARK_NET_ERROR: call->error = -rxrpc_kernel_get_error_number(skb); call->state = AFS_CALL_ERROR; _debug("Rcv NET ERROR %d", call->error); break; case RXRPC_SKB_MARK_LOCAL_ERROR: call->error = -rxrpc_kernel_get_error_number(skb); call->state = AFS_CALL_ERROR; _debug("Rcv LOCAL ERROR %d", call->error); break; default: BUG(); break; } afs_free_skb(skb); } /* make sure the queue is empty if the call is done with (we might have * aborted the call early because of an unmarshalling error) */ if (call->state >= AFS_CALL_COMPLETE) { while ((skb = skb_dequeue(&call->rx_queue))) afs_free_skb(skb); if (call->incoming) { rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; call->type->destructor(call); afs_free_call(call); } } _leave("");}/* * wait synchronously for a call to complete */static int afs_wait_for_call_to_complete(struct afs_call *call){ struct sk_buff *skb; int ret; DECLARE_WAITQUEUE(myself, current); _enter(""); add_wait_queue(&call->waitq, &myself); for (;;) { set_current_state(TASK_INTERRUPTIBLE); /* deliver any messages that are in the queue */ if (!skb_queue_empty(&call->rx_queue)) { __set_current_state(TASK_RUNNING); afs_deliver_to_call(call); continue; } ret = call->error; if (call->state >= AFS_CALL_COMPLETE) break; ret = -EINTR; if (signal_pending(current)) break; schedule(); } remove_wait_queue(&call->waitq, &myself); __set_current_state(TASK_RUNNING); /* kill the call */ if (call->state < AFS_CALL_COMPLETE) { _debug("call incomplete"); rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD); while ((skb = skb_dequeue(&call->rx_queue))) afs_free_skb(skb); } _debug("call complete"); rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; call->type->destructor(call); afs_free_call(call); _leave(" = %d", ret); return ret;}/* * wake up a waiting call */static void afs_wake_up_call_waiter(struct afs_call *call){ wake_up(&call->waitq);}/* * wake up an asynchronous call */static void afs_wake_up_async_call(struct afs_call *call){ _enter(""); queue_work(afs_async_calls, &call->async_work);}/* * put a call into asynchronous mode * - mustn't touch the call descriptor as the call my have completed by the * time we get here */static int afs_dont_wait_for_call_to_complete(struct afs_call *call){ _enter(""); return -EINPROGRESS;}/* * delete an asynchronous call */static void afs_delete_async_call(struct work_struct *work){ struct afs_call *call = container_of(work, struct afs_call, async_work); _enter(""); afs_free_call(call); _leave("");}/* * perform processing on an asynchronous call * - on a multiple-thread workqueue this work item may try to run on several * CPUs at the same time */static void afs_process_async_call(struct work_struct *work){ struct afs_call *call = container_of(work, struct afs_call, async_work); _enter(""); if (!skb_queue_empty(&call->rx_queue)) afs_deliver_to_call(call); if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) { if (call->wait_mode->async_complete) call->wait_mode->async_complete(call->reply, call->error); call->reply = NULL; /* kill the call */ rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; if (call->type->destructor) call->type->destructor(call); /* we can't just delete the call because the work item may be * queued */ PREPARE_WORK(&call->async_work, afs_delete_async_call); queue_work(afs_async_calls, &call->async_work); } _leave("");}/* * empty a socket buffer into a flat reply buffer */void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb){ size_t len = skb->len; if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0) BUG(); call->reply_size += len;}/* * accept the backlog of incoming calls */static void afs_collect_incoming_call(struct work_struct *work){ struct rxrpc_call *rxcall; struct afs_call *call = NULL; struct sk_buff *skb; while ((skb = skb_dequeue(&afs_incoming_calls))) { _debug("new call"); /* don't need the notification */ afs_free_skb(skb); if (!call) { call = kzalloc(sizeof(struct afs_call), GFP_KERNEL); if (!call) { rxrpc_kernel_reject_call(afs_socket); return; } INIT_WORK(&call->async_work, afs_process_async_call); call->wait_mode = &afs_async_incoming_call; call->type = &afs_RXCMxxxx; init_waitqueue_head(&call->waitq); skb_queue_head_init(&call->rx_queue); call->state = AFS_CALL_AWAIT_OP_ID; _debug("CALL %p{%s} [%d]", call, call->type->name, atomic_read(&afs_outstanding_calls)); atomic_inc(&afs_outstanding_calls); } rxcall = rxrpc_kernel_accept_call(afs_socket, (unsigned long) call); if (!IS_ERR(rxcall)) { call->rxcall = rxcall; call = NULL; } } if (call) afs_free_call(call);}/* * grab the operation ID from an incoming cache manager call */static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb, bool last){ size_t len = skb->len; void *oibuf = (void *) &call->operation_ID; _enter("{%u},{%zu},%d", call->offset, len, last); ASSERTCMP(call->offset, <, 4); /* the operation ID forms the first four bytes of the request data */ len = min_t(size_t, len, 4 - call->offset); if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0) BUG(); if (!pskb_pull(skb, len)) BUG(); call->offset += len; if (call->offset < 4) { if (last) { _leave(" = -EBADMSG [op ID short]"); return -EBADMSG; } _leave(" = 0 [incomplete]"); return 0; } call->state = AFS_CALL_AWAIT_REQUEST; /* ask the cache manager to route the call (it'll change the call type * if successful) */ if (!afs_cm_incoming_call(call)) return -ENOTSUPP; /* pass responsibility for the remainer of this message off to the * cache manager op */ return call->type->deliver(call, skb, last);}/* * send an empty reply */void afs_send_empty_reply(struct afs_call *call){ struct msghdr msg; struct iovec iov[1]; _enter(""); iov[0].iov_base = NULL; iov[0].iov_len = 0; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; call->state = AFS_CALL_AWAIT_ACK; switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) { case 0: _leave(" [replied]"); return; case -ENOMEM: _debug("oom"); rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); default: rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; call->type->destructor(call); afs_free_call(call); _leave(" [error]"); return; }}/* * send a simple reply */void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len){ struct msghdr msg; struct iovec iov[1]; int n; _enter(""); iov[0].iov_base = (void *) buf; iov[0].iov_len = len; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; call->state = AFS_CALL_AWAIT_ACK; n = rxrpc_kernel_send_data(call->rxcall, &msg, len); if (n >= 0) { _leave(" [replied]"); return; } if (n == -ENOMEM) { _debug("oom"); rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); } rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; call->type->destructor(call); afs_free_call(call); _leave(" [error]");}/* * extract a piece of data from the received data socket buffers */int afs_extract_data(struct afs_call *call, struct sk_buff *skb, bool last, void *buf, size_t count){ size_t len = skb->len; _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count); ASSERTCMP(call->offset, <, count); len = min_t(size_t, len, count - call->offset); if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 || !pskb_pull(skb, len)) BUG(); call->offset += len; if (call->offset < count) { if (last) { _leave(" = -EBADMSG [%d < %zu]", call->offset, count); return -EBADMSG; } _leave(" = -EAGAIN"); return -EAGAIN; } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -