⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socklnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        LASSERT (niov <= LNET_MAX_IOV);                conn->ksnc_cookie = msg;        conn->ksnc_rx_nob_wanted = mlen;        conn->ksnc_rx_nob_left   = rlen;        if (mlen == 0 || iov != NULL) {                conn->ksnc_rx_nkiov = 0;                conn->ksnc_rx_kiov = NULL;                conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;                conn->ksnc_rx_niov =                        lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov,                                         niov, iov, offset, mlen);        } else {                conn->ksnc_rx_niov = 0;                conn->ksnc_rx_iov  = NULL;                conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;                conn->ksnc_rx_nkiov =                         lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,                                          niov, kiov, offset, mlen);        }                LASSERT (mlen ==                  lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +                 lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));        LASSERT (conn->ksnc_rx_scheduled);        spin_lock_bh (&sched->kss_lock);        switch (conn->ksnc_rx_state) {        case SOCKNAL_RX_PARSE_WAIT:                list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);                cfs_waitq_signal (&sched->kss_waitq);                LASSERT (conn->ksnc_rx_ready);                break;                        case SOCKNAL_RX_PARSE:                /* scheduler hasn't noticed I'm parsing yet */                break;        }        conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;                spin_unlock_bh (&sched->kss_lock);        ksocknal_conn_decref(conn);        return (0);}static inline intksocknal_sched_cansleep(ksock_sched_t *sched){        int           rc;        spin_lock_bh (&sched->kss_lock);        rc = (!ksocknal_data.ksnd_shuttingdown &&              list_empty(&sched->kss_rx_conns) &&              list_empty(&sched->kss_tx_conns));                spin_unlock_bh (&sched->kss_lock);        return (rc);}int ksocknal_scheduler (void *arg){        ksock_sched_t     *sched = (ksock_sched_t *)arg;        ksock_conn_t      *conn;        ksock_tx_t        *tx;        int                rc;        int                nloops = 0;        int                id = sched - ksocknal_data.ksnd_schedulers;        char               name[16];        snprintf (name, sizeof (name),"socknal_sd%02d", id);        cfs_daemonize (name);        cfs_block_allsigs ();#if defined(CONFIG_SMP) && defined(CPU_AFFINITY)        id = ksocknal_sched2cpu(id);        if (cpu_online(id)) {                cpumask_t m = CPU_MASK_NONE;                cpu_set(id, m);                set_cpus_allowed(current, m);        } else {                CERROR ("Can't set CPU affinity for %s to %d\n", name, id);        }#endif /* CONFIG_SMP && CPU_AFFINITY */        spin_lock_bh (&sched->kss_lock);        while (!ksocknal_data.ksnd_shuttingdown) {                int did_something = 0;                /* Ensure I progress everything semi-fairly */                if (!list_empty (&sched->kss_rx_conns)) {                        conn = list_entry(sched->kss_rx_conns.next,                                          ksock_conn_t, ksnc_rx_list);                        list_del(&conn->ksnc_rx_list);                        LASSERT(conn->ksnc_rx_scheduled);                        LASSERT(conn->ksnc_rx_ready);                        /* clear rx_ready in case receive isn't complete.                         * Do it BEFORE we call process_recv, since                         * data_ready can set it any time after we release                         * kss_lock. */                        conn->ksnc_rx_ready = 0;                        spin_unlock_bh (&sched->kss_lock);                        rc = ksocknal_process_receive(conn);                        spin_lock_bh (&sched->kss_lock);                        /* I'm the only one that can clear this flag */                        LASSERT(conn->ksnc_rx_scheduled);                        /* Did process_receive get everything it wanted? */                        if (rc == 0)                                conn->ksnc_rx_ready = 1;                        if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {                                /* Conn blocked waiting for ksocknal_recv()                                 * I change its state (under lock) to signal                                 * it can be rescheduled */                                conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;                        } else if (conn->ksnc_rx_ready) {                                /* reschedule for rx */                                list_add_tail (&conn->ksnc_rx_list,                                               &sched->kss_rx_conns);                        } else {                                conn->ksnc_rx_scheduled = 0;                                /* drop my ref */                                ksocknal_conn_decref(conn);                        }                        did_something = 1;                }                if (!list_empty (&sched->kss_tx_conns)) {                        CFS_LIST_HEAD    (zlist);                        if (!list_empty(&sched->kss_zombie_noop_txs)) {                                list_add(&zlist, &sched->kss_zombie_noop_txs);                                 list_del_init(&sched->kss_zombie_noop_txs);                        }                        conn = list_entry(sched->kss_tx_conns.next,                                          ksock_conn_t, ksnc_tx_list);                        list_del (&conn->ksnc_tx_list);                                                LASSERT(conn->ksnc_tx_scheduled);                        LASSERT(conn->ksnc_tx_ready);                        LASSERT(!list_empty(&conn->ksnc_tx_queue));                                                tx = list_entry(conn->ksnc_tx_queue.next,                                        ksock_tx_t, tx_list);                        if (conn->ksnc_tx_mono == tx)                                ksocknal_next_mono_tx(conn);                        /* dequeue now so empty list => more to send */                        list_del(&tx->tx_list);                        /* Clear tx_ready in case send isn't complete.  Do                         * it BEFORE we call process_transmit, since                         * write_space can set it any time after we release                         * kss_lock. */                        conn->ksnc_tx_ready = 0;                        spin_unlock_bh (&sched->kss_lock);                        if (!list_empty(&zlist)) {                                /* free zombie noop txs, it's fast because                                  * noop txs are just put in freelist */                                ksocknal_txlist_done(NULL, &zlist, 0);                        }                        rc = ksocknal_process_transmit(conn, tx);                        if (rc == -ENOMEM || rc == -EAGAIN) {                                /* Incomplete send: replace tx on HEAD of tx_queue */                                spin_lock_bh (&sched->kss_lock);                                list_add (&tx->tx_list, &conn->ksnc_tx_queue);                        } else {                                /* Complete send; tx -ref */                                ksocknal_tx_decref (tx);                                spin_lock_bh (&sched->kss_lock);                                /* assume space for more */                                conn->ksnc_tx_ready = 1;                        }                        if (rc == -ENOMEM) {                                /* Do nothing; after a short timeout, this                                 * conn will be reposted on kss_tx_conns. */                        } else if (conn->ksnc_tx_ready &&                                   !list_empty (&conn->ksnc_tx_queue)) {                                /* reschedule for tx */                                list_add_tail (&conn->ksnc_tx_list,                                                &sched->kss_tx_conns);                        } else {                                conn->ksnc_tx_scheduled = 0;                                /* drop my ref */                                ksocknal_conn_decref(conn);                        }                                                        did_something = 1;                }                if (!did_something ||           /* nothing to do */                    ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */                        spin_unlock_bh (&sched->kss_lock);                        nloops = 0;                        if (!did_something) {   /* wait for something to do */                                rc = wait_event_interruptible_exclusive(                                        sched->kss_waitq,                                        !ksocknal_sched_cansleep(sched));                                LASSERT (rc == 0);                        } else {                                our_cond_resched();                        }                        spin_lock_bh (&sched->kss_lock);                }        }        spin_unlock_bh (&sched->kss_lock);        ksocknal_thread_fini ();        return (0);}/* * Add connection to kss_rx_conns of scheduler * and wakeup the scheduler. */void ksocknal_read_callback (ksock_conn_t *conn){        ksock_sched_t *sched;         ENTRY;        sched = conn->ksnc_scheduler;         spin_lock_bh (&sched->kss_lock);        conn->ksnc_rx_ready = 1;         if (!conn->ksnc_rx_scheduled) {  /* not being progressed */                 list_add_tail(&conn->ksnc_rx_list,                               &sched->kss_rx_conns);                 conn->ksnc_rx_scheduled = 1;                 /* extra ref for scheduler */                 ksocknal_conn_addref(conn);                cfs_waitq_signal (&sched->kss_waitq);         }         spin_unlock_bh (&sched->kss_lock);        EXIT;} /* * Add connection to kss_tx_conns of scheduler * and wakeup the scheduler. */void ksocknal_write_callback (ksock_conn_t *conn){         ksock_sched_t *sched;         ENTRY;                sched = conn->ksnc_scheduler;         spin_lock_bh (&sched->kss_lock);        conn->ksnc_tx_ready = 1;         if (!conn->ksnc_tx_scheduled && // not being progressed             !list_empty(&conn->ksnc_tx_queue)){//packets to send                 list_add_tail (&conn->ksnc_tx_list,                                &sched->kss_tx_conns);                 conn->ksnc_tx_scheduled = 1;                 /* extra ref for scheduler */                 ksocknal_conn_addref(conn);                 cfs_waitq_signal (&sched->kss_waitq);         }         spin_unlock_bh (&sched->kss_lock);        EXIT;}ksock_proto_t *ksocknal_parse_proto_version (ksock_hello_msg_t *hello){        if ((hello->kshm_magic   == LNET_PROTO_MAGIC &&             hello->kshm_version == KSOCK_PROTO_V2) ||            (hello->kshm_magic   == __swab32(LNET_PROTO_MAGIC) &&             hello->kshm_version == __swab32(KSOCK_PROTO_V2))) {#if SOCKNAL_VERSION_DEBUG                if (*ksocknal_tunables.ksnd_protocol != 2)                        return NULL;#endif                return &ksocknal_protocol_v2x;        }        if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {                lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello;                CLASSERT (sizeof (lnet_magicversion_t) ==                          offsetof (ksock_hello_msg_t, kshm_src_nid));                if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) &&                    hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR))                        return &ksocknal_protocol_v1x;        }        return NULL;}static intksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello){        cfs_socket_t        *sock = conn->ksnc_sock;        lnet_hdr_t          *hdr;        lnet_magicversion_t *hmv;        int                  rc;        int                  i;        CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));        LIBCFS_ALLOC(hdr, sizeof(*hdr));        if (hdr == NULL) {                CERROR("Can't allocate lnet_hdr_t\n");                return -ENOMEM;        }        hmv = (lnet_magicversion_t *)&hdr->dest_nid;        /* Re-organize V2.x message header to V1.x (lnet_hdr_t)         * header and send out */        hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);        hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);        hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);        if (the_lnet.ln_testprotocompat != 0) {                /* single-shot proto check */                LNET_LOCK();                if ((the_lnet.ln_testprotocompat & 1) != 0) {                        hmv->version_major++;   /* just different! */                        the_lnet.ln_testprotocompat &= ~1;                }                if ((the_lnet.ln_testprotocompat & 2) != 0) {                        hmv->magic = LNET_PROTO_MAGIC;                        the_lnet.ln_testprotocompat &= ~2;                }                LNET_UNLOCK();        }        hdr->src_nid        = c

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -