call.c

来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页

C
2,276
字号
/* call.c: Rx call routines * * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. */#include <linux/sched.h>#include <linux/slab.h>#include <linux/module.h>#include <rxrpc/rxrpc.h>#include <rxrpc/transport.h>#include <rxrpc/peer.h>#include <rxrpc/connection.h>#include <rxrpc/call.h>#include <rxrpc/message.h>#include "internal.h"__RXACCT_DECL(atomic_t rxrpc_call_count);__RXACCT_DECL(atomic_t rxrpc_message_count);LIST_HEAD(rxrpc_calls);DECLARE_RWSEM(rxrpc_calls_sem);unsigned rxrpc_call_rcv_timeout		= HZ/3;unsigned rxrpc_call_acks_timeout	= HZ/3;unsigned rxrpc_call_dfr_ack_timeout	= HZ/20;unsigned short rxrpc_call_max_resend	= HZ/10;const char *rxrpc_call_states[] = {	"COMPLETE",	"ERROR",	"SRVR_RCV_OPID",	"SRVR_RCV_ARGS",	"SRVR_GOT_ARGS",	"SRVR_SND_REPLY",	"SRVR_RCV_FINAL_ACK",	"CLNT_SND_ARGS",	"CLNT_RCV_REPLY",	"CLNT_GOT_REPLY"};const char *rxrpc_call_error_states[] = {	"NO_ERROR",	"LOCAL_ABORT",	"PEER_ABORT",	"LOCAL_ERROR",	"REMOTE_ERROR"};const char *rxrpc_pkts[] = {	"?00",	"data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",	"?09", "?10", "?11", "?12", "?13", "?14", "?15"};const char *rxrpc_acks[] = {	"---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",	"-?-"};static const char _acktype[] = "NA-";static void rxrpc_call_receive_packet(struct rxrpc_call *call);static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,					   struct rxrpc_message *msg);static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,					  struct rxrpc_message *msg);static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,					rxrpc_seq_t higest);static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);static int __rxrpc_call_read_data(struct rxrpc_call *call);static int rxrpc_call_record_ACK(struct rxrpc_call *call,				 struct rxrpc_message *msg,				 rxrpc_seq_t seq,				 size_t count);#define _state(call) \	_debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);static void rxrpc_call_default_attn_func(struct rxrpc_call *call){	wake_up(&call->waitq);}static void rxrpc_call_default_error_func(struct rxrpc_call *call){	wake_up(&call->waitq);}static void rxrpc_call_default_aemap_func(struct rxrpc_call *call){	switch (call->app_err_state) {	case RXRPC_ESTATE_LOCAL_ABORT:		call->app_abort_code = -call->app_errno;	case RXRPC_ESTATE_PEER_ABORT:		call->app_errno = -ECONNABORTED;	default:		break;	}}static void __rxrpc_call_acks_timeout(unsigned long _call){	struct rxrpc_call *call = (struct rxrpc_call *) _call;	_debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);	call->flags |= RXRPC_CALL_ACKS_TIMO;	rxrpc_krxiod_queue_call(call);}static void __rxrpc_call_rcv_timeout(unsigned long _call){	struct rxrpc_call *call = (struct rxrpc_call *) _call;	_debug("RCV TIMEOUT %05lu", jiffies - call->cjif);	call->flags |= RXRPC_CALL_RCV_TIMO;	rxrpc_krxiod_queue_call(call);}static void __rxrpc_call_ackr_timeout(unsigned long _call){	struct rxrpc_call *call = (struct rxrpc_call *) _call;	_debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);	call->flags |= RXRPC_CALL_ACKR_TIMO;	rxrpc_krxiod_queue_call(call);}/*****************************************************************************//* * calculate a timeout based on an RTT value */static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,						      unsigned long val){	unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);	expiry += 10;	if (expiry < HZ / 25)		expiry = HZ / 25;	if (expiry > HZ)		expiry = HZ;	_leave(" = %lu jiffies", expiry);	return jiffies + expiry;} /* end __rxrpc_rtt_based_timeout() *//*****************************************************************************//* * create a new call record */static inline int __rxrpc_create_call(struct rxrpc_connection *conn,				      struct rxrpc_call **_call){	struct rxrpc_call *call;	_enter("%p", conn);	/* allocate and initialise a call record */	call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);	if (!call) {		_leave(" ENOMEM");		return -ENOMEM;	}	atomic_set(&call->usage, 1);	init_waitqueue_head(&call->waitq);	spin_lock_init(&call->lock);	INIT_LIST_HEAD(&call->link);	INIT_LIST_HEAD(&call->acks_pendq);	INIT_LIST_HEAD(&call->rcv_receiveq);	INIT_LIST_HEAD(&call->rcv_krxiodq_lk);	INIT_LIST_HEAD(&call->app_readyq);	INIT_LIST_HEAD(&call->app_unreadyq);	INIT_LIST_HEAD(&call->app_link);	INIT_LIST_HEAD(&call->app_attn_link);	init_timer(&call->acks_timeout);	call->acks_timeout.data = (unsigned long) call;	call->acks_timeout.function = __rxrpc_call_acks_timeout;	init_timer(&call->rcv_timeout);	call->rcv_timeout.data = (unsigned long) call;	call->rcv_timeout.function = __rxrpc_call_rcv_timeout;	init_timer(&call->ackr_dfr_timo);	call->ackr_dfr_timo.data = (unsigned long) call;	call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;	call->conn = conn;	call->ackr_win_bot = 1;	call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;	call->ackr_prev_seq = 0;	call->app_mark = RXRPC_APP_MARK_EOF;	call->app_attn_func = rxrpc_call_default_attn_func;	call->app_error_func = rxrpc_call_default_error_func;	call->app_aemap_func = rxrpc_call_default_aemap_func;	call->app_scr_alloc = call->app_scratch;	call->cjif = jiffies;	_leave(" = 0 (%p)", call);	*_call = call;	return 0;} /* end __rxrpc_create_call() *//*****************************************************************************//* * create a new call record for outgoing calls */int rxrpc_create_call(struct rxrpc_connection *conn,		      rxrpc_call_attn_func_t attn,		      rxrpc_call_error_func_t error,		      rxrpc_call_aemap_func_t aemap,		      struct rxrpc_call **_call){	DECLARE_WAITQUEUE(myself, current);	struct rxrpc_call *call;	int ret, cix, loop;	_enter("%p", conn);	/* allocate and initialise a call record */	ret = __rxrpc_create_call(conn, &call);	if (ret < 0) {		_leave(" = %d", ret);		return ret;	}	call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;	if (attn)		call->app_attn_func = attn;	if (error)		call->app_error_func = error;	if (aemap)		call->app_aemap_func = aemap;	_state(call);	spin_lock(&conn->lock);	set_current_state(TASK_INTERRUPTIBLE);	add_wait_queue(&conn->chanwait, &myself); try_again:	/* try to find an unused channel */	for (cix = 0; cix < 4; cix++)		if (!conn->channels[cix])			goto obtained_chan;	/* no free channels - wait for one to become available */	ret = -EINTR;	if (signal_pending(current))		goto error_unwait;	spin_unlock(&conn->lock);	schedule();	set_current_state(TASK_INTERRUPTIBLE);	spin_lock(&conn->lock);	goto try_again;	/* got a channel - now attach to the connection */ obtained_chan:	remove_wait_queue(&conn->chanwait, &myself);	set_current_state(TASK_RUNNING);	/* concoct a unique call number */ next_callid:	call->call_id = htonl(++conn->call_counter);	for (loop = 0; loop < 4; loop++)		if (conn->channels[loop] &&		    conn->channels[loop]->call_id == call->call_id)			goto next_callid;	rxrpc_get_connection(conn);	conn->channels[cix] = call; /* assign _after_ done callid check loop */	do_gettimeofday(&conn->atime);	call->chan_ix = htonl(cix);	spin_unlock(&conn->lock);	down_write(&rxrpc_calls_sem);	list_add_tail(&call->call_link, &rxrpc_calls);	up_write(&rxrpc_calls_sem);	__RXACCT(atomic_inc(&rxrpc_call_count));	*_call = call;	_leave(" = 0 (call=%p cix=%u)", call, cix);	return 0; error_unwait:	remove_wait_queue(&conn->chanwait, &myself);	set_current_state(TASK_RUNNING);	spin_unlock(&conn->lock);	free_page((unsigned long) call);	_leave(" = %d", ret);	return ret;} /* end rxrpc_create_call() *//*****************************************************************************//* * create a new call record for incoming calls */int rxrpc_incoming_call(struct rxrpc_connection *conn,			struct rxrpc_message *msg,			struct rxrpc_call **_call){	struct rxrpc_call *call;	unsigned cix;	int ret;	cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;	_enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);	/* allocate and initialise a call record */	ret = __rxrpc_create_call(conn, &call);	if (ret < 0) {		_leave(" = %d", ret);		return ret;	}	call->pkt_rcv_count = 1;	call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;	call->app_mark = sizeof(uint32_t);	_state(call);	/* attach to the connection */	ret = -EBUSY;	call->chan_ix = htonl(cix);	call->call_id = msg->hdr.callNumber;	spin_lock(&conn->lock);	if (!conn->channels[cix] ||	    conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||	    conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR	    ) {		conn->channels[cix] = call;		rxrpc_get_connection(conn);		ret = 0;	}	spin_unlock(&conn->lock);	if (ret < 0) {		free_page((unsigned long) call);		call = NULL;	}	if (ret == 0) {		down_write(&rxrpc_calls_sem);		list_add_tail(&call->call_link, &rxrpc_calls);		up_write(&rxrpc_calls_sem);		__RXACCT(atomic_inc(&rxrpc_call_count));		*_call = call;	}	_leave(" = %d [%p]", ret, call);	return ret;} /* end rxrpc_incoming_call() *//*****************************************************************************//* * free a call record */void rxrpc_put_call(struct rxrpc_call *call){	struct rxrpc_connection *conn = call->conn;	struct rxrpc_message *msg;	_enter("%p{u=%d}",call,atomic_read(&call->usage));	/* sanity check */	if (atomic_read(&call->usage) <= 0)		BUG();	/* to prevent a race, the decrement and the de-list must be effectively	 * atomic */	spin_lock(&conn->lock);	if (likely(!atomic_dec_and_test(&call->usage))) {		spin_unlock(&conn->lock);		_leave("");		return;	}	if (conn->channels[ntohl(call->chan_ix)] == call)		conn->channels[ntohl(call->chan_ix)] = NULL;	spin_unlock(&conn->lock);	wake_up(&conn->chanwait);	rxrpc_put_connection(conn);	/* clear the timers and dequeue from krxiod */	del_timer_sync(&call->acks_timeout);	del_timer_sync(&call->rcv_timeout);	del_timer_sync(&call->ackr_dfr_timo);	rxrpc_krxiod_dequeue_call(call);	/* clean up the contents of the struct */	if (call->snd_nextmsg)		rxrpc_put_message(call->snd_nextmsg);	if (call->snd_ping)		rxrpc_put_message(call->snd_ping);	while (!list_empty(&call->acks_pendq)) {		msg = list_entry(call->acks_pendq.next,				 struct rxrpc_message, link);		list_del(&msg->link);		rxrpc_put_message(msg);	}	while (!list_empty(&call->rcv_receiveq)) {		msg = list_entry(call->rcv_receiveq.next,				 struct rxrpc_message, link);		list_del(&msg->link);		rxrpc_put_message(msg);	}	while (!list_empty(&call->app_readyq)) {		msg = list_entry(call->app_readyq.next,				 struct rxrpc_message, link);		list_del(&msg->link);		rxrpc_put_message(msg);	}	while (!list_empty(&call->app_unreadyq)) {		msg = list_entry(call->app_unreadyq.next,				 struct rxrpc_message, link);		list_del(&msg->link);		rxrpc_put_message(msg);	}	module_put(call->owner);	down_write(&rxrpc_calls_sem);	list_del(&call->call_link);	up_write(&rxrpc_calls_sem);	__RXACCT(atomic_dec(&rxrpc_call_count));	free_page((unsigned long) call);	_leave(" [destroyed]");} /* end rxrpc_put_call() *//*****************************************************************************//* * actually generate a normal ACK */static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,					      rxrpc_seq_t seq){	struct rxrpc_message *msg;	struct kvec diov[3];	__be32 aux[4];	int delta, ret;	/* ACKs default to DELAY */	if (!call->ackr.reason)		call->ackr.reason = RXRPC_ACK_DELAY;	_proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",	       jiffies - call->cjif,	       ntohs(call->ackr.maxSkew),	       ntohl(call->ackr.firstPacket),	       ntohl(call->ackr.previousPacket),	       ntohl(call->ackr.serial),	       rxrpc_acks[call->ackr.reason],	       call->ackr.nAcks);	aux[0] = htonl(call->conn->peer->if_mtu);	/* interface MTU */	aux[1] = htonl(1444);				/* max MTU */	aux[2] = htonl(16);				/* rwind */	aux[3] = htonl(4);				/* max packets */	diov[0].iov_len  = sizeof(struct rxrpc_ackpacket);	diov[0].iov_base = &call->ackr;	diov[1].iov_len  = call->ackr_pend_cnt + 3;	diov[1].iov_base = call->ackr_array;	diov[2].iov_len  = sizeof(aux);	diov[2].iov_base = &aux;	/* build and send the message */	ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,				3, diov, GFP_KERNEL, &msg);	if (ret < 0)		goto out;	msg->seq = seq;	msg->hdr.seq = htonl(seq);	msg->hdr.flags |= RXRPC_SLOW_START_OK;	ret = rxrpc_conn_sendmsg(call->conn, msg);	rxrpc_put_message(msg);	if (ret < 0)		goto out;	call->pkt_snd_count++;	/* count how many actual ACKs there were at the front */	for (delta = 0; delta < call->ackr_pend_cnt; delta++)		if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)			break;	call->ackr_pend_cnt -= delta; /* all ACK'd to this point */	/* crank the ACK window around */	if (delta == 0) {		/* un-ACK'd window */	}	else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {		/* partially ACK'd window		 * - shuffle down to avoid losing out-of-sequence packets		 */		call->ackr_win_bot += delta;		call->ackr_win_top += delta;		memmove(&call->ackr_array[0],			&call->ackr_array[delta],			call->ackr_pend_cnt);		memset(&call->ackr_array[call->ackr_pend_cnt],		       RXRPC_ACK_TYPE_NACK,		       sizeof(call->ackr_array) - call->ackr_pend_cnt);	}	else {		/* fully ACK'd window		 * - just clear the whole thing		 */		memset(&call->ackr_array,		       RXRPC_ACK_TYPE_NACK,		       sizeof(call->ackr_array));	}	/* clear this ACK */	memset(&call->ackr, 0, sizeof(call->ackr)); out:	if (!call->app_call_state)		printk("___ STATE 0 ___\n");	return ret;} /* end __rxrpc_call_gen_normal_ACK() *//*****************************************************************************//* * note the reception of a packet in the call's ACK records and generate an * appropriate ACK packet if necessary * - returns 0 if packet should be processed, 1 if packet should be ignored *   and -ve on an error */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?