📄 port.c
字号:
/*
 * net/tipc/port.c: TIPC port code
 *
 * Copyright (c) 1992-2007, Ericsson AB
 * Copyright (c) 2004-2007, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "config.h"
#include "dbg.h"
#include "port.h"
#include "addr.h"
#include "link.h"
#include "node.h"
#include "name_table.h"
#include "user_reg.h"
#include "msg.h"
#include "bcast.h"

/* Connection management: */

#define PROBING_INTERVAL 3600000	/* [ms] => 1 h */
#define CONFIRMED 0
#define PROBING 1

/* Upper bound on the payload bytes echoed back by tipc_reject_msg() */
#define MAX_REJECT_SIZE 1024

static struct sk_buff *msg_queue_head = NULL;
static struct sk_buff *msg_queue_tail = NULL;

/* Taken around every update of the 'ports' list below */
DEFINE_SPINLOCK(tipc_port_list_lock);
static DEFINE_SPINLOCK(queue_lock);

/* All ports created through tipc_createport_raw() */
static LIST_HEAD(ports);

static void port_handle_node_down(unsigned long ref);
static struct sk_buff *port_build_self_abort_msg(struct port *, u32 err);
static struct sk_buff *port_build_peer_abort_msg(struct port *, u32 err);
static void port_timeout(unsigned long ref);


/* port_peernode - destination node stored in the port's message header */
static u32 port_peernode(struct port *p_ptr)
{
	return msg_destnode(&p_ptr->publ.phdr);
}

/* port_peerport - destination port stored in the port's message header */
static u32 port_peerport(struct port *p_ptr)
{
	return msg_destport(&p_ptr->publ.phdr);
}

/* port_out_seqno - transport sequence number stored in the port's header */
static u32 port_out_seqno(struct port *p_ptr)
{
	return msg_transp_seqno(&p_ptr->publ.phdr);
}

/*
 * port_incr_out_seqno - advance the transport sequence number by one
 *
 * Does nothing unless msg_routed() is true for the port's header.
 */
static void port_incr_out_seqno(struct port *p_ptr)
{
	struct tipc_msg *m = &p_ptr->publ.phdr;

	if (likely(!msg_routed(m)))
		return;
	msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
}

/**
 * tipc_multicast - send a multicast message to local and remote destinations
 * @ref: reference of the sending port
 * @seq: name sequence (type, lower, upper) written into the message header
 * @domain: not used by this function
 * @num_sect: number of entries in @msg_sect
 * @msg_sect: data sections handed to msg_build()
 *
 * Returns -EINVAL if @ref does not resolve to a port, the msg_build() result
 * if no buffer was produced, -ENOMEM if the local copy cannot be allocated,
 * otherwise the result of msg_build() or, when there are external targets,
 * of tipc_bclink_send_msg().
 */
int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
		   u32 num_sect, struct iovec const *msg_sect)
{
	struct tipc_msg *hdr;
	struct sk_buff *buf;
	struct sk_buff *ibuf = NULL;
	struct port_list dports = {0, NULL, };
	struct port *oport = tipc_port_deref(ref);
	int ext_targets;
	int res;

	if (unlikely(!oport))
		return -EINVAL;

	/* Create multicast message */

	hdr = &oport->publ.phdr;
	msg_set_type(hdr, TIPC_MCAST_MSG);
	msg_set_nametype(hdr, seq->type);
	msg_set_namelower(hdr, seq->lower);
	msg_set_nameupper(hdr, seq->upper);
	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
	res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
			!oport->user_port, &buf);
	if (unlikely(!buf))
		return res;

	/* Figure out where to send multicast message */

	ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower,
						seq->upper, TIPC_NODE_SCOPE,
						&dports);

	/* Send message to destinations (duplicate it only if necessary) */

	if (ext_targets) {
		if (dports.count != 0) {
			/* Local ports exist too: they get their own copy */
			ibuf = skb_copy(buf, GFP_ATOMIC);
			if (ibuf == NULL) {
				tipc_port_list_free(&dports);
				buf_discard(buf);
				return -ENOMEM;
			}
		}
		res = tipc_bclink_send_msg(buf);
		if ((res < 0) && (dports.count != 0)) {
			buf_discard(ibuf);
		}
	} else {
		/* No external targets: deliver the original buffer locally */
		ibuf = buf;
	}

	if (res >= 0) {
		if (ibuf)
			tipc_port_recv_mcast(ibuf, &dports);
	} else {
		tipc_port_list_free(&dports);
	}
	return res;
}

/**
 * tipc_port_recv_mcast - deliver multicast message to all destination ports
 * @buf: message buffer; handed on or discarded before returning
 * @dp: destination port list, or NULL
 *
 * If there is no port list, perform a lookup to create one.
 * The port list is released with tipc_port_list_free() on every path.
 */
void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
{
	struct tipc_msg *msg;
	struct port_list dports = {0, NULL, };
	struct port_list *item = dp;
	int cnt = 0;

	msg = buf_msg(buf);

	/* Create destination port list, if one wasn't supplied */

	if (dp == NULL) {
		tipc_nametbl_mc_translate(msg_nametype(msg),
					  msg_namelower(msg),
					  msg_nameupper(msg),
					  TIPC_CLUSTER_SCOPE,
					  &dports);
		item = dp = &dports;
	}

	/* Deliver a copy of message to each destination port */

	if (dp->count != 0) {
		if (dp->count == 1) {
			/* Single destination: pass the original, no clone */
			msg_set_destport(msg, dp->ports[0]);
			tipc_port_recv_msg(buf);
			tipc_port_list_free(dp);
			return;
		}
		for (; cnt < dp->count; cnt++) {
			int index = cnt % PLSIZE;
			struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);

			if (b == NULL) {
				warn("Unable to deliver multicast message(s)\n");
				msg_dbg(msg, "LOST:");
				goto exit;
			}
			/* Index wrapped to 0: move on to the next list item */
			if ((index == 0) && (cnt != 0)) {
				item = item->next;
			}
			msg_set_destport(buf_msg(b), item->ports[index]);
			tipc_port_recv_msg(b);
		}
	}
exit:
	buf_discard(buf);
	tipc_port_list_free(dp);
}

/**
 * tipc_createport_raw - create a native TIPC port
 * @usr_handle: opaque value stored in the port (see tipc_get_handle())
 * @dispatcher: callback stored in the port
 * @wakeup: callback stored in the port
 * @importance: importance written into the port's message header
 *
 * Returns local port reference, or 0 if the port could not be allocated
 * or no reference could be acquired.
 */
u32 tipc_createport_raw(void *usr_handle,
			u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
			void (*wakeup)(struct tipc_port *),
			const u32 importance)
{
	struct port *p_ptr;
	struct tipc_msg *msg;
	u32 ref;

	p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
	if (!p_ptr) {
		warn("Port creation failed, no memory\n");
		return 0;
	}
	ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
	if (!ref) {
		warn("Port creation failed, reference table exhausted\n");
		kfree(p_ptr);
		return 0;
	}

	tipc_port_lock(ref);
	p_ptr->publ.usr_handle = usr_handle;
	p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
	p_ptr->publ.ref = ref;
	msg = &p_ptr->publ.phdr;
	msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
	msg_set_orignode(msg, tipc_own_addr);
	msg_set_prevnode(msg, tipc_own_addr);
	msg_set_origport(msg, ref);
	msg_set_importance(msg, importance);
	/* NOTE(review): the initial value 41 is not explained in this file */
	p_ptr->last_in_seqno = 41;
	p_ptr->sent = 1;
	INIT_LIST_HEAD(&p_ptr->wait_list);
	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
	p_ptr->congested_link = NULL;
	p_ptr->dispatcher = dispatcher;
	p_ptr->wakeup = wakeup;
	p_ptr->user_port = NULL;
	k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
	spin_lock_bh(&tipc_port_list_lock);
	INIT_LIST_HEAD(&p_ptr->publications);
	INIT_LIST_HEAD(&p_ptr->port_list);
	list_add_tail(&p_ptr->port_list, &ports);
	spin_unlock_bh(&tipc_port_list_lock);
	tipc_port_unlock(p_ptr);
	return ref;
}

/**
 * tipc_deleteport - tear down the port identified by 'ref'
 * @ref: port reference
 *
 * If the port is connected, a peer abort message carrying TIPC_ERR_NO_PORT
 * is built and routed after the port has been freed.
 *
 * Returns -EINVAL if the port cannot be locked, otherwise TIPC_OK.
 */
int tipc_deleteport(u32 ref)
{
	struct port *p_ptr;
	struct sk_buff *buf = NULL;

	tipc_withdraw(ref, 0, NULL);
	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;

	tipc_ref_discard(ref);
	tipc_port_unlock(p_ptr);

	k_cancel_timer(&p_ptr->timer);
	if (p_ptr->publ.connected) {
		buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
		tipc_nodesub_unsubscribe(&p_ptr->subscription);
	}
	if (p_ptr->user_port) {
		tipc_reg_remove_port(p_ptr->user_port);
		kfree(p_ptr->user_port);
	}

	spin_lock_bh(&tipc_port_list_lock);
	list_del(&p_ptr->port_list);
	list_del(&p_ptr->wait_list);
	spin_unlock_bh(&tipc_port_list_lock);
	k_term_timer(&p_ptr->timer);
	kfree(p_ptr);
	dbg("Deleted port %u\n", ref);
	/* buf is still NULL here when the port was not connected */
	tipc_net_route_msg(buf);
	return TIPC_OK;
}

/**
 * tipc_get_port() - return port associated with 'ref'
 *
 * Note: Port is not locked.
 */
struct tipc_port *tipc_get_port(const u32 ref)
{
	return (struct tipc_port *)tipc_ref_deref(ref);
}

/**
 * tipc_get_handle - return user handle associated to port 'ref'
 *
 * Returns NULL if the port cannot be locked.
 */
void *tipc_get_handle(const u32 ref)
{
	struct port *p_ptr;
	void *handle;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return NULL;
	handle = p_ptr->publ.usr_handle;
	tipc_port_unlock(p_ptr);
	return handle;
}

/* port_unreliable - "source droppable" flag of the port's message header */
static int port_unreliable(struct port *p_ptr)
{
	return msg_src_droppable(&p_ptr->publ.phdr);
}

/**
 * tipc_portunreliable - read the port's "source droppable" setting
 * @ref: port reference
 * @isunreliable: output, receives the current setting
 *
 * Returns -EINVAL if the port cannot be locked, otherwise TIPC_OK.
 */
int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
{
	struct port *p_ptr;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;
	*isunreliable = port_unreliable(p_ptr);
	/* Release with the same helper the other accessors in this file use */
	tipc_port_unlock(p_ptr);
	return TIPC_OK;
}

/**
 * tipc_set_portunreliable - set the port's "source droppable" setting
 * @ref: port reference
 * @isunreliable: any non-zero value enables the setting
 *
 * Returns -EINVAL if the port cannot be locked, otherwise TIPC_OK.
 */
int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
{
	struct port *p_ptr;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;
	msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
	tipc_port_unlock(p_ptr);
	return TIPC_OK;
}

/* port_unreturnable - "dest droppable" flag of the port's message header */
static int port_unreturnable(struct port *p_ptr)
{
	return msg_dest_droppable(&p_ptr->publ.phdr);
}

/**
 * tipc_portunreturnable - read the port's "destination droppable" setting
 * @ref: port reference
 * @isunrejectable: output, receives the current setting
 *
 * Returns -EINVAL if the port cannot be locked, otherwise TIPC_OK.
 */
int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
{
	struct port *p_ptr;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;
	*isunrejectable = port_unreturnable(p_ptr);
	/* Release with the same helper the other accessors in this file use */
	tipc_port_unlock(p_ptr);
	return TIPC_OK;
}

/**
 * tipc_set_portunreturnable - set the port's "destination droppable" setting
 * @ref: port reference
 * @isunrejectable: any non-zero value enables the setting
 *
 * Returns -EINVAL if the port cannot be locked, otherwise TIPC_OK.
 */
int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
{
	struct port *p_ptr;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;
	msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
	tipc_port_unlock(p_ptr);
	return TIPC_OK;
}

/*
 * port_build_proto_msg(): build a port level protocol
 * or a connection abortion message. Called with
 * tipc_port lock on.
 *
 * Returns the new buffer, or NULL if buf_acquire() fails.
 */
static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
					    u32 origport, u32 orignode,
					    u32 usr, u32 type, u32 err,
					    u32 seqno, u32 ack)
{
	struct sk_buff *buf;
	struct tipc_msg *msg;

	buf = buf_acquire(LONG_H_SIZE);
	if (buf) {
		msg = buf_msg(buf);
		msg_init(msg, usr, type, err, LONG_H_SIZE, destnode);
		msg_set_destport(msg, destport);
		msg_set_origport(msg, origport);
		msg_set_destnode(msg, destnode);
		msg_set_orignode(msg, orignode);
		msg_set_transp_seqno(msg, seqno);
		msg_set_msgcnt(msg, ack);
		msg_dbg(msg, "PORT>SEND>:");
	}
	return buf;
}

/**
 * tipc_set_msg_option - store options in the port's message header
 * @tp_ptr: port whose header is updated
 * @opt: option data handed to msg_set_options()
 * @sz: size handed to msg_set_options()
 *
 * Always returns TIPC_OK.
 */
int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
{
	msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
	msg_set_options(&tp_ptr->phdr, opt, sz);
	return TIPC_OK;
}

/**
 * tipc_reject_msg - return a message to its originator with an error code
 * @buf: message being rejected; always discarded by this function
 * @err: error code placed in the returned message
 *
 * At most MAX_REJECT_SIZE bytes of payload are copied into the returned
 * message. Nothing is returned to the sender when the message already
 * carries an error code, is marked destination droppable, or when no
 * buffer can be acquired.
 *
 * Returns the (possibly truncated) payload size in all cases.
 */
int tipc_reject_msg(struct sk_buff *buf, u32 err)
{
	struct tipc_msg *msg = buf_msg(buf);
	struct sk_buff *rbuf;
	struct tipc_msg *rmsg;
	int hdr_sz;
	u32 imp = msg_importance(msg);
	u32 data_sz = msg_data_sz(msg);

	if (data_sz > MAX_REJECT_SIZE)
		data_sz = MAX_REJECT_SIZE;
	/* Connected messages go back one importance level higher */
	if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
		imp++;
	msg_dbg(msg, "port->rej: ");

	/* discard rejected message if it shouldn't be returned to sender */
	if (msg_errcode(msg) || msg_dest_droppable(msg)) {
		buf_discard(buf);
		return data_sz;
	}

	/* construct rejected message */
	if (msg_mcast(msg))
		hdr_sz = MCAST_H_SIZE;
	else
		hdr_sz = LONG_H_SIZE;
	rbuf = buf_acquire(data_sz + hdr_sz);
	if (rbuf == NULL) {
		buf_discard(buf);
		return data_sz;
	}
	rmsg = buf_msg(rbuf);
	msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg));
	/* Swap the addressing so the reply travels back to the sender */
	msg_set_destport(rmsg, msg_origport(msg));
	msg_set_prevnode(rmsg, tipc_own_addr);
	msg_set_origport(rmsg, msg_destport(msg));
	if (msg_short(msg))
		msg_set_orignode(rmsg, tipc_own_addr);
	else
		msg_set_orignode(rmsg, msg_destnode(msg));
	msg_set_size(rmsg, data_sz + hdr_sz);
	msg_set_nametype(rmsg, msg_nametype(msg));
	msg_set_nameinst(rmsg, msg_nameinst(msg));
	skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);

	/* send self-abort message when rejecting on a connected port */
	if (msg_connected(msg)) {
		struct sk_buff *abuf = NULL;
		struct port *p_ptr = tipc_port_lock(msg_destport(msg));

		if (p_ptr) {
			if (p_ptr->publ.connected)
				abuf = port_build_self_abort_msg(p_ptr, err);
			tipc_port_unlock(p_ptr);
		}
		tipc_net_route_msg(abuf);
	}

	/* send rejected message */
	buf_discard(buf);
	tipc_net_route_msg(rbuf);
	return data_sz;
}

/**
 * tipc_port_reject_sections - build a message from sections and reject it
 * @p_ptr: port on whose behalf the message is built
 * @hdr: header handed to msg_build()
 * @msg_sect: data sections handed to msg_build()
 * @num_sect: number of entries in @msg_sect
 * @err: error code handed to tipc_reject_msg()
 *
 * Returns the msg_build() result if no buffer was produced, otherwise the
 * result of tipc_reject_msg().
 */
int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
			      struct iovec const *msg_sect, u32 num_sect,
			      int err)
{
	struct sk_buff *buf;
	int res;

	res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
			!p_ptr->user_port, &buf);
	if (!buf)
		return res;

	return tipc_reject_msg(buf, err);
}

/*
 * port_timeout - handler for the port timer set up in tipc_createport_raw()
 *
 * On a connected port: if the port is still in PROBING state, a self abort
 * message with TIPC_ERR_NO_PORT is built; otherwise a CONN_PROBE message is
 * built, the port enters PROBING state and the timer is restarted.
 * The resulting message is routed after the port lock has been dropped.
 */
static void port_timeout(unsigned long ref)
{
	struct port *p_ptr = tipc_port_lock(ref);
	struct sk_buff *buf = NULL;

	if (!p_ptr)
		return;

	if (!p_ptr->publ.connected) {
		tipc_port_unlock(p_ptr);
		return;
	}

	/* Last probe answered ? */
	if (p_ptr->probing_state == PROBING) {
		buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
	} else {
		buf = port_build_proto_msg(port_peerport(p_ptr),
					   port_peernode(p_ptr),
					   p_ptr->publ.ref,
					   tipc_own_addr,
					   CONN_MANAGER,
					   CONN_PROBE,
					   TIPC_OK,
					   port_out_seqno(p_ptr),
					   0);
		port_incr_out_seqno(p_ptr);
		p_ptr->probing_state = PROBING;
		k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
	}
	tipc_port_unlock(p_ptr);
	tipc_net_route_msg(buf);
}

/*
 * port_handle_node_down - build and route a self abort message carrying
 * TIPC_ERR_NO_NODE for the port identified by 'ref'
 */
static void port_handle_node_down(unsigned long ref)
{
	struct port *p_ptr = tipc_port_lock(ref);
	struct sk_buff *buf = NULL;

	if (!p_ptr)
		return;
	buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
	tipc_port_unlock(p_ptr);
	tipc_net_route_msg(buf);
}

/*
 * port_build_self_abort_msg - build an abort message addressed to this port
 *
 * The message is addressed to the port itself, with the connection peer as
 * its originator. Its importance is one level above the port's own unless
 * that is already TIPC_CRITICAL_IMPORTANCE.
 *
 * Returns NULL if the port is not connected, otherwise the result of
 * port_build_proto_msg().
 */
static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
{
	u32 imp = msg_importance(&p_ptr->publ.phdr);

	if (!p_ptr->publ.connected)
		return NULL;
	if (imp < TIPC_CRITICAL_IMPORTANCE)
		imp++;
	return port_build_proto_msg(p_ptr->publ.ref,
				    tipc_own_addr,
				    port_peerport(p_ptr),
				    port_peernode(p_ptr),
				    imp,
				    TIPC_CONN_MSG,
				    err,
				    p_ptr->last_in_seqno + 1,
				    0);
}

static struct sk_buff
*port_build_peer_abort_msg(struct port *p_ptr, u32 err){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -