📄 bcast.c
#if (TIPC_BCAST_LOSS_RATE)
        static int rx_count = 0;
#endif
        struct tipc_msg *msg = buf_msg(buf);
        struct node* node = tipc_node_find(msg_prevnode(msg));
        u32 next_in;
        u32 seqno;
        struct sk_buff *deferred;

        msg_dbg(msg, "<BC<<<");

        if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported ||
                     (msg_mc_netid(msg) != tipc_net_id))) {
                buf_discard(buf);
                return;
        }

        if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
                msg_dbg(msg, "<BCNACK<<<");
                if (msg_destnode(msg) == tipc_own_addr) {
                        tipc_node_lock(node);
                        tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
                        tipc_node_unlock(node);
                        spin_lock_bh(&bc_lock);
                        bcl->stats.recv_nacks++;
                        bcl->owner->next = node;   /* remember requestor */
                        bclink_retransmit_pkt(msg_bcgap_after(msg),
                                              msg_bcgap_to(msg));
                        bcl->owner->next = NULL;
                        spin_unlock_bh(&bc_lock);
                } else {
                        tipc_bclink_peek_nack(msg_destnode(msg),
                                              msg_bcast_tag(msg),
                                              msg_bcgap_after(msg),
                                              msg_bcgap_to(msg));
                }
                buf_discard(buf);
                return;
        }

#if (TIPC_BCAST_LOSS_RATE)
        if (++rx_count == TIPC_BCAST_LOSS_RATE) {
                rx_count = 0;
                buf_discard(buf);
                return;
        }
#endif

        tipc_node_lock(node);
receive:
        deferred = node->bclink.deferred_head;
        next_in = mod(node->bclink.last_in + 1);
        seqno = msg_seqno(msg);

        if (likely(seqno == next_in)) {
                bcl->stats.recv_info++;
                node->bclink.last_in++;
                bclink_set_gap(node);
                if (unlikely(bclink_ack_allowed(seqno))) {
                        bclink_send_ack(node);
                        bcl->stats.sent_acks++;
                }
                if (likely(msg_isdata(msg))) {
                        tipc_node_unlock(node);
                        tipc_port_recv_mcast(buf, NULL);
                } else if (msg_user(msg) == MSG_BUNDLER) {
                        bcl->stats.recv_bundles++;
                        bcl->stats.recv_bundled += msg_msgcnt(msg);
                        tipc_node_unlock(node);
                        tipc_link_recv_bundle(buf);
                } else if (msg_user(msg) == MSG_FRAGMENTER) {
                        bcl->stats.recv_fragments++;
                        if (tipc_link_recv_fragment(&node->bclink.defragm,
                                                    &buf, &msg))
                                bcl->stats.recv_fragmented++;
                        tipc_node_unlock(node);
                        tipc_net_route_msg(buf);
                } else {
                        tipc_node_unlock(node);
                        tipc_net_route_msg(buf);
                }
                if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
                        tipc_node_lock(node);
                        buf = deferred;
                        msg = buf_msg(buf);
                        node->bclink.deferred_head = deferred->next;
                        goto receive;
                }
                return;
        } else if (less(next_in, seqno)) {
                u32 gap_after = node->bclink.gap_after;
                u32 gap_to = node->bclink.gap_to;

                if (tipc_link_defer_pkt(&node->bclink.deferred_head,
                                        &node->bclink.deferred_tail,
                                        buf)) {
                        node->bclink.nack_sync++;
                        bcl->stats.deferred_recv++;
                        if (seqno == mod(gap_after + 1))
                                node->bclink.gap_after = seqno;
                        else if (less(gap_after, seqno) && less(seqno, gap_to))
                                node->bclink.gap_to = seqno;
                }
                if (bclink_ack_allowed(node->bclink.nack_sync)) {
                        if (gap_to != gap_after)
                                bclink_send_nack(node);
                        bclink_set_gap(node);
                }
        } else {
                bcl->stats.duplicates++;
                buf_discard(buf);
        }
        tipc_node_unlock(node);
}

u32 tipc_bclink_get_last_sent(void)
{
        u32 last_sent = mod(bcl->next_out_no - 1);

        if (bcl->next_out)
                last_sent = mod(buf_seqno(bcl->next_out) - 1);
        return last_sent;
}

u32 tipc_bclink_acks_missing(struct node *n_ptr)
{
        return (n_ptr->bclink.supported &&
                (tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}
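The broadcast receive path above hinges on wrap-safe 16-bit sequence arithmetic: mod() truncates into the sequence space and less() orders two values within it, so every incoming packet falls into exactly one of three branches (in-sequence, future/deferred, duplicate). The self-contained sketch below illustrates that classification; the mod()/less() definitions here are assumptions made for the example, not the kernel's own helpers from link.h.

#include <stdio.h>
#include <stdint.h>

typedef uint32_t u32;

/* Illustrative 16-bit sequence arithmetic; the names mirror the helpers
 * used by the receive path above, but these definitions are assumptions
 * for this sketch only. */
#define mod(x) ((x) & 0xffffu)

static int less(u32 left, u32 right)
{
        /* "left" precedes "right" if the forward distance lies in the lower
         * half of the 16-bit sequence space and the values differ. */
        return (mod(right - left) < 0x8000u) && (mod(left) != mod(right));
}

static const char *classify(u32 next_in, u32 seqno)
{
        if (seqno == next_in)
                return "deliver";       /* in-sequence: pass upwards */
        else if (less(next_in, seqno))
                return "defer";         /* future packet: a gap exists */
        else
                return "duplicate";     /* already received: discard */
}

int main(void)
{
        u32 next_in = mod(0xfffe + 1);  /* expected seqno, about to wrap */

        printf("%s\n", classify(next_in, 0xffff));  /* deliver */
        printf("%s\n", classify(next_in, mod(3)));  /* defer: wrapped ahead */
        printf("%s\n", classify(next_in, 0xfff0));  /* duplicate */
        return 0;
}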
/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send through as many bearers as necessary to reach all nodes
 * that support TIPC multicasting.
 *
 * Returns 0 if packet sent successfully, non-zero if not
 */

static int tipc_bcbearer_send(struct sk_buff *buf,
                              struct tipc_bearer *unused1,
                              struct tipc_media_addr *unused2)
{
        static int send_count = 0;

        int bp_index;
        int swap_time;

        /* Prepare buffer for broadcasting (if first time trying to send it) */

        if (likely(!msg_non_seq(buf_msg(buf)))) {
                struct tipc_msg *msg;

                assert(tipc_cltr_bcast_nodes.count != 0);
                bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count);
                msg = buf_msg(buf);
                msg_set_non_seq(msg);
                msg_set_mc_netid(msg, tipc_net_id);
        }

        /* Determine if bearer pairs should be swapped following this attempt */

        if ((swap_time = (++send_count >= 10)))
                send_count = 0;

        /* Send buffer over bearers until all targets reached */

        bcbearer->remains = tipc_cltr_bcast_nodes;

        for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
                struct bearer *p = bcbearer->bpairs[bp_index].primary;
                struct bearer *s = bcbearer->bpairs[bp_index].secondary;

                if (!p)
                        break;  /* no more bearers to try */

                tipc_nmap_diff(&bcbearer->remains, &p->nodes,
                               &bcbearer->remains_new);
                if (bcbearer->remains_new.count == bcbearer->remains.count)
                        continue;       /* bearer pair doesn't add anything */

                if (!p->publ.blocked &&
                    !p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) {
                        if (swap_time && s && !s->publ.blocked)
                                goto swap;
                        else
                                goto update;
                }

                if (!s || s->publ.blocked ||
                    s->media->send_msg(buf, &s->publ, &s->media->bcast_addr))
                        continue;       /* unable to send using bearer pair */
swap:
                bcbearer->bpairs[bp_index].primary = s;
                bcbearer->bpairs[bp_index].secondary = p;
update:
                if (bcbearer->remains_new.count == 0)
                        return TIPC_OK;

                bcbearer->remains = bcbearer->remains_new;
        }

        /* Unable to reach all targets */

        bcbearer->bearer.publ.blocked = 1;
        bcl->stats.bearer_congs++;
        return ~TIPC_OK;
}

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */

void tipc_bcbearer_sort(void)
{
        struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
        struct bcbearer_pair *bp_curr;
        int b_index;
        int pri;

        spin_lock_bh(&bc_lock);

        /* Group bearers by priority (can assume max of two per priority) */

        memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

        for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
                struct bearer *b = &tipc_bearers[b_index];

                if (!b->active || !b->nodes.count)
                        continue;

                if (!bp_temp[b->priority].primary)
                        bp_temp[b->priority].primary = b;
                else
                        bp_temp[b->priority].secondary = b;
        }

        /* Create array of bearer pairs for broadcasting */

        bp_curr = bcbearer->bpairs;
        memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

        for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {

                if (!bp_temp[pri].primary)
                        continue;

                bp_curr->primary = bp_temp[pri].primary;

                if (bp_temp[pri].secondary) {
                        if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
                                            &bp_temp[pri].secondary->nodes)) {
                                bp_curr->secondary = bp_temp[pri].secondary;
                        } else {
                                bp_curr++;
                                bp_curr->primary = bp_temp[pri].secondary;
                        }
                }

                bp_curr++;
        }

        spin_unlock_bh(&bc_lock);
}
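tipc_bcbearer_send() works through the sorted bearer pairs while shrinking a node map (remains) of destinations not yet reached: a pair is only used if removing its primary bearer's node set actually shrinks the map, and the loop stops the moment the map is empty; otherwise the pseudo-bearer is marked congested. The standalone sketch below models that coverage loop with plain 64-bit bitmaps; the bearer names, node bitmaps, and helper are illustrative assumptions, not the kernel's node-map API.

#include <stdio.h>
#include <stdint.h>

#define MAX_BEARERS 3

/* Illustrative stand-in for a bearer and its reachable-node map */
struct bearer_sketch {
        const char *name;
        uint64_t    nodes;      /* bitmap of nodes reachable via this bearer */
};

/* Send over every bearer that still adds coverage, in priority order */
static void bcast_sketch(const struct bearer_sketch *bearers, uint64_t targets)
{
        uint64_t remains = targets;
        int i;

        for (i = 0; i < MAX_BEARERS && remains; i++) {
                uint64_t remains_new = remains & ~bearers[i].nodes;

                if (remains_new == remains)
                        continue;       /* this bearer adds nothing new */

                printf("send on %s (covers %#llx)\n", bearers[i].name,
                       (unsigned long long)(remains & bearers[i].nodes));
                remains = remains_new;
        }
        if (remains)
                printf("congested: %#llx unreached\n",
                       (unsigned long long)remains);
}

int main(void)
{
        const struct bearer_sketch bearers[MAX_BEARERS] = {
                { "eth0", 0x0f },       /* nodes 0-3 */
                { "eth1", 0x33 },       /* nodes 0,1,4,5 */
                { "ib0",  0x0f },       /* adds nothing beyond eth0 */
        };

        bcast_sketch(bearers, 0x3f);    /* reach nodes 0-5 */
        return 0;
}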
/**
 * tipc_bcbearer_push - resolve bearer congestion
 *
 * Forces bclink to push out any unsent packets, until all packets are gone
 * or congestion reoccurs.
 * No locks set when function called
 */

void tipc_bcbearer_push(void)
{
        struct bearer *b_ptr;

        spin_lock_bh(&bc_lock);
        b_ptr = &bcbearer->bearer;
        if (b_ptr->publ.blocked) {
                b_ptr->publ.blocked = 0;
                tipc_bearer_lock_push(b_ptr);
        }
        spin_unlock_bh(&bc_lock);
}

int tipc_bclink_stats(char *buf, const u32 buf_size)
{
        struct print_buf pb;

        if (!bcl)
                return 0;

        tipc_printbuf_init(&pb, buf, buf_size);

        spin_lock_bh(&bc_lock);

        tipc_printf(&pb, "Link <%s>\n"
                         " Window:%u packets\n",
                    bcl->name, bcl->queue_limit[0]);
        tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n",
                    bcl->stats.recv_info,
                    bcl->stats.recv_fragments,
                    bcl->stats.recv_fragmented,
                    bcl->stats.recv_bundles,
                    bcl->stats.recv_bundled);
        tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n",
                    bcl->stats.sent_info,
                    bcl->stats.sent_fragments,
                    bcl->stats.sent_fragmented,
                    bcl->stats.sent_bundles,
                    bcl->stats.sent_bundled);
        tipc_printf(&pb, " RX naks:%u defs:%u dups:%u\n",
                    bcl->stats.recv_nacks,
                    bcl->stats.deferred_recv,
                    bcl->stats.duplicates);
        tipc_printf(&pb, " TX naks:%u acks:%u dups:%u\n",
                    bcl->stats.sent_nacks,
                    bcl->stats.sent_acks,
                    bcl->stats.retransmitted);
        tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
                    bcl->stats.bearer_congs,
                    bcl->stats.link_congs,
                    bcl->stats.max_queue_sz,
                    bcl->stats.queue_sz_counts
                    ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts)
                    : 0);

        spin_unlock_bh(&bc_lock);
        return tipc_printbuf_validate(&pb);
}

int tipc_bclink_reset_stats(void)
{
        if (!bcl)
                return -ENOPROTOOPT;

        spin_lock_bh(&bc_lock);
        memset(&bcl->stats, 0, sizeof(bcl->stats));
        spin_unlock_bh(&bc_lock);
        return TIPC_OK;
}

int tipc_bclink_set_queue_limits(u32 limit)
{
        if (!bcl)
                return -ENOPROTOOPT;
        if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
                return -EINVAL;

        spin_lock_bh(&bc_lock);
        tipc_link_set_queue_limits(bcl, limit);
        spin_unlock_bh(&bc_lock);
        return TIPC_OK;
}

int tipc_bclink_init(void)
{
        bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
        bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
        if (!bcbearer || !bclink) {
nomem:
                warn("Multicast link creation failed, no memory\n");
                kfree(bcbearer);
                bcbearer = NULL;
                kfree(bclink);
                bclink = NULL;
                return -ENOMEM;
        }

        INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
        bcbearer->bearer.media = &bcbearer->media;
        bcbearer->media.send_msg = tipc_bcbearer_send;
        sprintf(bcbearer->media.name, "tipc-multicast");

        bcl = &bclink->link;
        INIT_LIST_HEAD(&bcl->waiting_ports);
        bcl->next_out_no = 1;
        spin_lock_init(&bclink->node.lock);
        bcl->owner = &bclink->node;
        bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
        tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
        bcl->b_ptr = &bcbearer->bearer;
        bcl->state = WORKING_WORKING;
        sprintf(bcl->name, tipc_bclink_name);

        if (BCLINK_LOG_BUF_SIZE) {
                char *pb = kmalloc(BCLINK_LOG_BUF_SIZE, GFP_ATOMIC);

                if (!pb)
                        goto nomem;
                tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE);
        }

        return TIPC_OK;
}

void tipc_bclink_stop(void)
{
        spin_lock_bh(&bc_lock);
        if (bcbearer) {
                tipc_link_stop(bcl);
                if (BCLINK_LOG_BUF_SIZE)
                        kfree(bcl->print_buf.buf);
                bcl = NULL;
                kfree(bclink);
                bclink = NULL;
                kfree(bcbearer);
                bcbearer = NULL;
        }
        spin_unlock_bh(&bc_lock);
}
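tipc_bclink_stats() formats its report into a caller-supplied buffer through a print_buf and returns the length of the validated text, so a management-side caller only needs to hand it scratch space and can combine it with tipc_bclink_set_queue_limits() and tipc_bclink_reset_stats(). A rough sketch of such a caller follows; the function name, buffer size, and logging are assumptions, and it presumes the declarations from this file's header plus the usual kernel headers.

/* Hypothetical in-kernel caller; STATS_BUF_LEN is an assumed scratch size */
#define STATS_BUF_LEN 2000

static void show_bclink_status(void)
{
        char *stats_buf;
        int len;

        stats_buf = kmalloc(STATS_BUF_LEN, GFP_ATOMIC);
        if (!stats_buf)
                return;

        /* Fetch the broadcast link report and log whatever was produced */
        len = tipc_bclink_stats(stats_buf, STATS_BUF_LEN);
        if (len)
                printk(KERN_INFO "%.*s", len, stats_buf);

        /* Re-apply the default window, then start counting afresh */
        if (tipc_bclink_set_queue_limits(BCLINK_WIN_DEFAULT) == TIPC_OK)
                tipc_bclink_reset_stats();

        kfree(stats_buf);
}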