⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socklnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                peer2 = ksocknal_find_peer_locked(ni, peerid);                if (peer2 == NULL) {                        /* NB this puts an "empty" peer in the peer                         * table (which takes my ref) */                        list_add_tail(&peer->ksnp_list,                                      ksocknal_nid2peerlist(peerid.nid));                } else {                        ksocknal_peer_decref(peer);                        peer = peer2;                }                /* +1 ref for me */                ksocknal_peer_addref(peer);                peer->ksnp_accepting++;                                /* Am I already connecting to this guy?  Resolve in                 * favour of higher NID... */                if (peerid.nid < ni->ni_nid &&                    ksocknal_connecting(peer, conn->ksnc_ipaddr)) {                        rc = EALREADY;                        warn = "connection race resolution";                        goto failed_2;                }        }        if (peer->ksnp_closing ||            (active && route->ksnr_deleted)) {                /* peer/route got closed under me */                rc = -ESTALE;                warn = "peer/route removed";                goto failed_2;        }        if (peer->ksnp_proto == NULL) {                 /* Never connected before.                 * NB recv_hello may have returned EPROTO to signal my peer                 * wants a different protocol than the one I asked for.                 */                LASSERT (list_empty(&peer->ksnp_conns));                                peer->ksnp_proto = conn->ksnc_proto;                peer->ksnp_incarnation = incarnation;        }        if (peer->ksnp_proto != conn->ksnc_proto ||            peer->ksnp_incarnation != incarnation) {                /* Peer rebooted or I've got the wrong protocol version */                ksocknal_close_peer_conns_locked(peer, 0, 0);                peer->ksnp_proto = NULL;                rc = ESTALE;                warn = peer->ksnp_incarnation != incarnation ?                        "peer rebooted" :                       "wrong proto version";                goto failed_2;        }        switch (rc) {        default:                LBUG();        case 0:                break;        case EALREADY:                warn = "lost conn race";                goto failed_2;        case EPROTO:                warn = "retry with different protocol version";                goto failed_2;        }        /* Refuse to duplicate an existing connection, unless this is a         * loopback connection */        if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {                list_for_each(tmp, &peer->ksnp_conns) {                        conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);                        if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||                            conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||                            conn2->ksnc_type != conn->ksnc_type)                                continue;                        /* Reply on a passive connection attempt so the peer                         * realises we're connected. */                        LASSERT (rc == 0);                        if (!active)                                rc = EALREADY;                        warn = "duplicate";                        goto failed_2;                }        }        /* If the connection created by this route didn't bind to the IP         * address the route connected to, the connection/route matching         * code below probably isn't going to work. */        if (active &&            route->ksnr_ipaddr != conn->ksnc_ipaddr) {                CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",                       libcfs_id2str(peer->ksnp_id),                       HIPQUAD(route->ksnr_ipaddr),                       HIPQUAD(conn->ksnc_ipaddr));        }        /* Search for a route corresponding to the new connection and         * create an association.  This allows incoming connections created         * by routes in my peer to match my own route entries so I don't         * continually create duplicate routes. */        list_for_each (tmp, &peer->ksnp_routes) {                route = list_entry(tmp, ksock_route_t, ksnr_list);                if (route->ksnr_ipaddr != conn->ksnc_ipaddr)                        continue;                ksocknal_associate_route_conn_locked(route, conn);                break;        }        conn->ksnc_peer = peer;                 /* conn takes my ref on peer */        peer->ksnp_last_alive = cfs_time_current();        peer->ksnp_error = 0;        sched = ksocknal_choose_scheduler_locked (irq);        sched->kss_nconns++;        conn->ksnc_scheduler = sched;        /* Set the deadline for the outgoing HELLO to drain */        conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);        conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);        mb();       /* order with adding to peer's conn list */        list_add (&conn->ksnc_list, &peer->ksnp_conns);        ksocknal_conn_addref(conn);        ksocknal_new_packet(conn, 0);        /* Take all the packets blocking for a connection.         * NB, it might be nicer to share these blocked packets among any         * other connections that are becoming established. */        while (!list_empty (&peer->ksnp_tx_queue)) {                tx = list_entry (peer->ksnp_tx_queue.next,                                 ksock_tx_t, tx_list);                list_del (&tx->tx_list);                ksocknal_queue_tx_locked (tx, conn);        }        write_unlock_bh (global_lock);        /* We've now got a new connection.  Any errors from here on are just         * like "normal" comms errors and we close the connection normally.         * NB (a) we still have to send the reply HELLO for passive         *        connections,          *    (b) normal I/O on the conn is blocked until I setup and call the         *        socket callbacks.         */        ksocknal_lib_bind_irq (irq);        CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"               " incarnation:"LPD64" sched[%d]/%d\n",               libcfs_id2str(peerid), conn->ksnc_proto->pro_version,               HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),               conn->ksnc_port, incarnation,               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);        if (active) {                /* additional routes after interface exchange? */                ksocknal_create_routes(peer, conn->ksnc_port,                                       hello->kshm_ips, hello->kshm_nips);        } else {                hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,                                                       hello->kshm_nips);                rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);        }                LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,                                    kshm_ips[LNET_MAX_INTERFACES]));        /* setup the socket AFTER I've received hello (it disables         * SO_LINGER).  I might call back to the acceptor who may want         * to send a protocol version response and then close the         * socket; this ensures the socket only tears down after the         * response has been sent. */        if (rc == 0)                rc = ksocknal_lib_setup_sock(sock);        write_lock_bh(global_lock);        /* NB my callbacks block while I hold ksnd_global_lock */        ksocknal_lib_set_callback(sock, conn);        if (!active)                peer->ksnp_accepting--;        write_unlock_bh(global_lock);        if (rc != 0) {                write_lock_bh(global_lock);                ksocknal_close_conn_locked(conn, rc);                write_unlock_bh(global_lock);        } else if (ksocknal_connsock_addref(conn) == 0) {                /* Allow I/O to proceed. */                ksocknal_read_callback(conn);                ksocknal_write_callback(conn);                ksocknal_connsock_decref(conn);        }        ksocknal_conn_decref(conn);        return rc; failed_2:        if (!peer->ksnp_closing &&            list_empty (&peer->ksnp_conns) &&            list_empty (&peer->ksnp_routes)) {                list_add(&zombies, &peer->ksnp_tx_queue);                list_del_init(&peer->ksnp_tx_queue);                ksocknal_unlink_peer_locked(peer);        }                write_unlock_bh (global_lock);        if (warn != NULL) {                if (rc < 0)                        CERROR("Not creating conn %s type %d: %s\n",                               libcfs_id2str(peerid), conn->ksnc_type, warn);                else                        CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",                              libcfs_id2str(peerid), conn->ksnc_type, warn);        }        if (!active) {                if (rc > 0) {                        /* Request retry by replying with CONN_NONE                          * ksnc_proto has been set already */                        conn->ksnc_type = SOCKLND_CONN_NONE;                        hello->kshm_nips = 0;                        ksocknal_send_hello(ni, conn, peerid.nid, hello);                }                write_lock_bh(global_lock);                peer->ksnp_accepting--;                write_unlock_bh(global_lock);        }                ksocknal_txlist_done(ni, &zombies, 1);        ksocknal_peer_decref(peer); failed_1:        if (hello != NULL)                LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,                                            kshm_ips[LNET_MAX_INTERFACES]));        LIBCFS_FREE (conn, sizeof(*conn)); failed_0:        libcfs_sock_release(sock);        return rc;}voidksocknal_close_conn_locked (ksock_conn_t *conn, int error){        /* This just does the immmediate housekeeping, and queues the         * connection for the reaper to terminate.         * Caller holds ksnd_global_lock exclusively in irq context */        ksock_peer_t      *peer = conn->ksnc_peer;        ksock_route_t     *route;        ksock_conn_t      *conn2;        struct list_head  *tmp;        LASSERT (peer->ksnp_error == 0);        LASSERT (!conn->ksnc_closing);        conn->ksnc_closing = 1;        /* ksnd_deathrow_conns takes over peer's ref */        list_del (&conn->ksnc_list);        route = conn->ksnc_route;        if (route != NULL) {                /* dissociate conn from route... */                LASSERT (!route->ksnr_deleted);                LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);                conn2 = NULL;                list_for_each(tmp, &peer->ksnp_conns) {                        conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);                        if (conn2->ksnc_route == route &&                            conn2->ksnc_type == conn->ksnc_type)                                break;                        conn2 = NULL;                }                if (conn2 == NULL)                        route->ksnr_connected &= ~(1 << conn->ksnc_type);                conn->ksnc_route = NULL;#if 0           /* irrelevent with only eager routes */                list_del (&route->ksnr_list);   /* make route least favourite */                list_add_tail (&route->ksnr_list, &peer->ksnp_routes);#endif                ksocknal_route_decref(route);     /* drop conn's ref on route */        }        if (list_empty (&peer->ksnp_conns)) {                /* No more connections to this peer */                peer->ksnp_proto = NULL;        /* renegotiate protocol version */                peer->ksnp_error = error;       /* stash last conn close reason */                if (list_empty (&peer->ksnp_routes)) {                        /* I've just closed last conn belonging to a                         * peer with no routes to it */                        ksocknal_unlink_peer_locked (peer);                }        }        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);        list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);        cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);}voidksocknal_peer_failed (ksock_peer_t *peer){        time_t    last_alive = 0;        int       notify = 0;        /* There has been a connection failure or comms error; but I'll only         * tell LNET I think the peer is dead if it's to another kernel and         * there are no connections or connection attempts in existance. */                read_lock (&ksocknal_data.ksnd_global_lock);        if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&            list_empty(&peer->ksnp_conns) &&            peer->ksnp_accepting == 0 &&            ksocknal_find_connecting_route_locked(peer) == NULL) {                notify = 1;                last_alive = cfs_time_seconds(peer->ksnp_last_alive);        }                read_unlock (&ksocknal_data.ksnd_global_lock);        if (notify)                lnet_notify (peer->ksnp_ni, peer->ksnp_id.nid, 0,                             last_alive);}voidksocknal_terminate_conn (ksock_conn_t *conn){        /* This gets called by the reaper (guaranteed thread context) to         * disengage the socket from its callbacks and close it.         * ksnc_refcount will eventually hit zero, and then the reaper will         * destroy it. */        ksock_peer_t     *peer = conn->ksnc_peer;        ksock_sched_t    *sched = conn->ksnc_scheduler;        int               failed = 0;        struct list_head *tmp;        struct list_head *nxt;        ksock_tx_t       *tx;        LIST_HEAD        (zlist);        LASSERT(conn->ksnc_closing);        /* wake up the scheduler to "send" all remaining packets to /dev/null */        spin_lock_bh (&sched->kss_lock);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -