
link.c

Linux kernel source code
C
Page 1 of 5
		struct tipc_msg *msg = buf_msg(buf);

		l_ptr->stats.sent_fragments++;
		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
		link_add_to_outqueue(l_ptr, buf, msg);
		msg_dbg(msg, ">ADD>");
		buf = next;
	}

	/* Send it, if possible: */

	tipc_link_push_queue(l_ptr);
	tipc_node_unlock(node);
	return dsz;
}

/*
 * tipc_link_push_packet: Push one unsent packet to the media
 */

u32 tipc_link_push_packet(struct link *l_ptr)
{
	struct sk_buff *buf = l_ptr->first_out;
	u32 r_q_size = l_ptr->retransm_queue_size;
	u32 r_q_head = l_ptr->retransm_queue_head;

	/* Step to position where retransmission failed, if any,    */
	/* consider that buffers may have been released in meantime */

	if (r_q_size && buf) {
		u32 last = lesser(mod(r_q_head + r_q_size),
				  link_last_sent(l_ptr));
		u32 first = msg_seqno(buf_msg(buf));

		while (buf && less(first, r_q_head)) {
			first = mod(first + 1);
			buf = buf->next;
		}
		l_ptr->retransm_queue_head = r_q_head = first;
		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
	}

	/* Continue retransmission now, if there is anything: */

	if (r_q_size && buf && !skb_cloned(buf)) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">DEF-RETR>");
			l_ptr->retransm_queue_head = mod(++r_q_head);
			l_ptr->retransm_queue_size = --r_q_size;
			l_ptr->stats.retransmitted++;
			return TIPC_OK;
		} else {
			l_ptr->stats.bearer_congs++;
			msg_dbg(buf_msg(buf), "|>DEF-RETR>");
			return PUSH_FAILED;
		}
	}

	/* Send deferred protocol message, if any: */

	buf = l_ptr->proto_msg_queue;
	if (buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">DEF-PROT>");
			l_ptr->unacked_window = 0;
			buf_discard(buf);
			l_ptr->proto_msg_queue = NULL;
			return TIPC_OK;
		} else {
			msg_dbg(buf_msg(buf), "|>DEF-PROT>");
			l_ptr->stats.bearer_congs++;
			return PUSH_FAILED;
		}
	}

	/* Send one deferred data message, if send window not full: */

	buf = l_ptr->next_out;
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		u32 next = msg_seqno(msg);
		u32 first = msg_seqno(buf_msg(l_ptr->first_out));

		if (mod(next - first) < l_ptr->queue_limit[0]) {
			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
				if (msg_user(msg) == MSG_BUNDLER)
					msg_set_type(msg, CLOSED_MSG);
				msg_dbg(msg, ">PUSH-DATA>");
				l_ptr->next_out = buf->next;
				return TIPC_OK;
			} else {
				msg_dbg(msg, "|PUSH-DATA|");
				l_ptr->stats.bearer_congs++;
				return PUSH_FAILED;
			}
		}
	}
	return PUSH_FINISHED;
}

/*
 * push_queue(): push out the unsent messages of a link where
 *               congestion has abated. Node is locked
 */

void tipc_link_push_queue(struct link *l_ptr)
{
	u32 res;

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
		return;

	do {
		res = tipc_link_push_packet(l_ptr);
	}
	while (res == TIPC_OK);
	if (res == PUSH_FAILED)
		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
}

static void link_reset_all(unsigned long addr)
{
	struct node *n_ptr;
	char addr_string[16];
	u32 i;

	read_lock_bh(&tipc_net_lock);

	n_ptr = tipc_node_find((u32)addr);
	if (!n_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return;	/* node no longer exists */
	}

	tipc_node_lock(n_ptr);

	warn("Resetting all links to %s\n",
	     addr_string_fill(addr_string, n_ptr->addr));

	for (i = 0; i < MAX_BEARERS; i++) {
		if (n_ptr->links[i]) {
			link_print(n_ptr->links[i], TIPC_OUTPUT,
				   "Resetting link\n");
			tipc_link_reset(n_ptr->links[i]);
		}
	}

	tipc_node_unlock(n_ptr);
	read_unlock_bh(&tipc_net_lock);
}

static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	warn("Retransmission failure on link <%s>\n", l_ptr->name);
	tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>");

	if (l_ptr->addr) {

		/* Handle failure on standard link */

		link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {

		/* Handle failure on broadcast link */

		struct node *n_ptr;
		char addr_string[16];

		tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
		tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
				     (unsigned long) TIPC_SKB_CB(buf)->handle);

		n_ptr = l_ptr->owner->next;
		tipc_node_lock(n_ptr);

		addr_string_fill(addr_string, n_ptr->addr);
		tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
		tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
		tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
		tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
		tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
		tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
		tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);

		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

		tipc_node_unlock(n_ptr);

		l_ptr->stale_count = 0;
	}
}

void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!buf)
		return;

	msg = buf_msg(buf);

	dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		if (!skb_cloned(buf)) {
			msg_dbg(msg, ">NO_RETR->BCONG>");
			dbg_print_link(l_ptr, "   ");
			l_ptr->retransm_queue_head = msg_seqno(msg);
			l_ptr->retransm_queue_size = retransmits;
			return;
		} else {
			/* Don't retransmit if driver already has the buffer */
		}
	} else {
		/* Detect repeated retransmit failures on uncongested bearer */

		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
			if (++l_ptr->stale_count > 100) {
				link_retransmit_failure(l_ptr, buf);
				return;
			}
		} else {
			l_ptr->last_retransmitted = msg_seqno(msg);
			l_ptr->stale_count = 1;
		}
	}

	while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) {
		msg = buf_msg(buf);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">RETR>");
			buf = buf->next;
			retransmits--;
			l_ptr->stats.retransmitted++;
		} else {
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
			l_ptr->retransm_queue_size = retransmits;
			return;
		}
	}

	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}

/*
 * link_recv_non_seq: Receive packets which are outside
 *                    the link sequence flow
 */

static void link_recv_non_seq(struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	if (msg_user(msg) ==  LINK_CONFIG)
		tipc_disc_recv_msg(buf);
	else
		tipc_bclink_recv_pkt(buf);
}

/**
 * link_insert_deferred_queue - insert deferred messages back into receive chain
 */

static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
						  struct sk_buff *buf)
{
	u32 seq_no;

	if (l_ptr->oldest_deferred_in == NULL)
		return buf;

	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
	if (seq_no == mod(l_ptr->next_in_no)) {
		l_ptr->newest_deferred_in->next = buf;
		buf = l_ptr->oldest_deferred_in;
		l_ptr->oldest_deferred_in = NULL;
		l_ptr->deferred_inqueue_sz = 0;
	}
	return buf;
}

void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
{
	read_lock_bh(&tipc_net_lock);
	while (head) {
		struct bearer *b_ptr;
		struct node *n_ptr;
		struct link *l_ptr;
		struct sk_buff *crs;
		struct sk_buff *buf = head;
		struct tipc_msg *msg = buf_msg(buf);
		u32 seq_no = msg_seqno(msg);
		u32 ackd = msg_ack(msg);
		u32 released = 0;
		int type;

		b_ptr = (struct bearer *)tb_ptr;
		TIPC_SKB_CB(buf)->handle = b_ptr;

		head = head->next;
		if (unlikely(msg_version(msg) != TIPC_VERSION))
			goto cont;
#if 0
		if (msg_user(msg) != LINK_PROTOCOL)
#endif
			msg_dbg(msg,"<REC<");

		if (unlikely(msg_non_seq(msg))) {
			link_recv_non_seq(buf);
			continue;
		}
		if (unlikely(!msg_short(msg) &&
			     (msg_destnode(msg) != tipc_own_addr)))
			goto cont;
		n_ptr = tipc_node_find(msg_prevnode(msg));
		if (unlikely(!n_ptr))
			goto cont;

		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->links[b_ptr->identity];
		if (unlikely(!l_ptr)) {
			tipc_node_unlock(n_ptr);
			goto cont;
		}

		/*
		 * Release acked messages
		 */
		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
		}

		crs = l_ptr->first_out;
		while ((crs != l_ptr->next_out) &&
		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
			struct sk_buff *next = crs->next;

			buf_discard(crs);
			crs = next;
			released++;
		}
		if (released) {
			l_ptr->first_out = crs;
			l_ptr->out_queue_size -= released;
		}
		if (unlikely(l_ptr->next_out))
			tipc_link_push_queue(l_ptr);
		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
			tipc_link_wakeup_ports(l_ptr, 0);
		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
		}

protocol_check:
		if (likely(link_working_working(l_ptr))) {
			if (likely(seq_no == mod(l_ptr->next_in_no))) {
				l_ptr->next_in_no++;
				if (unlikely(l_ptr->oldest_deferred_in))
					head = link_insert_deferred_queue(l_ptr,
									  head);
				if (likely(msg_is_dest(msg, tipc_own_addr))) {
deliver:
					if (likely(msg_isdata(msg))) {
						tipc_node_unlock(n_ptr);
						tipc_port_recv_msg(buf);
						continue;
					}
					switch (msg_user(msg)) {
					case MSG_BUNDLER:
						l_ptr->stats.recv_bundles++;
						l_ptr->stats.recv_bundled +=
							msg_msgcnt(msg);
						tipc_node_unlock(n_ptr);
						tipc_link_recv_bundle(buf);
						continue;
					case ROUTE_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						tipc_cltr_recv_routing_table(buf);
						continue;
					case NAME_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						tipc_named_recv(buf);
						continue;
					case CONN_MANAGER:
						tipc_node_unlock(n_ptr);
						tipc_port_recv_proto_msg(buf);
						continue;
					case MSG_FRAGMENTER:
						l_ptr->stats.recv_fragments++;
						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
									    &buf, &msg)) {
							l_ptr->stats.recv_fragmented++;
							goto deliver;
						}
						break;
					case CHANGEOVER_PROTOCOL:
						type = msg_type(msg);
						if (link_recv_changeover_msg(&l_ptr, &buf)) {
							msg = buf_msg(buf);
							seq_no = msg_seqno(msg);
							TIPC_SKB_CB(buf)->handle
								= b_ptr;
							if (type == ORIGINAL_MSG)
								goto deliver;
							goto protocol_check;
						}
						break;
					}
				}
				tipc_node_unlock(n_ptr);
				tipc_net_route_msg(buf);
				continue;
			}
			link_handle_out_of_seq_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}

		if (msg_user(msg) == LINK_PROTOCOL) {
			link_recv_proto_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}
		msg_dbg(msg,"NSEQ<REC<");
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);

		if (link_working_working(l_ptr)) {
			/* Re-insert in front of queue */

			msg_dbg(msg,"RECV-REINS:");
			buf->next = head;
			head = buf;
			tipc_node_unlock(n_ptr);
			continue;
		}
		tipc_node_unlock(n_ptr);
cont:
		buf_discard(buf);
	}
	read_unlock_bh(&tipc_net_lock);
}

/*
 * link_defer_buf(): Sort a received out-of-sequence packet
 *                   into the deferred reception queue.
 * Returns the increase of the queue length, i.e. 0 or 1
 */

u32 tipc_link_defer_pkt(struct sk_buff **head,
			struct sk_buff **tail,
			struct sk_buff *buf)
{
	struct sk_buff *prev = NULL;
	struct sk_buff *crs = *head;
	u32 seq_no = msg_seqno(buf_msg(buf));

	buf->next = NULL;

	/* Empty queue ? */
	if (*head == NULL) {
		*head = *tail = buf;
		return 1;
	}

	/* Last ? */
	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
		(*tail)->next = buf;
		*tail = buf;
		return 1;
	}

	/* Scan through queue and sort it in */
	do {
		struct tipc_msg *msg = buf_msg(crs);

		if (less(seq_no, msg_seqno(msg))) {
			buf->next = crs;
			if (prev)
				prev->next = buf;
			else
				*head = buf;
			return 1;
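Note: the sequence-number helpers used throughout this page (mod, less, less_eq, lesser) are not defined in link.c; they live elsewhere in the TIPC sources (link.h) of the same kernel tree. As a rough orientation only, a minimal sketch of the 16-bit modular sequence arithmetic they implement might look like the following; treat it as an assumption, and consult link.h in this kernel version for the authoritative definitions.

/* Sketch (assumed, not part of this page): 16-bit sequence-number helpers */
static inline u32 mod(u32 x)
{
	return x & 0xffff;			/* wrap into 16-bit sequence space */
}

static inline int less_eq(u32 left, u32 right)
{
	return mod(right - left) < 32768u;	/* "left <= right" in modular order */
}

static inline int less(u32 left, u32 right)
{
	return less_eq(left, right) && (left != right);
}

static inline u32 lesser(u32 left, u32 right)
{
	return less_eq(left, right) ? left : right;
}

With these semantics, comparisons such as less_eq(msg_seqno(buf_msg(crs)), ackd) in tipc_recv_msg() remain correct even after the 16-bit sequence counter wraps around.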
