📄 link.c
字号:
}
		if (seq_no == msg_seqno(msg)) {
			break;
		}
		prev = crs;
		crs = crs->next;
	} while (crs);

	/* Message is a duplicate of an existing message */

	buf_discard(buf);
	return 0;
}

/**
 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
 * @l_ptr: link the packet arrived on
 * @buf: received buffer; it is handed to link_recv_proto_msg, discarded,
 *       or passed to tipc_link_defer_pkt, so the caller must not use it
 *       afterwards
 *
 * LINK_PROTOCOL messages are forwarded unchanged to link_recv_proto_msg.
 * Any other message decrements the link checkpoint and is then either
 * counted as a duplicate and discarded (sequence number below the next
 * expected one) or offered to the deferred input queue.
 */
static void link_handle_out_of_seq_msg(struct link *l_ptr,
				       struct sk_buff *buf)
{
	u32 seq_no = msg_seqno(buf_msg(buf));

	/* Protocol messages take the dedicated receive path */
	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
		link_recv_proto_msg(l_ptr, buf);
		return;
	}

	dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
	    seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);

	/* Record OOS packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	/*
	 * Discard packet if a duplicate; otherwise add it to deferred queue
	 * and notify peer of gap as per protocol specification
	 */

	if (less(seq_no, mod(l_ptr->next_in_no))) {
		l_ptr->stats.duplicates++;
		buf_discard(buf);
		return;
	}

	/*
	 * A non-zero return from tipc_link_defer_pkt is counted as a deferred
	 * packet; a zero return is counted as a duplicate.
	 */
	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
				&l_ptr->newest_deferred_in, buf)) {
		l_ptr->deferred_inqueue_sz++;
		l_ptr->stats.deferred_recv++;
		/* Send a STATE_MSG when the queue size is 1, 17, 33, ... */
		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
	} else
		l_ptr->stats.duplicates++;
}

/**
 * tipc_link_send_proto_msg - send protocol message to the other endpoint
 * @l_ptr: link to send on; its pmsg header is rewritten in place
 * @msg_typ: STATE_MSG, or any other value, which is handled by the
 *           RESET_MSG or ACTIVATE_MSG branch
 * @probe_msg: non-zero marks a STATE_MSG as a probe
 * @gap: sequence gap to report in a STATE_MSG; replaced by a locally
 *       computed value when the deferred input queue is not empty
 * @tolerance: link tolerance value written into a STATE_MSG
 * @priority: link priority value written into a STATE_MSG (see the
 *            review note near the end of the header setup)
 * @ack_mtu: max packet value written into a STATE_MSG
 *
 * Returns without sending when link_blocked reports the link as blocked,
 * when a STATE_MSG is requested while the link is not up, or when a buffer
 * cannot be acquired. When the bearer is congested the header is stored in
 * l_ptr->proto_msg_queue instead of being sent.
 */
void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
{
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg = l_ptr->pmsg;
	u32 msg_size = sizeof(l_ptr->proto_msg);

	if (link_blocked(l_ptr))
		return;

	/* Fields common to all protocol message types */
	msg_set_type(msg, msg_typ);
	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

	if (msg_typ == STATE_MSG) {
		u32 next_sent = mod(l_ptr->next_out_no);

		if (!tipc_link_is_up(l_ptr))
			return;
		/* Prefer the sequence number of the next unsent buffer, if any */
		if (l_ptr->next_out)
			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
		msg_set_next_sent(msg, next_sent);
		/*
		 * With deferred packets pending, the reported gap is the
		 * distance from the next expected sequence number to the
		 * oldest deferred one; the gap argument is ignored.
		 */
		if (l_ptr->oldest_deferred_in) {
			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
			gap = mod(rec - mod(l_ptr->next_in_no));
		}
		msg_set_seq_gap(msg, gap);
		if (gap)
			l_ptr->stats.sent_nacks++;
		msg_set_link_tolerance(msg, tolerance);
		msg_set_linkprio(msg, priority);
		msg_set_max_pkt(msg, ack_mtu);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_probe(msg, probe_msg != 0);
		if (probe_msg) {
			u32 mtu = l_ptr->max_pkt;

			/*
			 * While max_pkt is below max_pkt_target, enlarge the
			 * probe to roughly halfway between the two; the
			 * + 2 and & ~3 make the size a multiple of 4.
			 */
			if ((mtu < l_ptr->max_pkt_target) &&
			    link_working_working(l_ptr) &&
			    l_ptr->fsm_msg_cnt) {
				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				/*
				 * Once max_pkt_probes has reached 10, lower
				 * the target to just under the probed size,
				 * restart the count and recompute the size.
				 */
				if (l_ptr->max_pkt_probes == 10) {
					l_ptr->max_pkt_target = (msg_size - 4);
					l_ptr->max_pkt_probes = 0;
					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				}
				l_ptr->max_pkt_probes++;
			}

			l_ptr->stats.sent_probes++;
		}
		l_ptr->stats.sent_states++;
	} else {		/* RESET_MSG or ACTIVATE_MSG */
		/* These types carry the link's own settings, not the arguments */
		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
		msg_set_seq_gap(msg, 0);
		msg_set_next_sent(msg, 1);
		msg_set_link_tolerance(msg, l_ptr->tolerance);
		msg_set_linkprio(msg, l_ptr->priority);
		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
	}

	if (tipc_node_has_redundant_links(l_ptr->owner)) {
		msg_set_redundant_link(msg);
	} else {
		msg_clear_redundant_link(msg);
	}
	/*
	 * NOTE(review): this call runs for every message type, so it
	 * presumably replaces the value written from the priority argument
	 * in the STATE_MSG branch above - confirm this is intended.
	 */
	msg_set_linkprio(msg, l_ptr->priority);

	/* Ensure sequence number will not fit : */

	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));

	/* Congestion? */

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		/*
		 * Do not send; keep the header in proto_msg_queue, acquiring
		 * that buffer on first use and overwriting its contents
		 * otherwise.
		 */
		if (!l_ptr->proto_msg_queue) {
			l_ptr->proto_msg_queue =
				buf_acquire(sizeof(l_ptr->proto_msg));
		}
		buf = l_ptr->proto_msg_queue;
		if (!buf)
			return;
		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
		return;
	}
	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));

	/* Message can be sent */

	msg_dbg(msg, ">>");

	buf = buf_acquire(msg_size);
	if (!buf)
		return;

	/*
	 * NOTE(review): the buffer is msg_size bytes and its size field is
	 * set to msg_size, but only sizeof(l_ptr->proto_msg) bytes are
	 * written here. For enlarged probes the remaining bytes are not
	 * initialised in this function - confirm whether buf_acquire zeroes
	 * the buffer.
	 */
	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
	msg_set_size(buf_msg(buf), msg_size);

	/*
	 * A non-zero return from tipc_bearer_send ends the send here: the
	 * unacked window counter is cleared and the buffer is released.
	 */
	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
		l_ptr->unacked_window = 0;
		buf_discard(buf);
		return;
	}

	/* New congestion */
	/* The unsent buffer is parked in proto_msg_queue */
	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
	l_ptr->proto_msg_queue = buf;
	l_ptr->stats.bearer_congs++;
}

/**
 * link_recv_proto_msg - receive protocol message
 * @l_ptr: link the message arrived on
 * @buf: buffer holding the protocol message; always discarded before return
 *
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest address rules
 *
 * Dispatches on the message type: RESET_MSG and ACTIVATE_MSG update the
 * link settings from the values in the message, STATE_MSG updates
 * tolerance, priority and max packet information and may trigger a
 * STATE_MSG reply and a retransmission. Unknown types are only logged.
 */
static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 max_pkt_info;
	u32 max_pkt_ack;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	dbg("AT(%u):", jiffies_to_msecs(jiffies));
	msg_dbg(msg, "<<");
	if (link_blocked(l_ptr))
		goto exit;

	/* record unnumbered packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	/*
	 * Adopt the sender's network plane only when it differs from ours
	 * and our own address is greater than the previous node address in
	 * the message.
	 */
	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
		if (tipc_own_addr > msg_prevnode(msg))
			l_ptr->b_ptr->net_plane = msg_net_plane(msg);

	l_ptr->owner->permit_changeover = msg_redundant_link(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		/*
		 * A RESET carrying the already known peer session is ignored
		 * unless link_working_unknown reports true for the link.
		 */
		if (!link_working_unknown(l_ptr) && l_ptr->peer_session) {
			if (msg_session(msg) == l_ptr->peer_session) {
				dbg("Duplicate RESET: %u<->%u\n",
				    msg_session(msg), l_ptr->peer_session);
				break; /* duplicate: ignore */
			}
		}
		/* fall thru' */
	case ACTIVATE_MSG:
		/* Update link settings according other endpoint's values */

		/*
		 * NOTE(review): the message data is copied with an unbounded
		 * strcpy to the position after the last colon of l_ptr->name.
		 * Nothing visible here checks that strrchr found a colon, that
		 * the data is NUL-terminated, or that it fits - confirm the
		 * bounds are enforced elsewhere.
		 */
		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		/* Tolerance and priority are only ever raised on this path */
		if ((msg_tol = msg_link_tolerance(msg)) &&
		    (msg_tol > l_ptr->tolerance))
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		/*
		 * A non-zero max packet value can only lower max_pkt_target,
		 * and max_pkt is then capped at the target. A zero value sets
		 * max_pkt to the target and marks bclink as not supported.
		 */
		max_pkt_info = msg_max_pkt(msg);
		if (max_pkt_info) {
			if (max_pkt_info < l_ptr->max_pkt_target)
				l_ptr->max_pkt_target = max_pkt_info;
			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
				l_ptr->max_pkt = l_ptr->max_pkt_target;
		} else {
			l_ptr->max_pkt = l_ptr->max_pkt_target;
		}
		l_ptr->owner->bclink.supported = (max_pkt_info != 0);

		/* The message type itself is passed as the state event */
		link_state_event(l_ptr, msg_type(msg));

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		/* Synchronize broadcast sequence numbers */
		if (!tipc_node_has_redundant_links(l_ptr->owner)) {
			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
		}
		break;
	case STATE_MSG:

		/* Unlike the path above, any non-zero tolerance is applied */
		if ((msg_tol = msg_link_tolerance(msg)))
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			warn("Resetting link <%s>, priority change %u->%u\n",
			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
		l_ptr->stats.recv_states++;
		if (link_reset_unknown(l_ptr))
			break;

		/*
		 * rec_gap is the distance from our next expected sequence
		 * number to the next one the peer says it will send; it stays
		 * zero when the less_eq test fails.
		 */
		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
			rec_gap = mod(msg_next_sent(msg) -
				      mod(l_ptr->next_in_no));
		}

		/* A larger max packet value from the peer raises max_pkt */
		max_pkt_ack = msg_max_pkt(msg);
		if (max_pkt_ack > l_ptr->max_pkt) {
			dbg("Link <%s> updated MTU %u -> %u\n",
			    l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
			l_ptr->max_pkt = max_pkt_ack;
			l_ptr->max_pkt_probes = 0;
		}

		/*
		 * max_pkt_ack is reused from here on as the value to send
		 * back: the size of a received probe that is larger than the
		 * plain protocol message, otherwise zero.
		 */
		max_pkt_ack = 0;
		if (msg_probe(msg)) {
			l_ptr->stats.recv_probes++;
			if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
				max_pkt_ack = msg_size(msg);
			}
		}

		/* Protocol message before retransmits, reduce loss risk */

		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));

		/* Reply when there is a gap to report or a probe to answer */
		if (rec_gap || (msg_probe(msg))) {
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, rec_gap, 0, 0, max_pkt_ack);
		}
		/* The peer reported a gap: retransmit from the queue head */
		if (msg_seq_gap(msg)) {
			msg_dbg(msg, "With Gap:");
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, l_ptr->first_out,
					     msg_seq_gap(msg));
		}
		break;
	default:
		msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
	}
exit:
	buf_discard(buf);
}


/**
 * tipc_link_tunnel - send one message via a link belonging to another bearer
 * @l_ptr: link the message originally belongs to
 * @tunnel_hdr: tunnel header; its size field is updated here
 * @msg: message to wrap
 * @selector: its lowest bit picks the entry of the owner active_links array
 *
 * Owner node is locked.
 *
 * Builds a new buffer holding INT_H_SIZE bytes of tunnel header followed
 * by the message and passes it to tipc_link_send_buf. Logs a warning and
 * returns without sending when the chosen link is not up or no buffer can
 * be acquired.
 */
void tipc_link_tunnel(struct link *l_ptr,
		      struct tipc_msg *tunnel_hdr,
		      struct tipc_msg *msg,
		      u32 selector)
{
	struct link *tunnel;
	struct sk_buff *buf;
	u32 length = msg_size(msg);

	tunnel = l_ptr->owner->active_links[selector & 1];
	if (!tipc_link_is_up(tunnel)) {
		warn("Link changeover error, "
		     "tunnel link no longer available\n");
		return;
	}
	/* The size must be set before the header is copied into the buffer */
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	buf = buf_acquire(length + INT_H_SIZE);
	if (!buf) {
		warn("Link changeover error, "
		     "unable to send tunnel msg\n");
		return;
	}
	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
	dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
	msg_dbg(buf_msg(buf), ">SEND>");
	tipc_link_send_buf(tunnel, buf);
}



/**
 * tipc_link_changeover - send whole message queue via the remaining link
 * @l_ptr: link whose output queue is to be tunnelled
 *
 * Owner node is locked.
 *
 * Does nothing when the owner has no first active link, and only warns
 * when the peer did not permit changeover. An empty output queue results
 * in a single header-only message; otherwise every queued message is
 * passed to tipc_link_tunnel.
 */
void tipc_link_changeover(struct link *l_ptr)
{
	u32 msgcount = l_ptr->out_queue_size;
	struct sk_buff *crs = l_ptr->first_out;
	struct link *tunnel = l_ptr->owner->active_links[0];
	struct tipc_msg tunnel_hdr;
	int split_bundles;

	if (!tunnel)
		return;

	if (!l_ptr->owner->permit_changeover) {
		warn("Link changeover error, "
		     "peer did not permit changeover\n");
		return;
	}

	/* The header carries the peer bearer id and the queued message count */
	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		 ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);
	dbg("Link changeover requires %u tunnel messages\n", msgcount);

	/* Empty queue: send the bare tunnel header on its own */
	if (!l_ptr->first_out) {
		struct sk_buff *buf;

		buf = buf_acquire(INT_H_SIZE);
		if (buf) {
			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			dbg("%c->%c:", l_ptr->b_ptr->net_plane,
			    tunnel->b_ptr->net_plane);
			msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
			tipc_link_send_buf(tunnel, buf);
		} else {
			warn("Link changeover error, "
			     "unable to send changeover msg\n");
		}
		return;
	}

	/* Bundles are taken apart only when the two active links differ */
	split_bundles = (l_ptr->owner->active_links[0] !=
			 l_ptr->owner->active_links[1]);

	while (crs) {
		struct tipc_msg *msg = buf_msg(crs);

		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
			struct tipc_msg *m = msg_get_wrapped(msg);
			unchar* pos = (unchar*)m;

			/*
			 * Tunnel each wrapped message separately, stamping it
			 * with the sequence number of the bundle and stepping
			 * by the aligned size of the wrapped message.
			 */
			msgcount = msg_msgcnt(msg);
			while (msgcount--) {
				msg_set_seqno(m,msg_seqno(msg));
				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
						 msg_link_selector(m));
				pos += align(msg_size(m));
				m = (struct tipc_msg *)pos;
			}
		} else {
			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
					 msg_link_selector(msg));
		}
		crs = crs->next;
	}
}

/**
 * tipc_link_send_duplicate - send a copy of every queued message via a tunnel
 * @l_ptr: link whose output queue is duplicated
 * @tunnel: link the copies are sent on
 *
 * Each buffer in the output queue of l_ptr is wrapped in a
 * CHANGEOVER_PROTOCOL header of type DUPLICATE_MSG and passed to
 * tipc_link_send_buf on the tunnel link. The queued messages themselves are
 * modified: their acknowledge fields are refreshed and bundles get type
 * CLOSED_MSG. The walk stops early when a buffer cannot be acquired or
 * when l_ptr is no longer up after a send.
 */
void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
{
	struct sk_buff *iter;
	struct tipc_msg tunnel_hdr;

	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		 DUPLICATE_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	iter = l_ptr->first_out;
	while (iter) {
		struct sk_buff *outbuf;
		struct tipc_msg *msg = buf_msg(iter);
		u32 length = msg_size(msg);

		if (msg_user(msg) == MSG_BUNDLER)
			msg_set_type(msg, CLOSED_MSG);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
		outbuf = buf_acquire(length + INT_H_SIZE);
		if (outbuf == NULL) {
			warn("Link changeover error, "
			     "unable to send duplicate msg\n");
			return;
		}
		/* Tunnel header first, then the queued message bytes */
		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
					       length);
		dbg("%c->%c:", l_ptr->b_ptr->net_plane,
		    tunnel->b_ptr->net_plane);
		msg_dbg(buf_msg(outbuf), ">SEND>");
		tipc_link_send_buf(tunnel, outbuf);
		/* Stop walking the queue once the original link is not up */
		if (!tipc_link_is_up(l_ptr))
			return;
		iter = iter->next;
	}
}



/**
 * buf_extract - extracts embedded TIPC message from another message
 * @skb: encapsulating message buffer
 * @from_pos: offset to extract from
 *
 * Returns a new message buffer containing an embedded message.  The
 * encapsulating message itself is left unchanged. The result of
 * buf_acquire is returned as is, so a failed acquisition is passed on to
 * the caller.
 *
 * NOTE(review): the copy length comes from the size field of the embedded
 * header, and neither from_pos nor that size is checked against the length
 * of skb in this function - confirm that callers validate them.
 */
static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
	u32 size = msg_size(msg);
	struct sk_buff *eb;

	eb = buf_acquire(size);
	if (eb)
		skb_copy_to_linear_data(eb, msg, size);
	return eb;
}

/*
 *  link_recv_changeover_msg(): Receive tunneled packet sent
 *  via other link. Node is locked. Return extracted buffer.
 */
static int link_recv_changeover_msg(struct link **l_ptr,
				    struct sk_buff **buf)
{
	struct sk_buff *tunnel_buf = *buf;
	struct link *dest_link;
	struct tipc_msg *msg;
	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
	u32 msg_typ = msg_type(tunnel_msg);
	u32 msg_count = msg_msgcnt(tunnel_msg);

	/* The bearer id in the tunnel header indexes the owner links array */
	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
	if (!dest_link) {
		msg_dbg(tunnel_msg, "NOLINK/<REC<");
		goto exit;
	}
	if (dest_link == *l_ptr) {
		err("Unexpected changeove
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -