📄 port.c
字号:
u32 imp = msg_importance(&p_ptr->publ.phdr); if (!p_ptr->publ.connected) return NULL; if (imp < TIPC_CRITICAL_IMPORTANCE) imp++; return port_build_proto_msg(port_peerport(p_ptr), port_peernode(p_ptr), p_ptr->publ.ref, tipc_own_addr, imp, TIPC_CONN_MSG, err, port_out_seqno(p_ptr), 0);}void tipc_port_recv_proto_msg(struct sk_buff *buf){ struct tipc_msg *msg = buf_msg(buf); struct port *p_ptr = tipc_port_lock(msg_destport(msg)); u32 err = TIPC_OK; struct sk_buff *r_buf = NULL; struct sk_buff *abort_buf = NULL; msg_dbg(msg, "PORT<RECV<:"); if (!p_ptr) { err = TIPC_ERR_NO_PORT; } else if (p_ptr->publ.connected) { if (port_peernode(p_ptr) != msg_orignode(msg)) err = TIPC_ERR_NO_PORT; if (port_peerport(p_ptr) != msg_origport(msg)) err = TIPC_ERR_NO_PORT; if (!err && msg_routed(msg)) { u32 seqno = msg_transp_seqno(msg); u32 myno = ++p_ptr->last_in_seqno; if (seqno != myno) { err = TIPC_ERR_NO_PORT; abort_buf = port_build_self_abort_msg(p_ptr, err); } } if (msg_type(msg) == CONN_ACK) { int wakeup = tipc_port_congested(p_ptr) && p_ptr->publ.congested && p_ptr->wakeup; p_ptr->acked += msg_msgcnt(msg); if (tipc_port_congested(p_ptr)) goto exit; p_ptr->publ.congested = 0; if (!wakeup) goto exit; p_ptr->wakeup(&p_ptr->publ); goto exit; } } else if (p_ptr->publ.published) { err = TIPC_ERR_NO_PORT; } if (err) { r_buf = port_build_proto_msg(msg_origport(msg), msg_orignode(msg), msg_destport(msg), tipc_own_addr, DATA_HIGH, TIPC_CONN_MSG, err, 0, 0); goto exit; } /* All is fine */ if (msg_type(msg) == CONN_PROBE) { r_buf = port_build_proto_msg(msg_origport(msg), msg_orignode(msg), msg_destport(msg), tipc_own_addr, CONN_MANAGER, CONN_PROBE_REPLY, TIPC_OK, port_out_seqno(p_ptr), 0); } p_ptr->probing_state = CONFIRMED; port_incr_out_seqno(p_ptr);exit: if (p_ptr) tipc_port_unlock(p_ptr); tipc_net_route_msg(r_buf); tipc_net_route_msg(abort_buf); buf_discard(buf);}static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id){ struct publication *publ; if (full_id) tipc_printf(buf, "<%u.%u.%u:%u>:", tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), tipc_node(tipc_own_addr), p_ptr->publ.ref); else tipc_printf(buf, "%-10u:", p_ptr->publ.ref); if (p_ptr->publ.connected) { u32 dport = port_peerport(p_ptr); u32 destnode = port_peernode(p_ptr); tipc_printf(buf, " connected to <%u.%u.%u:%u>", tipc_zone(destnode), tipc_cluster(destnode), tipc_node(destnode), dport); if (p_ptr->publ.conn_type != 0) tipc_printf(buf, " via {%u,%u}", p_ptr->publ.conn_type, p_ptr->publ.conn_instance); } else if (p_ptr->publ.published) { tipc_printf(buf, " bound to"); list_for_each_entry(publ, &p_ptr->publications, pport_list) { if (publ->lower == publ->upper) tipc_printf(buf, " {%u,%u}", publ->type, publ->lower); else tipc_printf(buf, " {%u,%u,%u}", publ->type, publ->lower, publ->upper); } } tipc_printf(buf, "\n");}#define MAX_PORT_QUERY 32768struct sk_buff *tipc_port_get_ports(void){ struct sk_buff *buf; struct tlv_desc *rep_tlv; struct print_buf pb; struct port *p_ptr; int str_len; buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); if (!buf) return NULL; rep_tlv = (struct tlv_desc *)buf->data; tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); spin_lock_bh(&tipc_port_list_lock); list_for_each_entry(p_ptr, &ports, port_list) { spin_lock_bh(p_ptr->publ.lock); port_print(p_ptr, &pb, 0); spin_unlock_bh(p_ptr->publ.lock); } spin_unlock_bh(&tipc_port_list_lock); str_len = tipc_printbuf_validate(&pb); skb_put(buf, TLV_SPACE(str_len)); TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); return buf;}#if 0#define MAX_PORT_STATS 2000struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space){ u32 ref; struct port *p_ptr; struct sk_buff *buf; struct tlv_desc *rep_tlv; struct print_buf pb; int str_len; if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) return cfg_reply_error_string(TIPC_CFG_TLV_ERROR); ref = *(u32 *)TLV_DATA(req_tlv_area); ref = ntohl(ref); p_ptr = tipc_port_lock(ref); if (!p_ptr) return cfg_reply_error_string("port not found"); buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); if (!buf) { tipc_port_unlock(p_ptr); return NULL; } rep_tlv = (struct tlv_desc *)buf->data; tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); port_print(p_ptr, &pb, 1); /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ tipc_port_unlock(p_ptr); str_len = tipc_printbuf_validate(&pb); skb_put(buf, TLV_SPACE(str_len)); TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); return buf;}#endifvoid tipc_port_reinit(void){ struct port *p_ptr; struct tipc_msg *msg; spin_lock_bh(&tipc_port_list_lock); list_for_each_entry(p_ptr, &ports, port_list) { msg = &p_ptr->publ.phdr; if (msg_orignode(msg) == tipc_own_addr) break; msg_set_orignode(msg, tipc_own_addr); } spin_unlock_bh(&tipc_port_list_lock);}/* * port_dispatcher_sigh(): Signal handler for messages destinated * to the tipc_port interface. */static void port_dispatcher_sigh(void *dummy){ struct sk_buff *buf; spin_lock_bh(&queue_lock); buf = msg_queue_head; msg_queue_head = NULL; spin_unlock_bh(&queue_lock); while (buf) { struct port *p_ptr; struct user_port *up_ptr; struct tipc_portid orig; struct tipc_name_seq dseq; void *usr_handle; int connected; int published; u32 message_type; struct sk_buff *next = buf->next; struct tipc_msg *msg = buf_msg(buf); u32 dref = msg_destport(msg); message_type = msg_type(msg); if (message_type > TIPC_DIRECT_MSG) goto reject; /* Unsupported message type */ p_ptr = tipc_port_lock(dref); if (!p_ptr) goto reject; /* Port deleted while msg in queue */ orig.ref = msg_origport(msg); orig.node = msg_orignode(msg); up_ptr = p_ptr->user_port; usr_handle = up_ptr->usr_handle; connected = p_ptr->publ.connected; published = p_ptr->publ.published; if (unlikely(msg_errcode(msg))) goto err; switch (message_type) { case TIPC_CONN_MSG:{ tipc_conn_msg_event cb = up_ptr->conn_msg_cb; u32 peer_port = port_peerport(p_ptr); u32 peer_node = port_peernode(p_ptr); spin_unlock_bh(p_ptr->publ.lock); if (unlikely(!connected)) { if (unlikely(published)) goto reject; tipc_connect2port(dref,&orig); } if (unlikely(msg_origport(msg) != peer_port)) goto reject; if (unlikely(msg_orignode(msg) != peer_node)) goto reject; if (unlikely(!cb)) goto reject; if (unlikely(++p_ptr->publ.conn_unacked >= TIPC_FLOW_CONTROL_WIN)) tipc_acknowledge(dref, p_ptr->publ.conn_unacked); skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg)); break; } case TIPC_DIRECT_MSG:{ tipc_msg_event cb = up_ptr->msg_cb; spin_unlock_bh(p_ptr->publ.lock); if (unlikely(connected)) goto reject; if (unlikely(!cb)) goto reject; skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg), msg_importance(msg), &orig); break; } case TIPC_MCAST_MSG: case TIPC_NAMED_MSG:{ tipc_named_msg_event cb = up_ptr->named_msg_cb; spin_unlock_bh(p_ptr->publ.lock); if (unlikely(connected)) goto reject; if (unlikely(!cb)) goto reject; if (unlikely(!published)) goto reject; dseq.type = msg_nametype(msg); dseq.lower = msg_nameinst(msg); dseq.upper = (message_type == TIPC_NAMED_MSG) ? dseq.lower : msg_nameupper(msg); skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg), msg_importance(msg), &orig, &dseq); break; } } if (buf) buf_discard(buf); buf = next; continue;err: switch (message_type) { case TIPC_CONN_MSG:{ tipc_conn_shutdown_event cb = up_ptr->conn_err_cb; u32 peer_port = port_peerport(p_ptr); u32 peer_node = port_peernode(p_ptr); spin_unlock_bh(p_ptr->publ.lock); if (!connected || !cb) break; if (msg_origport(msg) != peer_port) break; if (msg_orignode(msg) != peer_node) break; tipc_disconnect(dref); skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg), msg_errcode(msg)); break; } case TIPC_DIRECT_MSG:{ tipc_msg_err_event cb = up_ptr->err_cb; spin_unlock_bh(p_ptr->publ.lock); if (connected || !cb) break; skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg), msg_errcode(msg), &orig); break; } case TIPC_MCAST_MSG: case TIPC_NAMED_MSG:{ tipc_named_msg_err_event cb = up_ptr->named_err_cb; spin_unlock_bh(p_ptr->publ.lock); if (connected || !cb) break; dseq.type = msg_nametype(msg); dseq.lower = msg_nameinst(msg); dseq.upper = (message_type == TIPC_NAMED_MSG) ? dseq.lower : msg_nameupper(msg); skb_pull(buf, msg_hdr_sz(msg)); cb(usr_handle, dref, &buf, msg_data(msg), msg_data_sz(msg), msg_errcode(msg), &dseq); break; } } if (buf) buf_discard(buf); buf = next; continue;reject: tipc_reject_msg(buf, TIPC_ERR_NO_PORT); buf = next; }}/* * port_dispatcher(): Dispatcher for messages destinated * to the tipc_port interface. Called with port locked. */static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf){ buf->next = NULL; spin_lock_bh(&queue_lock); if (msg_queue_head) { msg_queue_tail->next = buf; msg_queue_tail = buf; } else { msg_queue_tail = msg_queue_head = buf; tipc_k_signal((Handler)port_dispatcher_sigh, 0); } spin_unlock_bh(&queue_lock); return TIPC_OK;}/* * Wake up port after congestion: Called with port locked, * */static void port_wakeup_sh(unsigned long ref){ struct port *p_ptr; struct user_port *up_ptr; tipc_continue_event cb = NULL; void *uh = NULL; p_ptr = tipc_port_lock(ref); if (p_ptr) { up_ptr = p_ptr->user_port; if (up_ptr) { cb = up_ptr->continue_event_cb; uh = up_ptr->usr_handle; } tipc_port_unlock(p_ptr); } if (cb) cb(uh, ref);}static void port_wakeup(struct tipc_port *p_ptr){ tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);}void tipc_acknowledge(u32 ref, u32 ack){ struct port *p_ptr; struct sk_buff *buf = NULL; p_ptr = tipc_port_lock(ref); if (!p_ptr) return; if (p_ptr->publ.connected) { p_ptr->publ.conn_unacked -= ack; buf = port_build_proto_msg(port_peerport(p_ptr), port_peernode(p_ptr), ref, tipc_own_addr, CONN_MANAGER, CONN_ACK, TIPC_OK, port_out_seqno(p_ptr), ack); } tipc_port_unlock(p_ptr); tipc_net_route_msg(buf);}/* * tipc_createport(): user level call. Will add port to * registry if non-zero user_ref. */int tipc_createport(u32 user_ref, void *usr_handle, unsigned int importance, tipc_msg_err_event error_cb, tipc_named_msg_err_event named_error_cb, tipc_conn_shutdown_event conn_error_cb, tipc_msg_event msg_cb, tipc_named_msg_event named_msg_cb, tipc_conn_msg_event conn_msg_cb, tipc_continue_event continue_event_cb,/* May be zero */ u32 *portref){ struct user_port *up_ptr; struct port *p_ptr; u32 ref; up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); if (!up_ptr) { warn("Port creation failed, no memory\n"); return -ENOMEM; } ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); p_ptr = tipc_port_lock(ref); if (!p_ptr) { kfree(up_ptr); return -ENOMEM; } p_ptr->user_port = up_ptr; up_ptr->user_ref = user_ref; up_ptr->usr_handle = usr_handle; up_ptr->ref = p_ptr->publ.ref; up_ptr->err_cb = error_cb; up_ptr->named_err_cb = named_error_cb; up_ptr->conn_err_cb = conn_error_cb; up_ptr->msg_cb = msg_cb; up_ptr->named_msg_cb = named_msg_cb; up_ptr->conn_msg_cb = conn_msg_cb; up_ptr->continue_event_cb = continue_event_cb; INIT_LIST_HEAD(&up_ptr->uport_list); tipc_reg_add_port(up_ptr); *portref = p_ptr->publ.ref; dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref); tipc_port_unlock(p_ptr); return TIPC_OK;}int tipc_ownidentity(u32 ref, struct tipc_portid *id){ id->ref = ref; id->node = tipc_own_addr; return TIPC_OK;}int tipc_portimportance(u32 ref, unsigned int *importance){ struct port *p_ptr; p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); spin_unlock_bh(p_ptr->publ.lock); return TIPC_OK;}int tipc_set_portimportance(u32 ref, unsigned int imp){ struct port *p_ptr; if (imp > TIPC_CRITICAL_IMPORTANCE) return -EINVAL; p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; msg_set_importance(&p_ptr->publ.phdr, (u32)imp); spin_unlock_bh(p_ptr->publ.lock); return TIPC_OK;}int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq){ struct port *p_ptr; struct publication *publ; u32 key; int res = -EINVAL; p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " "lower = %u, upper = %u\n",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -