⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socklnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        /* called holding global lock (read or irq-write) and caller may         * not have dropped this lock between finding conn and calling me,         * so we don't need the {get,put}connsock dance to deref         * ksnc_sock... */        LASSERT(!conn->ksnc_closing);        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n",                 libcfs_id2str(conn->ksnc_peer->ksnp_id),                HIPQUAD(conn->ksnc_ipaddr),                conn->ksnc_port);        tx->tx_checked_zc = 0;        conn->ksnc_proto->pro_pack(tx);        /* Ensure the frags we've been given EXACTLY match the number of         * bytes we want to send.  Many TCP/IP stacks disregard any total         * size parameters passed to them and just look at the frags.          *         * We always expect at least 1 mapped fragment containing the         * complete ksocknal message header. */        LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) +                 lnet_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);        LASSERT (tx->tx_niov >= 1);        LASSERT (tx->tx_resid == tx->tx_nob);        CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",                tx, (tx->tx_lnetmsg != NULL)? tx->tx_lnetmsg->msg_hdr.type:                                               KSOCK_MSG_NOOP,                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);                atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);        tx->tx_conn = conn;        ksocknal_conn_addref(conn); /* +1 ref for tx */        /*          * NB Darwin: SOCK_WMEM_QUEUED()->sock_getsockopt() will take         * a blockable lock(socket lock), so SOCK_WMEM_QUEUED can't be         * put in spinlock.          */        bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock);        spin_lock_bh (&sched->kss_lock);        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {                /* First packet starts the timeout */                conn->ksnc_tx_deadline =                         cfs_time_shift(*ksocknal_tunables.ksnd_timeout);                conn->ksnc_tx_bufnob = 0;                mb();    /* order with adding to tx_queue */        }        ztx = NULL;        if (msg->ksm_type == KSOCK_MSG_NOOP) {                /* The packet is noop ZC ACK, try to piggyback the ack_cookie                 * on a normal packet so I don't need to send it */                LASSERT(msg->ksm_zc_req_cookie == 0);                LASSERT(msg->ksm_zc_ack_cookie != 0);                if (conn->ksnc_tx_mono != NULL) {                        if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) {                                /* zc-ack cookie is piggybacked */                                atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob);                                ztx = tx;       /* Put to freelist later */                        } else {                                /* no packet can piggyback zc-ack cookie */                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);                        }                } else {                        /* It's the first mono-packet */                        conn->ksnc_tx_mono = tx;                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);                }        } else {                /* It's a normal packet - can it piggback a noop zc-ack that                 * has been queued already? */                LASSERT(msg->ksm_zc_ack_cookie == 0);                if (conn->ksnc_proto == &ksocknal_protocol_v2x &&  /* V2.x packet */                    conn->ksnc_tx_mono != NULL) {                                  if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) {                                /* There is a noop zc-ack can be piggybacked */                                ztx = conn->ksnc_tx_mono;                                msg->ksm_zc_ack_cookie = ztx->tx_msg.ksm_zc_ack_cookie;                                ksocknal_next_mono_tx(conn);                                /* use tx to replace the noop zc-ack packet, ztx will                                 * be put to freelist later */                                list_add(&tx->tx_list, &ztx->tx_list);                                list_del(&ztx->tx_list);                                atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);                        } else {                                /* no noop zc-ack packet, just enqueue it */                                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_LNET);                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);                        }                } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) {                        /* it's the first mono-packet, enqueue it */                        conn->ksnc_tx_mono = tx;                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);                } else {                        /* V1.x packet, just enqueue it */                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);                }        }        if (ztx != NULL)                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);                        if (conn->ksnc_tx_ready &&      /* able to send */            !conn->ksnc_tx_scheduled) { /* not scheduled to send */                /* +1 ref for scheduler */                ksocknal_conn_addref(conn);                list_add_tail (&conn->ksnc_tx_list,                                &sched->kss_tx_conns);                conn->ksnc_tx_scheduled = 1;                cfs_waitq_signal (&sched->kss_waitq);        }        spin_unlock_bh (&sched->kss_lock);}ksock_route_t *ksocknal_find_connectable_route_locked (ksock_peer_t *peer){        struct list_head  *tmp;        ksock_route_t     *route;                list_for_each (tmp, &peer->ksnp_routes) {                route = list_entry (tmp, ksock_route_t, ksnr_list);                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);                if (route->ksnr_scheduled)      /* connections being established */                        continue;                /* all route types connected ? */                if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)                        continue;                /* too soon to retry this guy? */                if (!(route->ksnr_retry_interval == 0 || /* first attempt */                      cfs_time_aftereq (cfs_time_current(),                                         route->ksnr_timeout)))                        continue;                                return (route);        }                return (NULL);}ksock_route_t *ksocknal_find_connecting_route_locked (ksock_peer_t *peer){        struct list_head  *tmp;        ksock_route_t     *route;        list_for_each (tmp, &peer->ksnp_routes) {                route = list_entry (tmp, ksock_route_t, ksnr_list);                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);                                if (route->ksnr_scheduled)                        return (route);        }                return (NULL);}intksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id){        ksock_peer_t     *peer;        ksock_conn_t     *conn;        ksock_route_t    *route;        rwlock_t         *g_lock;        int               retry;        int               rc;                LASSERT (tx->tx_conn == NULL);        LASSERT (tx->tx_lnetmsg != NULL);        g_lock = &ksocknal_data.ksnd_global_lock;                for (retry = 0;; retry = 1) {#if !SOCKNAL_ROUND_ROBIN                read_lock (g_lock);                peer = ksocknal_find_peer_locked(ni, id);                if (peer != NULL) {                        if (ksocknal_find_connectable_route_locked(peer) == NULL) {                                conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);                                if (conn != NULL) {                                        /* I've got no routes that need to be                                         * connecting and I do have an actual                                         * connection... */                                        ksocknal_queue_tx_locked (tx, conn);                                        read_unlock (g_lock);                                        return (0);                                }                        }                }                 /* I'll need a write lock... */                read_unlock (g_lock);#endif                write_lock_bh (g_lock);                peer = ksocknal_find_peer_locked(ni, id);                if (peer != NULL)                         break;                                write_unlock_bh (g_lock);                if ((id.pid & LNET_PID_USERFLAG) != 0) {                        CERROR("Refusing to create a connection to "                               "userspace process %s\n", libcfs_id2str(id));                        return -EHOSTUNREACH;                }                                if (retry) {                        CERROR("Can't find peer %s\n", libcfs_id2str(id));                        return -EHOSTUNREACH;                }                                rc = ksocknal_add_peer(ni, id,                                        LNET_NIDADDR(id.nid),                                       lnet_acceptor_port());                if (rc != 0) {                        CERROR("Can't add peer %s: %d\n",                               libcfs_id2str(id), rc);                        return rc;                }        }        for (;;) {                /* launch any/all connections that need it */                route = ksocknal_find_connectable_route_locked (peer);                if (route == NULL)                        break;                ksocknal_launch_connection_locked (route);        }        conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);        if (conn != NULL) {                /* Connection exists; queue message on it */                ksocknal_queue_tx_locked (tx, conn);                write_unlock_bh (g_lock);                return (0);        }        if (peer->ksnp_accepting > 0 ||            ksocknal_find_connecting_route_locked (peer) != NULL) {                /* Queue the message until a connection is established */                list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);                write_unlock_bh (g_lock);                return 0;        }                write_unlock_bh (g_lock);        /* NB Routes may be ignored if connections to them failed recently */        CDEBUG(D_NETERROR, "No usable routes to %s\n", libcfs_id2str(id));        return (-EHOSTUNREACH);}intksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        int               type = lntmsg->msg_type;         lnet_process_id_t target = lntmsg->msg_target;        unsigned int      payload_niov = lntmsg->msg_niov;         struct iovec     *payload_iov = lntmsg->msg_iov;         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;        unsigned int      payload_offset = lntmsg->msg_offset;        unsigned int      payload_nob = lntmsg->msg_len;        ksock_tx_t       *tx;        int               desc_size;        int               rc;        /* NB 'private' is different depending on what we're sending.         * Just ignore it... */        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",               payload_nob, payload_niov, libcfs_id2str(target));        LASSERT (payload_nob == 0 || payload_niov > 0);        LASSERT (payload_niov <= LNET_MAX_IOV);        /* payload is either all vaddrs or all pages */        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));        LASSERT (!in_interrupt ());                if (payload_iov != NULL)                desc_size = offsetof(ksock_tx_t,                                      tx_frags.virt.iov[1 + payload_niov]);        else                desc_size = offsetof(ksock_tx_t,                                      tx_frags.paged.kiov[payload_niov]);                tx = ksocknal_alloc_tx(desc_size);        if (tx == NULL) {                CERROR("Can't allocate tx desc type %d size %d\n",                       type, desc_size);                return (-ENOMEM);        }        tx->tx_conn = NULL;                     /* set when assigned a conn */        tx->tx_lnetmsg = lntmsg;        if (payload_iov != NULL) {                tx->tx_kiov = NULL;                tx->tx_nkiov = 0;                tx->tx_iov = tx->tx_frags.virt.iov;                tx->tx_niov = 1 +                               lnet_extract_iov(payload_niov, &tx->tx_iov[1],                                               payload_niov, payload_iov,                                               payload_offset, payload_nob);        } else {                tx->tx_niov = 1;                tx->tx_iov = &tx->tx_frags.paged.iov;                tx->tx_kiov = tx->tx_frags.paged.kiov;                tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,                                                 payload_niov, payload_kiov,                                                 payload_offset, payload_nob);        }        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);        /* The first fragment will be set later in pro_pack */        rc = ksocknal_launch_packet(ni, tx, target);        if (rc == 0)                return (0);                ksocknal_free_tx(tx);        return (-EIO);}intksocknal_thread_start (int (*fn)(void *arg), void *arg){        long          pid = cfs_kernel_thread (fn, arg, 0);        if (pid < 0)                return ((int)pid);        write_lock_bh (&ksocknal_data.ksnd_global_lock);        ksocknal_data.ksnd_nthreads++;        write_unlock_bh (&ksocknal_data.ksnd_global_lock);        return (0);}voidksocknal_thread_fini (void){        write_lock_bh (&ksocknal_data.ksnd_global_lock);        ksocknal_data.ksnd_nthreads--;        write_unlock_bh (&ksocknal_data.ksnd_global_lock);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -