📄 ralnd.c
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}

int
kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
                       __u32 peer_ip, int peer_port)
{
        kra_device_t  *dev = conn->rac_device;
        unsigned long  flags;
        RAP_RETURN     rrc;

        /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
         * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
        conn->rac_last_tx = jiffies;
        conn->rac_keepalive = 0;

        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
        if (rrc != RAP_SUCCESS) {
                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rrc);
                return -ECONNABORTED;
        }

        /* Schedule conn on rad_new_conns */
        kranal_conn_addref(conn);
        spin_lock_irqsave(&dev->rad_lock, flags);
        list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
        wake_up(&dev->rad_waitq);
        spin_unlock_irqrestore(&dev->rad_lock, flags);

        rrc = RapkWaitToConnect(conn->rac_rihandle);
        if (rrc != RAP_SUCCESS) {
                CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rrc);
                return -ECONNABORTED;
        }

        /* Scheduler doesn't touch conn apart from to deschedule and decref it
         * after RapkCompleteSync() return success, so conn is all mine */
        conn->rac_peerstamp = connreq->racr_peerstamp;
        conn->rac_peer_connstamp = connreq->racr_connstamp;
        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
        kranal_update_reaper_timeout(conn->rac_keepalive);
        return 0;
}

int
kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
                               lnet_nid_t *dst_nidp, kra_conn_t **connp)
{
        __u32          peer_ip;
        unsigned int   peer_port;
        kra_connreq_t  rx_connreq;
        kra_connreq_t  tx_connreq;
        kra_conn_t    *conn;
        kra_device_t  *dev;
        int            rc;
        int            i;

        rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
        if (rc != 0) {
                CERROR("Can't get peer's IP: %d\n", rc);
                return rc;
        }

        rc = kranal_recv_connreq(sock, &rx_connreq, 0);

        if (rc < 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                return rc;
        }

        if (rc > 0) {
                /* Request from "new" peer: send reply with my MAGIC/VERSION to
                 * tell her I'm old... */
                kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);

                rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
                                       lnet_acceptor_timeout());
                if (rc != 0)
                        CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
                               HIPQUAD(peer_ip), peer_port, rc);

                return -EPROTO;
        }

        for (i = 0;;i++) {
                if (i == kranal_data.kra_ndevs) {
                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
                               rx_connreq.racr_devid, HIPQUAD(peer_ip),
                               peer_port);
                        return -ENODEV;
                }
                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id == rx_connreq.racr_devid)
                        break;
        }

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);

        rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
                               lnet_acceptor_timeout());
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                kranal_conn_decref(conn);
                return rc;
        }

        rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
        if (rc != 0) {
                kranal_conn_decref(conn);
                return rc;
        }

        *connp = conn;
        *src_nidp = rx_connreq.racr_srcnid;
        *dst_nidp = rx_connreq.racr_dstnid;
        return 0;
}

int
kranal_active_conn_handshake(kra_peer_t *peer,
                             lnet_nid_t *dst_nidp, kra_conn_t **connp)
{
        kra_connreq_t   connreq;
        kra_conn_t     *conn;
        kra_device_t   *dev;
        struct socket  *sock;
        int             rc;
        unsigned int    idx;

        /* spread connections over all devices using both peer NIDs to ensure
         * all nids use all devices */
        idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        kranal_pack_connreq(&connreq, conn, peer->rap_nid);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto test */
                LNET_LOCK();
                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        connreq.racr_version++;
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        connreq.racr_magic = LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
                LNET_UNLOCK();
        }

        rc = lnet_connect(&sock, peer->rap_nid,
                          0, peer->rap_ip, peer->rap_port);
        if (rc != 0)
                goto failed_0;

        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
         * immediately after accepting a connection, so we connect and then
         * send immediately. */
        rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
                               lnet_acceptor_timeout());
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_2;
        }

        rc = kranal_recv_connreq(sock, &connreq, 1);
        if (rc != 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_2;
        }

        libcfs_sock_release(sock);
        rc = -EPROTO;

        if (connreq.racr_srcnid != peer->rap_nid) {
                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                       "received %s expected %s\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       libcfs_nid2str(connreq.racr_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                goto failed_1;
        }

        if (connreq.racr_devid != dev->rad_id) {
                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
                       "received %d expected %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       connreq.racr_devid, dev->rad_id);
                goto failed_1;
        }

        rc = kranal_set_conn_params(conn, &connreq,
                                    peer->rap_ip, peer->rap_port);
        if (rc != 0)
                goto failed_1;

        *connp = conn;
        *dst_nidp = connreq.racr_dstnid;
        return 0;

 failed_2:
        libcfs_sock_release(sock);
 failed_1:
        lnet_connect_console_error(rc, peer->rap_nid,
                                   peer->rap_ip, peer->rap_port);
 failed_0:
        kranal_conn_decref(conn);
        return rc;
}

int
kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
{
        kra_peer_t        *peer2;
        kra_tx_t          *tx;
        lnet_nid_t         peer_nid;
        lnet_nid_t         dst_nid;
        unsigned long      flags;
        kra_conn_t        *conn;
        int                rc;
        int                nstale;
        int                new_peer = 0;

        if (sock == NULL) {
                /* active: connd wants to connect to 'peer' */
                LASSERT (peer != NULL);
                LASSERT (peer->rap_connecting);

                rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
                if (rc != 0)
                        return rc;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!kranal_peer_active(peer)) {
                        /* raced with peer getting unlinked */
                        write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                                flags);
                        kranal_conn_decref(conn);
                        return -ESTALE;
                }

                peer_nid = peer->rap_nid;
        } else {
                /* passive: listener accepted 'sock' */
                LASSERT (peer == NULL);

                rc = kranal_passive_conn_handshake(sock, &peer_nid,
                                                   &dst_nid, &conn);
                if (rc != 0)
                        return rc;

                /* assume this is a new peer */
                rc = kranal_create_peer(&peer, peer_nid);
                if (rc != 0) {
                        CERROR("Can't create conn for %s\n",
                               libcfs_nid2str(peer_nid));
                        kranal_conn_decref(conn);
                        return -ENOMEM;
                }

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                peer2 = kranal_find_peer_locked(peer_nid);
                if (peer2 == NULL) {
                        new_peer = 1;
                } else {
                        /* peer_nid already in the peer table */
                        kranal_peer_decref(peer);
                        peer = peer2;
                }
        }

        LASSERT ((!new_peer) != (!kranal_peer_active(peer)));

        /* Refuse connection if peer thinks we are a different NID.  We check
         * this while holding the global lock, to synch with connection
         * destruction on NID change. */
        if (!lnet_ptlcompat_matchnid(kranal_data.kra_ni->ni_nid, dst_nid)) {
                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
                       libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid),
                       libcfs_nid2str(kranal_data.kra_ni->ni_nid));
                rc = -ESTALE;
                goto failed;
        }

        /* Refuse to duplicate an existing connection (both sides might try to
         * connect at once).  NB we return success!  We _are_ connected so we
         * _don't_ have any blocked txs to complete with failure. */
        rc = kranal_conn_isdup_locked(peer, conn);
        if (rc != 0) {
                LASSERT (!list_empty(&peer->rap_conns));
                LASSERT (list_empty(&peer->rap_tx_queue));
                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                CWARN("Not creating duplicate connection to %s: %d\n",
                      libcfs_nid2str(peer_nid), rc);
                rc = 0;
                goto failed;
        }

        if (new_peer) {
                /* peer table takes my ref on the new peer */
                list_add_tail(&peer->rap_list,
                              kranal_nid2peerlist(peer_nid));
        }

        /* initialise timestamps before reaper looks at them */
        conn->rac_last_tx = conn->rac_last_rx = jiffies;

        kranal_peer_addref(peer);               /* +1 ref for conn */
        conn->rac_peer = peer;
        list_add_tail(&conn->rac_list, &peer->rap_conns);

        kranal_conn_addref(conn);               /* +1 ref for conn table */
        list_add_tail(&conn->rac_hashlist,
                      kranal_cqid2connlist(conn->rac_cqid));

        /* Schedule all packets blocking for a connection */
        while (!list_empty(&peer->rap_tx_queue)) {
                tx = list_entry(peer->rap_tx_queue.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_post_fma(conn, tx);
        }

        nstale = kranal_close_stale_conns_locked(peer, conn);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        /* CAVEAT EMPTOR: passive peer can disappear NOW */

        if (nstale != 0)
                CWARN("Closed %d stale conns to %s\n", nstale,
                      libcfs_nid2str(peer_nid));

        CWARN("New connection to %s on devid[%d] = %d\n",
              libcfs_nid2str(peer_nid),
              conn->rac_device->rad_idx, conn->rac_device->rad_id);

        /* Ensure conn gets checked.  Transmits may have been queued and an
         * FMA event may have happened before it got in the cq hash table */
        kranal_schedule_conn(conn);
        return 0;

 failed:
        if (new_peer)
                kranal_peer_decref(peer);
        kranal_conn_decref(conn);
        return rc;
}

void
kranal_connect (kra_peer_t *peer)
{
        kra_tx_t          *tx;
        unsigned long      flags;
        struct list_head   zombies;
        int                rc;

        LASSERT (peer->rap_connecting);

        CDEBUG(D_NET, "About to handshake %s\n",
               libcfs_nid2str(peer->rap_nid));

        rc = kranal_conn_handshake(NULL, peer);

        CDEBUG(D_NET, "Done handshake %s:%d \n",
               libcfs_nid2str(peer->rap_nid), rc);

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        LASSERT (peer->rap_connecting);
        peer->rap_connecting = 0;

        if (rc == 0) {
                /* kranal_conn_handshake() queues blocked txs immediately on
                 * success to avoid messages jumping the queue */
                LASSERT (list_empty(&peer->rap_tx_queue));

                peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        peer->rap_reconnect_interval *= 2;
        peer->rap_reconnect_interval =
                MAX(peer->rap_reconnect_interval,
                    *kranal_tunables.kra_min_reconnect_interval);
        peer->rap_reconnect_interval =
                MIN(peer->rap_reconnect_interval,
                    *kranal_tunables.kra_max_reconnect_interval);

        peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;

        /* Grab all blocked packets while we have the global lock */
        list_add(&zombies, &peer->rap_tx_queue);
        list_del_init(&peer->rap_tx_queue);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        if (list_empty(&zombies))
                return;

        CDEBUG(D_NETERROR, "Dropping packets for %s: connection failed\n",
               libcfs_nid2str(peer->rap_nid));

        do {
                tx = list_entry(zombies.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -EHOSTUNREACH);

        } while (!list_empty(&zombies));
}

void
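/*
 * Illustrative sketch only, not part of the original driver: how the two
 * handshake entry points above are driven from each side.  The real acceptor
 * and connection-daemon loops live elsewhere in ralnd; the "example_*"
 * callers below are hypothetical stand-ins for them.
 */
static void
example_passive_accept(struct socket *sock)
{
        /* Passive side: a listener has accepted 'sock'.  Passing peer == NULL
         * selects the passive path inside kranal_conn_handshake(). */
        int rc = kranal_conn_handshake(sock, NULL);

        if (rc != 0)
                CERROR("Passive handshake failed: %d\n", rc);
}

static void
example_active_connect(kra_peer_t *peer)
{
        /* Active side: kranal_connect() runs kranal_conn_handshake(NULL, peer)
         * and, on failure, completes any blocked txs with -EHOSTUNREACH.
         * NB it asserts peer->rap_connecting, so the caller must already have
         * marked the peer as connecting (the real connd does this under
         * kra_global_lock). */
        kranal_connect(peer);
}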