call.c
来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页
C
2,276 行
/* call.c: Rx call routines * * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. */#include <linux/sched.h>#include <linux/slab.h>#include <linux/module.h>#include <rxrpc/rxrpc.h>#include <rxrpc/transport.h>#include <rxrpc/peer.h>#include <rxrpc/connection.h>#include <rxrpc/call.h>#include <rxrpc/message.h>#include "internal.h"__RXACCT_DECL(atomic_t rxrpc_call_count);__RXACCT_DECL(atomic_t rxrpc_message_count);LIST_HEAD(rxrpc_calls);DECLARE_RWSEM(rxrpc_calls_sem);unsigned rxrpc_call_rcv_timeout = HZ/3;unsigned rxrpc_call_acks_timeout = HZ/3;unsigned rxrpc_call_dfr_ack_timeout = HZ/20;unsigned short rxrpc_call_max_resend = HZ/10;const char *rxrpc_call_states[] = { "COMPLETE", "ERROR", "SRVR_RCV_OPID", "SRVR_RCV_ARGS", "SRVR_GOT_ARGS", "SRVR_SND_REPLY", "SRVR_RCV_FINAL_ACK", "CLNT_SND_ARGS", "CLNT_RCV_REPLY", "CLNT_GOT_REPLY"};const char *rxrpc_call_error_states[] = { "NO_ERROR", "LOCAL_ABORT", "PEER_ABORT", "LOCAL_ERROR", "REMOTE_ERROR"};const char *rxrpc_pkts[] = { "?00", "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug", "?09", "?10", "?11", "?12", "?13", "?14", "?15"};const char *rxrpc_acks[] = { "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", "-?-"};static const char _acktype[] = "NA-";static void rxrpc_call_receive_packet(struct rxrpc_call *call);static void rxrpc_call_receive_data_packet(struct rxrpc_call *call, struct rxrpc_message *msg);static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg);static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t higest);static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);static int __rxrpc_call_read_data(struct rxrpc_call *call);static int rxrpc_call_record_ACK(struct rxrpc_call *call, struct rxrpc_message *msg, rxrpc_seq_t seq, size_t count);#define _state(call) \ _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);static void rxrpc_call_default_attn_func(struct rxrpc_call *call){ wake_up(&call->waitq);}static void rxrpc_call_default_error_func(struct rxrpc_call *call){ wake_up(&call->waitq);}static void rxrpc_call_default_aemap_func(struct rxrpc_call *call){ switch (call->app_err_state) { case RXRPC_ESTATE_LOCAL_ABORT: call->app_abort_code = -call->app_errno; case RXRPC_ESTATE_PEER_ABORT: call->app_errno = -ECONNABORTED; default: break; }}static void __rxrpc_call_acks_timeout(unsigned long _call){ struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif); call->flags |= RXRPC_CALL_ACKS_TIMO; rxrpc_krxiod_queue_call(call);}static void __rxrpc_call_rcv_timeout(unsigned long _call){ struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("RCV TIMEOUT %05lu", jiffies - call->cjif); call->flags |= RXRPC_CALL_RCV_TIMO; rxrpc_krxiod_queue_call(call);}static void __rxrpc_call_ackr_timeout(unsigned long _call){ struct rxrpc_call *call = (struct rxrpc_call *) _call; _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif); call->flags |= RXRPC_CALL_ACKR_TIMO; rxrpc_krxiod_queue_call(call);}/*****************************************************************************//* * calculate a timeout based on an RTT value */static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call, unsigned long val){ unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ); expiry += 10; if (expiry < HZ / 25) expiry = HZ / 25; if (expiry > HZ) expiry = HZ; _leave(" = %lu jiffies", expiry); return jiffies + expiry;} /* end __rxrpc_rtt_based_timeout() *//*****************************************************************************//* * create a new call record */static inline int __rxrpc_create_call(struct rxrpc_connection *conn, struct rxrpc_call **_call){ struct rxrpc_call *call; _enter("%p", conn); /* allocate and initialise a call record */ call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL); if (!call) { _leave(" ENOMEM"); return -ENOMEM; } atomic_set(&call->usage, 1); init_waitqueue_head(&call->waitq); spin_lock_init(&call->lock); INIT_LIST_HEAD(&call->link); INIT_LIST_HEAD(&call->acks_pendq); INIT_LIST_HEAD(&call->rcv_receiveq); INIT_LIST_HEAD(&call->rcv_krxiodq_lk); INIT_LIST_HEAD(&call->app_readyq); INIT_LIST_HEAD(&call->app_unreadyq); INIT_LIST_HEAD(&call->app_link); INIT_LIST_HEAD(&call->app_attn_link); init_timer(&call->acks_timeout); call->acks_timeout.data = (unsigned long) call; call->acks_timeout.function = __rxrpc_call_acks_timeout; init_timer(&call->rcv_timeout); call->rcv_timeout.data = (unsigned long) call; call->rcv_timeout.function = __rxrpc_call_rcv_timeout; init_timer(&call->ackr_dfr_timo); call->ackr_dfr_timo.data = (unsigned long) call; call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout; call->conn = conn; call->ackr_win_bot = 1; call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1; call->ackr_prev_seq = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_attn_func = rxrpc_call_default_attn_func; call->app_error_func = rxrpc_call_default_error_func; call->app_aemap_func = rxrpc_call_default_aemap_func; call->app_scr_alloc = call->app_scratch; call->cjif = jiffies; _leave(" = 0 (%p)", call); *_call = call; return 0;} /* end __rxrpc_create_call() *//*****************************************************************************//* * create a new call record for outgoing calls */int rxrpc_create_call(struct rxrpc_connection *conn, rxrpc_call_attn_func_t attn, rxrpc_call_error_func_t error, rxrpc_call_aemap_func_t aemap, struct rxrpc_call **_call){ DECLARE_WAITQUEUE(myself, current); struct rxrpc_call *call; int ret, cix, loop; _enter("%p", conn); /* allocate and initialise a call record */ ret = __rxrpc_create_call(conn, &call); if (ret < 0) { _leave(" = %d", ret); return ret; } call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS; if (attn) call->app_attn_func = attn; if (error) call->app_error_func = error; if (aemap) call->app_aemap_func = aemap; _state(call); spin_lock(&conn->lock); set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&conn->chanwait, &myself); try_again: /* try to find an unused channel */ for (cix = 0; cix < 4; cix++) if (!conn->channels[cix]) goto obtained_chan; /* no free channels - wait for one to become available */ ret = -EINTR; if (signal_pending(current)) goto error_unwait; spin_unlock(&conn->lock); schedule(); set_current_state(TASK_INTERRUPTIBLE); spin_lock(&conn->lock); goto try_again; /* got a channel - now attach to the connection */ obtained_chan: remove_wait_queue(&conn->chanwait, &myself); set_current_state(TASK_RUNNING); /* concoct a unique call number */ next_callid: call->call_id = htonl(++conn->call_counter); for (loop = 0; loop < 4; loop++) if (conn->channels[loop] && conn->channels[loop]->call_id == call->call_id) goto next_callid; rxrpc_get_connection(conn); conn->channels[cix] = call; /* assign _after_ done callid check loop */ do_gettimeofday(&conn->atime); call->chan_ix = htonl(cix); spin_unlock(&conn->lock); down_write(&rxrpc_calls_sem); list_add_tail(&call->call_link, &rxrpc_calls); up_write(&rxrpc_calls_sem); __RXACCT(atomic_inc(&rxrpc_call_count)); *_call = call; _leave(" = 0 (call=%p cix=%u)", call, cix); return 0; error_unwait: remove_wait_queue(&conn->chanwait, &myself); set_current_state(TASK_RUNNING); spin_unlock(&conn->lock); free_page((unsigned long) call); _leave(" = %d", ret); return ret;} /* end rxrpc_create_call() *//*****************************************************************************//* * create a new call record for incoming calls */int rxrpc_incoming_call(struct rxrpc_connection *conn, struct rxrpc_message *msg, struct rxrpc_call **_call){ struct rxrpc_call *call; unsigned cix; int ret; cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK; _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix); /* allocate and initialise a call record */ ret = __rxrpc_create_call(conn, &call); if (ret < 0) { _leave(" = %d", ret); return ret; } call->pkt_rcv_count = 1; call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID; call->app_mark = sizeof(uint32_t); _state(call); /* attach to the connection */ ret = -EBUSY; call->chan_ix = htonl(cix); call->call_id = msg->hdr.callNumber; spin_lock(&conn->lock); if (!conn->channels[cix] || conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE || conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR ) { conn->channels[cix] = call; rxrpc_get_connection(conn); ret = 0; } spin_unlock(&conn->lock); if (ret < 0) { free_page((unsigned long) call); call = NULL; } if (ret == 0) { down_write(&rxrpc_calls_sem); list_add_tail(&call->call_link, &rxrpc_calls); up_write(&rxrpc_calls_sem); __RXACCT(atomic_inc(&rxrpc_call_count)); *_call = call; } _leave(" = %d [%p]", ret, call); return ret;} /* end rxrpc_incoming_call() *//*****************************************************************************//* * free a call record */void rxrpc_put_call(struct rxrpc_call *call){ struct rxrpc_connection *conn = call->conn; struct rxrpc_message *msg; _enter("%p{u=%d}",call,atomic_read(&call->usage)); /* sanity check */ if (atomic_read(&call->usage) <= 0) BUG(); /* to prevent a race, the decrement and the de-list must be effectively * atomic */ spin_lock(&conn->lock); if (likely(!atomic_dec_and_test(&call->usage))) { spin_unlock(&conn->lock); _leave(""); return; } if (conn->channels[ntohl(call->chan_ix)] == call) conn->channels[ntohl(call->chan_ix)] = NULL; spin_unlock(&conn->lock); wake_up(&conn->chanwait); rxrpc_put_connection(conn); /* clear the timers and dequeue from krxiod */ del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); rxrpc_krxiod_dequeue_call(call); /* clean up the contents of the struct */ if (call->snd_nextmsg) rxrpc_put_message(call->snd_nextmsg); if (call->snd_ping) rxrpc_put_message(call->snd_ping); while (!list_empty(&call->acks_pendq)) { msg = list_entry(call->acks_pendq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->rcv_receiveq)) { msg = list_entry(call->rcv_receiveq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->app_readyq)) { msg = list_entry(call->app_readyq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } while (!list_empty(&call->app_unreadyq)) { msg = list_entry(call->app_unreadyq.next, struct rxrpc_message, link); list_del(&msg->link); rxrpc_put_message(msg); } module_put(call->owner); down_write(&rxrpc_calls_sem); list_del(&call->call_link); up_write(&rxrpc_calls_sem); __RXACCT(atomic_dec(&rxrpc_call_count)); free_page((unsigned long) call); _leave(" [destroyed]");} /* end rxrpc_put_call() *//*****************************************************************************//* * actually generate a normal ACK */static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call, rxrpc_seq_t seq){ struct rxrpc_message *msg; struct kvec diov[3]; __be32 aux[4]; int delta, ret; /* ACKs default to DELAY */ if (!call->ackr.reason) call->ackr.reason = RXRPC_ACK_DELAY; _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", jiffies - call->cjif, ntohs(call->ackr.maxSkew), ntohl(call->ackr.firstPacket), ntohl(call->ackr.previousPacket), ntohl(call->ackr.serial), rxrpc_acks[call->ackr.reason], call->ackr.nAcks); aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */ aux[1] = htonl(1444); /* max MTU */ aux[2] = htonl(16); /* rwind */ aux[3] = htonl(4); /* max packets */ diov[0].iov_len = sizeof(struct rxrpc_ackpacket); diov[0].iov_base = &call->ackr; diov[1].iov_len = call->ackr_pend_cnt + 3; diov[1].iov_base = call->ackr_array; diov[2].iov_len = sizeof(aux); diov[2].iov_base = &aux; /* build and send the message */ ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK, 3, diov, GFP_KERNEL, &msg); if (ret < 0) goto out; msg->seq = seq; msg->hdr.seq = htonl(seq); msg->hdr.flags |= RXRPC_SLOW_START_OK; ret = rxrpc_conn_sendmsg(call->conn, msg); rxrpc_put_message(msg); if (ret < 0) goto out; call->pkt_snd_count++; /* count how many actual ACKs there were at the front */ for (delta = 0; delta < call->ackr_pend_cnt; delta++) if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK) break; call->ackr_pend_cnt -= delta; /* all ACK'd to this point */ /* crank the ACK window around */ if (delta == 0) { /* un-ACK'd window */ } else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) { /* partially ACK'd window * - shuffle down to avoid losing out-of-sequence packets */ call->ackr_win_bot += delta; call->ackr_win_top += delta; memmove(&call->ackr_array[0], &call->ackr_array[delta], call->ackr_pend_cnt); memset(&call->ackr_array[call->ackr_pend_cnt], RXRPC_ACK_TYPE_NACK, sizeof(call->ackr_array) - call->ackr_pend_cnt); } else { /* fully ACK'd window * - just clear the whole thing */ memset(&call->ackr_array, RXRPC_ACK_TYPE_NACK, sizeof(call->ackr_array)); } /* clear this ACK */ memset(&call->ackr, 0, sizeof(call->ackr)); out: if (!call->app_call_state) printk("___ STATE 0 ___\n"); return ret;} /* end __rxrpc_call_gen_normal_ACK() *//*****************************************************************************//* * note the reception of a packet in the call's ACK records and generate an * appropriate ACK packet if necessary * - returns 0 if packet should be processed, 1 if packet should be ignored * and -ve on an error */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?