⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socklnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        if (!conn->ksnc_tx_scheduled &&            !list_empty(&conn->ksnc_tx_queue)){                list_add_tail (&conn->ksnc_tx_list,                               &sched->kss_tx_conns);                /* a closing conn is always ready to tx */                conn->ksnc_tx_ready = 1;                conn->ksnc_tx_scheduled = 1;                /* extra ref for scheduler */                ksocknal_conn_addref(conn);                cfs_waitq_signal (&sched->kss_waitq);        }        spin_unlock_bh (&sched->kss_lock);        spin_lock(&peer->ksnp_lock);        list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {                tx = list_entry(tmp, ksock_tx_t, tx_zc_list);                if (tx->tx_conn != conn)                        continue;                LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);                tx->tx_msg.ksm_zc_req_cookie = 0;                list_del(&tx->tx_zc_list);                list_add(&tx->tx_zc_list, &zlist);        }        spin_unlock(&peer->ksnp_lock);        list_for_each_safe(tmp, nxt, &zlist) {                tx = list_entry(tmp, ksock_tx_t, tx_zc_list);                list_del(&tx->tx_zc_list);                ksocknal_tx_decref(tx);        }        /* serialise with callbacks */        write_lock_bh (&ksocknal_data.ksnd_global_lock);        ksocknal_lib_reset_callback(conn->ksnc_sock, conn);        /* OK, so this conn may not be completely disengaged from its         * scheduler yet, but it _has_ committed to terminate... */        conn->ksnc_scheduler->kss_nconns--;        if (peer->ksnp_error != 0) {                /* peer's last conn closed in error */                LASSERT (list_empty (&peer->ksnp_conns));                failed = 1;                peer->ksnp_error = 0;     /* avoid multiple notifications */        }        write_unlock_bh (&ksocknal_data.ksnd_global_lock);        if (failed)                ksocknal_peer_failed(peer);        /* The socket is closed on the final put; either here, or in         * ksocknal_{send,recv}msg().  Since we set up the linger2 option         * when the connection was established, this will close the socket         * immediately, aborting anything buffered in it. Any hung         * zero-copy transmits will therefore complete in finite time. */        ksocknal_connsock_decref(conn);}voidksocknal_queue_zombie_conn (ksock_conn_t *conn){        /* Queue the conn for the reaper to destroy */        LASSERT (atomic_read(&conn->ksnc_conn_refcount) == 0);        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);        cfs_waitq_signal(&ksocknal_data.ksnd_reaper_waitq);                spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);}voidksocknal_destroy_conn (ksock_conn_t *conn){        /* Final coup-de-grace of the reaper */        CDEBUG (D_NET, "connection %p\n", conn);        LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);        LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);        LASSERT (conn->ksnc_sock == NULL);        LASSERT (conn->ksnc_route == NULL);        LASSERT (!conn->ksnc_tx_scheduled);        LASSERT (!conn->ksnc_rx_scheduled);        LASSERT (list_empty(&conn->ksnc_tx_queue));        /* complete current receive if any */        switch (conn->ksnc_rx_state) {        case SOCKNAL_RX_LNET_PAYLOAD:                CERROR("Completing partial receive from %s"                       ", ip %d.%d.%d.%d:%d, with error\n",                       libcfs_id2str(conn->ksnc_peer->ksnp_id),                       HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);                lnet_finalize (conn->ksnc_peer->ksnp_ni,                                conn->ksnc_cookie, -EIO);                break;        case SOCKNAL_RX_LNET_HEADER:                if (conn->ksnc_rx_started)                        CERROR("Incomplete receive of lnet header from %s"                               ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",                               libcfs_id2str(conn->ksnc_peer->ksnp_id),                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,                               conn->ksnc_proto->pro_version);                break;        case SOCKNAL_RX_KSM_HEADER:                if (conn->ksnc_rx_started)                        CERROR("Incomplete receive of ksock message from %s"                               ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",                               libcfs_id2str(conn->ksnc_peer->ksnp_id),                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,                               conn->ksnc_proto->pro_version);                break;        case SOCKNAL_RX_SLOP:                if (conn->ksnc_rx_started)                        CERROR("Incomplete receive of slops from %s"                               ", ip %d.%d.%d.%d:%d, with error\n",                               libcfs_id2str(conn->ksnc_peer->ksnp_id),                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);               break;        default:                LBUG ();                break;        }        ksocknal_peer_decref(conn->ksnc_peer);        LIBCFS_FREE (conn, sizeof (*conn));}intksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why){        ksock_conn_t       *conn;        struct list_head   *ctmp;        struct list_head   *cnxt;        int                 count = 0;        list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);                if (ipaddr == 0 ||                    conn->ksnc_ipaddr == ipaddr) {                        count++;                        ksocknal_close_conn_locked (conn, why);                }        }        return (count);}intksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why){        ksock_peer_t     *peer = conn->ksnc_peer;        __u32             ipaddr = conn->ksnc_ipaddr;        int               count;        write_lock_bh (&ksocknal_data.ksnd_global_lock);        count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);        write_unlock_bh (&ksocknal_data.ksnd_global_lock);        return (count);}intksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr){        ksock_peer_t       *peer;        struct list_head   *ptmp;        struct list_head   *pnxt;        int                 lo;        int                 hi;        int                 i;        int                 count = 0;        write_lock_bh (&ksocknal_data.ksnd_global_lock);        if (id.nid != LNET_NID_ANY)                lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;        else {                lo = 0;                hi = ksocknal_data.ksnd_peer_hash_size - 1;        }        for (i = lo; i <= hi; i++) {                list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);                        if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&                              (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))                                continue;                        count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);                }        }        write_unlock_bh (&ksocknal_data.ksnd_global_lock);        /* wildcards always succeed */        if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)                return (0);        return (count == 0 ? -ENOENT : 0);}voidksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive){        /* The router is telling me she's been notified of a change in         * gateway state.... */        lnet_process_id_t  id = {.nid = gw_nid, .pid = LNET_PID_ANY};        CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),                 alive ? "up" : "down");        if (!alive) {                /* If the gateway crashed, close all open connections... */                ksocknal_close_matching_conns (id, 0);                return;        }        /* ...otherwise do nothing.  We can only establish new connections         * if we have autroutes, and these connect on demand. */}voidksocknal_push_peer (ksock_peer_t *peer){        int               index;        int               i;        struct list_head *tmp;        ksock_conn_t     *conn;        for (index = 0; ; index++) {                read_lock (&ksocknal_data.ksnd_global_lock);                i = 0;                conn = NULL;                list_for_each (tmp, &peer->ksnp_conns) {                        if (i++ == index) {                                conn = list_entry (tmp, ksock_conn_t, ksnc_list);                                ksocknal_conn_addref(conn);                                break;                        }                }                read_unlock (&ksocknal_data.ksnd_global_lock);                if (conn == NULL)                        break;                ksocknal_lib_push_conn (conn);                ksocknal_conn_decref(conn);        }}intksocknal_push (lnet_ni_t *ni, lnet_process_id_t id){        ksock_peer_t      *peer;        struct list_head  *tmp;        int                index;        int                i;        int                j;        int                rc = -ENOENT;        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {                for (j = 0; ; j++) {                        read_lock (&ksocknal_data.ksnd_global_lock);                        index = 0;                        peer = NULL;                        list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {                                peer = list_entry(tmp, ksock_peer_t,                                                  ksnp_list);                                if (!((id.nid == LNET_NID_ANY ||                                       id.nid == peer->ksnp_id.nid) &&                                      (id.pid == LNET_PID_ANY ||                                       id.pid == peer->ksnp_id.pid))) {                                        peer = NULL;                                        continue;                                }                                if (index++ == j) {                                        ksocknal_peer_addref(peer);                                        break;                                }                        }                        read_unlock (&ksocknal_data.ksnd_global_lock);                        if (peer != NULL) {                                rc = 0;                                ksocknal_push_peer (peer);                                ksocknal_peer_decref(peer);                        }                }        }        return (rc);}intksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask){        ksock_net_t       *net = ni->ni_data;        ksock_interface_t *iface;        int                rc;        int                i;        int                j;        struct list_head  *ptmp;        ksock_peer_t      *peer;        struct list_head  *rtmp;        ksock_route_t     *route;        if (ipaddress == 0 ||            netmask == 0)                return (-EINVAL);        write_lock_bh (&ksocknal_data.ksnd_global_lock);        iface = ksocknal_ip2iface(ni, ipaddress);        if (iface != NULL) {                /* silently ignore dups */                rc = 0;        } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {                rc = -ENOSPC;        } else {                iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];                iface->ksni_ipaddr = ipaddress;                iface->ksni_netmask = netmask;                iface->ksni_nroutes = 0;                iface->ksni_npeers = 0;                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {                        list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {                                peer = list_entry(ptmp, ksock_peer_t, ksnp_list);                                for (j = 0; j < peer->ksnp_n_passive_ips; j++)                                        if (peer->ksnp_passive_ips[j] == ipaddress)                                                iface->ksni_npeers++;                                list_for_each(rtmp, &peer->ksnp_routes) {                                        route = list_entry(rtmp, ksock_route_t, ksnr_list);                                        if (route->ksnr_myipaddr == ipaddress)                                                iface->ksni_nroutes++;                                }                        }                }                rc = 0;                /* NB only new connections will pay attention to the new interface! */    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -