📄 openiblnd.c
字号:
LIBCFS_ALLOC(msg, sizeof(*msg)); if (msg == NULL) { CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n", HIPQUAD(peer_ip), peer_port); return; } rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic), lnet_acceptor_timeout()); if (rc != 0) { CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } if (msg->ibm_magic != IBNAL_MSG_MAGIC && msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) { /* Unexpected magic! */ if (the_lnet.ln_ptlcompat == 0) { if (msg->ibm_magic == LNET_PROTO_MAGIC || msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) { /* future protocol version compatibility! * When LNET unifies protocols over all LNDs, * the first thing sent will be a version * query. I send back a reply in my current * protocol to tell her I'm "old" */ kibnal_init_msg(msg, 0, 0); kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0); reject = 1; goto reply; } CERROR ("Bad magic(1) %#08x (%#08x expected) from " "%u.%u.%u.%u/%d\n", msg->ibm_magic, IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port); goto out; } /* When portals compatibility is set, I may be passed a new * connection "blindly" by the acceptor, and I have to * determine if my peer has sent an acceptor connection request * or not. */ rc = lnet_accept(kibnal_data.kib_ni, sock, msg->ibm_magic); if (rc != 0) goto out; /* It was an acceptor connection request! * Now I should see my magic... */ rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic), lnet_acceptor_timeout()); if (rc != 0) { CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } if (msg->ibm_magic != IBNAL_MSG_MAGIC && msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) { CERROR ("Bad magic(2) %#08x (%#08x expected) from " "%u.%u.%u.%u/%d\n", msg->ibm_magic, IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port); goto out; } } /* Now check version */ rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version), lnet_acceptor_timeout()); if (rc != 0) { CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ? msg->ibm_version : __swab16(msg->ibm_version); /* Peer is a different protocol version: reply in my current protocol * to tell her I'm "old" */ if (version != IBNAL_MSG_VERSION && version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) { kibnal_init_msg(msg, 0, 0); kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0); reject = 1; goto reply; } /* Now read in all the rest */ rc = libcfs_sock_read(sock, &msg->ibm_type, offsetof(kib_msg_t, ibm_u) - offsetof(kib_msg_t, ibm_type), lnet_acceptor_timeout()); if (rc != 0) { CERROR("Error %d receiving svcqry(4) from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u)); if (rc != 0) { CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } if (msg->ibm_type != IBNAL_MSG_SVCQRY) { CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n", msg->ibm_type, HIPQUAD(peer_ip), peer_port); goto out; } if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid, msg->ibm_dstnid)) { CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n", libcfs_nid2str(msg->ibm_dstnid), libcfs_nid2str(kibnal_data.kib_ni->ni_nid), HIPQUAD(peer_ip), peer_port); goto out; } srcnid = msg->ibm_srcnid; srcstamp = msg->ibm_srcstamp; kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp)); msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id; memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid, sizeof(kibnal_data.kib_svc_gid)); msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey; kibnal_pack_msg(msg, version, 0, srcnid, srcstamp); reply: rc = libcfs_sock_write (sock, msg, msg->ibm_nob, lnet_acceptor_timeout()); if (!reject && rc != 0) { /* Only complain if we're not rejecting */ CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n", rc, HIPQUAD(peer_ip), peer_port); goto out; } out: LIBCFS_FREE(msg, sizeof(*msg));}voidkibnal_free_acceptsock (kib_acceptsock_t *as){ libcfs_sock_release(as->ibas_sock); LIBCFS_FREE(as, sizeof(*as));}intkibnal_accept(lnet_ni_t *ni, struct socket *sock){ kib_acceptsock_t *as; unsigned long flags; LIBCFS_ALLOC(as, sizeof(*as)); if (as == NULL) { CERROR("Out of Memory\n"); return -ENOMEM; } as->ibas_sock = sock; spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags); list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq); wake_up(&kibnal_data.kib_connd_waitq); spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags); return 0;}intkibnal_start_ib_listener (void) { int rc; LASSERT (kibnal_data.kib_listen_handle == NULL); kibnal_data.kib_svc_id = ib_cm_service_assign(); CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id); rc = ib_cached_gid_get(kibnal_data.kib_device, kibnal_data.kib_port, 0, kibnal_data.kib_svc_gid); if (rc != 0) { CERROR("Can't get port %d GID: %d\n", kibnal_data.kib_port, rc); return rc; } rc = ib_cached_pkey_get(kibnal_data.kib_device, kibnal_data.kib_port, 0, &kibnal_data.kib_svc_pkey); if (rc != 0) { CERROR ("Can't get port %d PKEY: %d\n", kibnal_data.kib_port, rc); return rc; } rc = ib_cm_listen(kibnal_data.kib_svc_id, TS_IB_CM_SERVICE_EXACT_MASK, kibnal_passive_conn_callback, NULL, &kibnal_data.kib_listen_handle); if (rc != 0) { kibnal_data.kib_listen_handle = NULL; CERROR ("Can't create IB listener: %d\n", rc); return rc; } LASSERT (kibnal_data.kib_listen_handle != NULL); return 0;}voidkibnal_stop_ib_listener (void) { int rc; LASSERT (kibnal_data.kib_listen_handle != NULL); rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle); if (rc != 0) CERROR("Error stopping IB listener: %d\n", rc); kibnal_data.kib_listen_handle = NULL;}intkibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid){ kib_peer_t *peer; unsigned long flags; int rc; LASSERT (nid != LNET_NID_ANY); LIBCFS_ALLOC(peer, sizeof (*peer)); if (peer == NULL) { CERROR("Cannot allocate peer\n"); return -ENOMEM; } memset(peer, 0, sizeof(*peer)); /* zero flags etc */ peer->ibp_nid = nid; atomic_set (&peer->ibp_refcount, 1); /* 1 ref for caller */ INIT_LIST_HEAD (&peer->ibp_list); /* not in the peer table yet */ INIT_LIST_HEAD (&peer->ibp_conns); INIT_LIST_HEAD (&peer->ibp_tx_queue); INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */ peer->ibp_error = 0; peer->ibp_last_alive = cfs_time_current(); peer->ibp_reconnect_interval = 0; /* OK to connect at any time */ write_lock_irqsave(&kibnal_data.kib_global_lock, flags); if (atomic_read(&kibnal_data.kib_npeers) >= *kibnal_tunables.kib_concurrent_peers) { rc = -EOVERFLOW; /* !! but at least it distinguishes */ } else if (kibnal_data.kib_nonewpeers) { rc = -ESHUTDOWN; /* shutdown has started */ } else { rc = 0; /* npeers only grows with kib_global_lock held */ atomic_inc(&kibnal_data.kib_npeers); } write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags); if (rc != 0) { CERROR("Can't create peer: %s\n", (rc == -ESHUTDOWN) ? "shutting down" : "too many peers"); LIBCFS_FREE(peer, sizeof(*peer)); } else { *peerp = peer; } return rc;}voidkibnal_destroy_peer (kib_peer_t *peer){ CDEBUG (D_NET, "peer %s %p deleted\n", libcfs_nid2str(peer->ibp_nid), peer); LASSERT (atomic_read (&peer->ibp_refcount) == 0); LASSERT (peer->ibp_persistence == 0); LASSERT (!kibnal_peer_active(peer)); LASSERT (peer->ibp_connecting == 0); LASSERT (peer->ibp_accepting == 0); LASSERT (list_empty (&peer->ibp_connd_list)); LASSERT (list_empty (&peer->ibp_conns)); LASSERT (list_empty (&peer->ibp_tx_queue)); LIBCFS_FREE (peer, sizeof (*peer)); /* NB a peer's connections keep a reference on their peer until * they are destroyed, so we can be assured that _all_ state to do * with this peer has been cleaned up when its refcount drops to * zero. */ atomic_dec(&kibnal_data.kib_npeers);}kib_peer_t *kibnal_find_peer_locked (lnet_nid_t nid){ struct list_head *peer_list = kibnal_nid2peerlist (nid); struct list_head *tmp; kib_peer_t *peer; list_for_each (tmp, peer_list) { peer = list_entry (tmp, kib_peer_t, ibp_list); LASSERT (peer->ibp_persistence != 0 || /* persistent peer */ peer->ibp_connecting != 0 || /* creating conns */ peer->ibp_accepting != 0 || !list_empty (&peer->ibp_conns)); /* active conn */ if (peer->ibp_nid != nid) continue; return (peer); } return (NULL);}kib_peer_t *kibnal_get_peer (lnet_nid_t nid){ kib_peer_t *peer; unsigned long flags; read_lock_irqsave(&kibnal_data.kib_global_lock, flags); peer = kibnal_find_peer_locked (nid); if (peer != NULL) /* +1 ref for caller? */ kibnal_peer_addref(peer); read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags); return (peer);}voidkibnal_unlink_peer_locked (kib_peer_t *peer){ LASSERT (peer->ibp_persistence == 0); LASSERT (list_empty(&peer->ibp_conns)); LASSERT (kibnal_peer_active(peer)); list_del_init (&peer->ibp_list); /* lose peerlist's ref */ kibnal_peer_decref(peer);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -