📄 link.c
字号:
struct tipc_msg *msg = buf_msg(buf); l_ptr->stats.sent_fragments++; msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); link_add_to_outqueue(l_ptr, buf, msg); msg_dbg(msg, ">ADD>"); buf = next; } /* Send it, if possible: */ tipc_link_push_queue(l_ptr); tipc_node_unlock(node); return dsz;}/* * tipc_link_push_packet: Push one unsent packet to the media */u32 tipc_link_push_packet(struct link *l_ptr){ struct sk_buff *buf = l_ptr->first_out; u32 r_q_size = l_ptr->retransm_queue_size; u32 r_q_head = l_ptr->retransm_queue_head; /* Step to position where retransmission failed, if any, */ /* consider that buffers may have been released in meantime */ if (r_q_size && buf) { u32 last = lesser(mod(r_q_head + r_q_size), link_last_sent(l_ptr)); u32 first = msg_seqno(buf_msg(buf)); while (buf && less(first, r_q_head)) { first = mod(first + 1); buf = buf->next; } l_ptr->retransm_queue_head = r_q_head = first; l_ptr->retransm_queue_size = r_q_size = mod(last - first); } /* Continue retransmission now, if there is anything: */ if (r_q_size && buf && !skb_cloned(buf)) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { msg_dbg(buf_msg(buf), ">DEF-RETR>"); l_ptr->retransm_queue_head = mod(++r_q_head); l_ptr->retransm_queue_size = --r_q_size; l_ptr->stats.retransmitted++; return TIPC_OK; } else { l_ptr->stats.bearer_congs++; msg_dbg(buf_msg(buf), "|>DEF-RETR>"); return PUSH_FAILED; } } /* Send deferred protocol message, if any: */ buf = l_ptr->proto_msg_queue; if (buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { msg_dbg(buf_msg(buf), ">DEF-PROT>"); l_ptr->unacked_window = 0; buf_discard(buf); l_ptr->proto_msg_queue = NULL; return TIPC_OK; } else { msg_dbg(buf_msg(buf), "|>DEF-PROT>"); l_ptr->stats.bearer_congs++; return PUSH_FAILED; } } /* Send one deferred data message, if send window not full: */ buf = l_ptr->next_out; if (buf) { struct tipc_msg *msg = buf_msg(buf); u32 next = msg_seqno(msg); u32 first = msg_seqno(buf_msg(l_ptr->first_out)); if (mod(next - first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { if (msg_user(msg) == MSG_BUNDLER) msg_set_type(msg, CLOSED_MSG); msg_dbg(msg, ">PUSH-DATA>"); l_ptr->next_out = buf->next; return TIPC_OK; } else { msg_dbg(msg, "|PUSH-DATA|"); l_ptr->stats.bearer_congs++; return PUSH_FAILED; } } } return PUSH_FINISHED;}/* * push_queue(): push out the unsent messages of a link where * congestion has abated. Node is locked */void tipc_link_push_queue(struct link *l_ptr){ u32 res; if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) return; do { res = tipc_link_push_packet(l_ptr); } while (res == TIPC_OK); if (res == PUSH_FAILED) tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);}static void link_reset_all(unsigned long addr){ struct node *n_ptr; char addr_string[16]; u32 i; read_lock_bh(&tipc_net_lock); n_ptr = tipc_node_find((u32)addr); if (!n_ptr) { read_unlock_bh(&tipc_net_lock); return; /* node no longer exists */ } tipc_node_lock(n_ptr); warn("Resetting all links to %s\n", addr_string_fill(addr_string, n_ptr->addr)); for (i = 0; i < MAX_BEARERS; i++) { if (n_ptr->links[i]) { link_print(n_ptr->links[i], TIPC_OUTPUT, "Resetting link\n"); tipc_link_reset(n_ptr->links[i]); } } tipc_node_unlock(n_ptr); read_unlock_bh(&tipc_net_lock);}static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf){ struct tipc_msg *msg = buf_msg(buf); warn("Retransmission failure on link <%s>\n", l_ptr->name); tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>"); if (l_ptr->addr) { /* Handle failure on standard link */ link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n"); tipc_link_reset(l_ptr); } else { /* Handle failure on broadcast link */ struct node *n_ptr; char addr_string[16]; tipc_printf(TIPC_OUTPUT, "Msg seq number: %u, ", msg_seqno(msg)); tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n", (unsigned long) TIPC_SKB_CB(buf)->handle); n_ptr = l_ptr->owner->next; tipc_node_lock(n_ptr); addr_string_fill(addr_string, n_ptr->addr); tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string); tipc_printf(TIPC_OUTPUT, "Supported: %d, ", n_ptr->bclink.supported); tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked); tipc_printf(TIPC_OUTPUT, "Last in: %u, ", n_ptr->bclink.last_in); tipc_printf(TIPC_OUTPUT, "Gap after: %u, ", n_ptr->bclink.gap_after); tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to); tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync); tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); tipc_node_unlock(n_ptr); l_ptr->stale_count = 0; }}void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf, u32 retransmits){ struct tipc_msg *msg; if (!buf) return; msg = buf_msg(buf); dbg("Retransmitting %u in link %x\n", retransmits, l_ptr); if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { if (!skb_cloned(buf)) { msg_dbg(msg, ">NO_RETR->BCONG>"); dbg_print_link(l_ptr, " "); l_ptr->retransm_queue_head = msg_seqno(msg); l_ptr->retransm_queue_size = retransmits; return; } else { /* Don't retransmit if driver already has the buffer */ } } else { /* Detect repeated retransmit failures on uncongested bearer */ if (l_ptr->last_retransmitted == msg_seqno(msg)) { if (++l_ptr->stale_count > 100) { link_retransmit_failure(l_ptr, buf); return; } } else { l_ptr->last_retransmitted = msg_seqno(msg); l_ptr->stale_count = 1; } } while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) { msg = buf_msg(buf); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { msg_dbg(buf_msg(buf), ">RETR>"); buf = buf->next; retransmits--; l_ptr->stats.retransmitted++; } else { tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); l_ptr->stats.bearer_congs++; l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf)); l_ptr->retransm_queue_size = retransmits; return; } } l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;}/* * link_recv_non_seq: Receive packets which are outside * the link sequence flow */static void link_recv_non_seq(struct sk_buff *buf){ struct tipc_msg *msg = buf_msg(buf); if (msg_user(msg) == LINK_CONFIG) tipc_disc_recv_msg(buf); else tipc_bclink_recv_pkt(buf);}/** * link_insert_deferred_queue - insert deferred messages back into receive chain */static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, struct sk_buff *buf){ u32 seq_no; if (l_ptr->oldest_deferred_in == NULL) return buf; seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); if (seq_no == mod(l_ptr->next_in_no)) { l_ptr->newest_deferred_in->next = buf; buf = l_ptr->oldest_deferred_in; l_ptr->oldest_deferred_in = NULL; l_ptr->deferred_inqueue_sz = 0; } return buf;}void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr){ read_lock_bh(&tipc_net_lock); while (head) { struct bearer *b_ptr; struct node *n_ptr; struct link *l_ptr; struct sk_buff *crs; struct sk_buff *buf = head; struct tipc_msg *msg = buf_msg(buf); u32 seq_no = msg_seqno(msg); u32 ackd = msg_ack(msg); u32 released = 0; int type; b_ptr = (struct bearer *)tb_ptr; TIPC_SKB_CB(buf)->handle = b_ptr; head = head->next; if (unlikely(msg_version(msg) != TIPC_VERSION)) goto cont;#if 0 if (msg_user(msg) != LINK_PROTOCOL)#endif msg_dbg(msg,"<REC<"); if (unlikely(msg_non_seq(msg))) { link_recv_non_seq(buf); continue; } if (unlikely(!msg_short(msg) && (msg_destnode(msg) != tipc_own_addr))) goto cont; n_ptr = tipc_node_find(msg_prevnode(msg)); if (unlikely(!n_ptr)) goto cont; tipc_node_lock(n_ptr); l_ptr = n_ptr->links[b_ptr->identity]; if (unlikely(!l_ptr)) { tipc_node_unlock(n_ptr); goto cont; } /* * Release acked messages */ if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) { if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); } crs = l_ptr->first_out; while ((crs != l_ptr->next_out) && less_eq(msg_seqno(buf_msg(crs)), ackd)) { struct sk_buff *next = crs->next; buf_discard(crs); crs = next; released++; } if (released) { l_ptr->first_out = crs; l_ptr->out_queue_size -= released; } if (unlikely(l_ptr->next_out)) tipc_link_push_queue(l_ptr); if (unlikely(!list_empty(&l_ptr->waiting_ports))) tipc_link_wakeup_ports(l_ptr, 0); if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { l_ptr->stats.sent_acks++; tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); }protocol_check: if (likely(link_working_working(l_ptr))) { if (likely(seq_no == mod(l_ptr->next_in_no))) { l_ptr->next_in_no++; if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); if (likely(msg_is_dest(msg, tipc_own_addr))) {deliver: if (likely(msg_isdata(msg))) { tipc_node_unlock(n_ptr); tipc_port_recv_msg(buf); continue; } switch (msg_user(msg)) { case MSG_BUNDLER: l_ptr->stats.recv_bundles++; l_ptr->stats.recv_bundled += msg_msgcnt(msg); tipc_node_unlock(n_ptr); tipc_link_recv_bundle(buf); continue; case ROUTE_DISTRIBUTOR: tipc_node_unlock(n_ptr); tipc_cltr_recv_routing_table(buf); continue; case NAME_DISTRIBUTOR: tipc_node_unlock(n_ptr); tipc_named_recv(buf); continue; case CONN_MANAGER: tipc_node_unlock(n_ptr); tipc_port_recv_proto_msg(buf); continue; case MSG_FRAGMENTER: l_ptr->stats.recv_fragments++; if (tipc_link_recv_fragment(&l_ptr->defragm_buf, &buf, &msg)) { l_ptr->stats.recv_fragmented++; goto deliver; } break; case CHANGEOVER_PROTOCOL: type = msg_type(msg); if (link_recv_changeover_msg(&l_ptr, &buf)) { msg = buf_msg(buf); seq_no = msg_seqno(msg); TIPC_SKB_CB(buf)->handle = b_ptr; if (type == ORIGINAL_MSG) goto deliver; goto protocol_check; } break; } } tipc_node_unlock(n_ptr); tipc_net_route_msg(buf); continue; } link_handle_out_of_seq_msg(l_ptr, buf); head = link_insert_deferred_queue(l_ptr, head); tipc_node_unlock(n_ptr); continue; } if (msg_user(msg) == LINK_PROTOCOL) { link_recv_proto_msg(l_ptr, buf); head = link_insert_deferred_queue(l_ptr, head); tipc_node_unlock(n_ptr); continue; } msg_dbg(msg,"NSEQ<REC<"); link_state_event(l_ptr, TRAFFIC_MSG_EVT); if (link_working_working(l_ptr)) { /* Re-insert in front of queue */ msg_dbg(msg,"RECV-REINS:"); buf->next = head; head = buf; tipc_node_unlock(n_ptr); continue; } tipc_node_unlock(n_ptr);cont: buf_discard(buf); } read_unlock_bh(&tipc_net_lock);}/* * link_defer_buf(): Sort a received out-of-sequence packet * into the deferred reception queue. * Returns the increase of the queue length,i.e. 0 or 1 */u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, struct sk_buff *buf){ struct sk_buff *prev = NULL; struct sk_buff *crs = *head; u32 seq_no = msg_seqno(buf_msg(buf)); buf->next = NULL; /* Empty queue ? */ if (*head == NULL) { *head = *tail = buf; return 1; } /* Last ? */ if (less(msg_seqno(buf_msg(*tail)), seq_no)) { (*tail)->next = buf; *tail = buf; return 1; } /* Scan through queue and sort it in */ do { struct tipc_msg *msg = buf_msg(crs); if (less(seq_no, msg_seqno(msg))) { buf->next = crs; if (prev) prev->next = buf; else *head = buf; return 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -