/* socklnd_cb.c */
LASSERT (niov <= LNET_MAX_IOV); conn->ksnc_cookie = msg; conn->ksnc_rx_nob_wanted = mlen; conn->ksnc_rx_nob_left = rlen; if (mlen == 0 || iov != NULL) { conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; conn->ksnc_rx_niov = lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov, niov, iov, offset, mlen); } else { conn->ksnc_rx_niov = 0; conn->ksnc_rx_iov = NULL; conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; conn->ksnc_rx_nkiov = lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov, niov, kiov, offset, mlen); } LASSERT (mlen == lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); LASSERT (conn->ksnc_rx_scheduled); spin_lock_bh (&sched->kss_lock); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_PARSE_WAIT: list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); cfs_waitq_signal (&sched->kss_waitq); LASSERT (conn->ksnc_rx_ready); break; case SOCKNAL_RX_PARSE: /* scheduler hasn't noticed I'm parsing yet */ break; } conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD; spin_unlock_bh (&sched->kss_lock); ksocknal_conn_decref(conn); return (0);}static inline intksocknal_sched_cansleep(ksock_sched_t *sched){ int rc; spin_lock_bh (&sched->kss_lock); rc = (!ksocknal_data.ksnd_shuttingdown && list_empty(&sched->kss_rx_conns) && list_empty(&sched->kss_tx_conns)); spin_unlock_bh (&sched->kss_lock); return (rc);}int ksocknal_scheduler (void *arg){ ksock_sched_t *sched = (ksock_sched_t *)arg; ksock_conn_t *conn; ksock_tx_t *tx; int rc; int nloops = 0; int id = sched - ksocknal_data.ksnd_schedulers; char name[16]; snprintf (name, sizeof (name),"socknal_sd%02d", id); cfs_daemonize (name); cfs_block_allsigs ();#if defined(CONFIG_SMP) && defined(CPU_AFFINITY) id = ksocknal_sched2cpu(id); if (cpu_online(id)) { cpumask_t m = CPU_MASK_NONE; cpu_set(id, m); set_cpus_allowed(current, m); } else { CERROR ("Can't set CPU affinity for %s to %d\n", name, id); }#endif /* CONFIG_SMP && 
CPU_AFFINITY */ spin_lock_bh (&sched->kss_lock); while (!ksocknal_data.ksnd_shuttingdown) { int did_something = 0; /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list); list_del(&conn->ksnc_rx_list); LASSERT(conn->ksnc_rx_scheduled); LASSERT(conn->ksnc_rx_ready); /* clear rx_ready in case receive isn't complete. * Do it BEFORE we call process_recv, since * data_ready can set it any time after we release * kss_lock. */ conn->ksnc_rx_ready = 0; spin_unlock_bh (&sched->kss_lock); rc = ksocknal_process_receive(conn); spin_lock_bh (&sched->kss_lock); /* I'm the only one that can clear this flag */ LASSERT(conn->ksnc_rx_scheduled); /* Did process_receive get everything it wanted? */ if (rc == 0) conn->ksnc_rx_ready = 1; if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) { /* Conn blocked waiting for ksocknal_recv() * I change its state (under lock) to signal * it can be rescheduled */ conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; } else if (conn->ksnc_rx_ready) { /* reschedule for rx */ list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); } else { conn->ksnc_rx_scheduled = 0; /* drop my ref */ ksocknal_conn_decref(conn); } did_something = 1; } if (!list_empty (&sched->kss_tx_conns)) { CFS_LIST_HEAD (zlist); if (!list_empty(&sched->kss_zombie_noop_txs)) { list_add(&zlist, &sched->kss_zombie_noop_txs); list_del_init(&sched->kss_zombie_noop_txs); } conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); list_del (&conn->ksnc_tx_list); LASSERT(conn->ksnc_tx_scheduled); LASSERT(conn->ksnc_tx_ready); LASSERT(!list_empty(&conn->ksnc_tx_queue)); tx = list_entry(conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); if (conn->ksnc_tx_mono == tx) ksocknal_next_mono_tx(conn); /* dequeue now so empty list => more to send */ list_del(&tx->tx_list); /* Clear tx_ready in case send isn't complete. 
Do * it BEFORE we call process_transmit, since * write_space can set it any time after we release * kss_lock. */ conn->ksnc_tx_ready = 0; spin_unlock_bh (&sched->kss_lock); if (!list_empty(&zlist)) { /* free zombie noop txs, it's fast because * noop txs are just put in freelist */ ksocknal_txlist_done(NULL, &zlist, 0); } rc = ksocknal_process_transmit(conn, tx); if (rc == -ENOMEM || rc == -EAGAIN) { /* Incomplete send: replace tx on HEAD of tx_queue */ spin_lock_bh (&sched->kss_lock); list_add (&tx->tx_list, &conn->ksnc_tx_queue); } else { /* Complete send; tx -ref */ ksocknal_tx_decref (tx); spin_lock_bh (&sched->kss_lock); /* assume space for more */ conn->ksnc_tx_ready = 1; } if (rc == -ENOMEM) { /* Do nothing; after a short timeout, this * conn will be reposted on kss_tx_conns. */ } else if (conn->ksnc_tx_ready && !list_empty (&conn->ksnc_tx_queue)) { /* reschedule for tx */ list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); } else { conn->ksnc_tx_scheduled = 0; /* drop my ref */ ksocknal_conn_decref(conn); } did_something = 1; } if (!did_something || /* nothing to do */ ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */ spin_unlock_bh (&sched->kss_lock); nloops = 0; if (!did_something) { /* wait for something to do */ rc = wait_event_interruptible_exclusive( sched->kss_waitq, !ksocknal_sched_cansleep(sched)); LASSERT (rc == 0); } else { our_cond_resched(); } spin_lock_bh (&sched->kss_lock); } } spin_unlock_bh (&sched->kss_lock); ksocknal_thread_fini (); return (0);}/* * Add connection to kss_rx_conns of scheduler * and wakeup the scheduler. 
*/
void ksocknal_read_callback (ksock_conn_t *conn)
{
        ksock_sched_t *sched;
        ENTRY;

        sched = conn->ksnc_scheduler;

        spin_lock_bh (&sched->kss_lock);

        /* data arrived; queue the conn for the scheduler unless it is
         * already being progressed */
        conn->ksnc_rx_ready = 1;

        if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
                list_add_tail(&conn->ksnc_rx_list,
                              &sched->kss_rx_conns);
                conn->ksnc_rx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);

        EXIT;
}

/*
 * Add connection to kss_tx_conns of scheduler
 * and wakeup the scheduler.
 */
void ksocknal_write_callback (ksock_conn_t *conn)
{
        ksock_sched_t *sched;
        ENTRY;

        sched = conn->ksnc_scheduler;

        spin_lock_bh (&sched->kss_lock);

        /* socket can accept more data; schedule the conn only if it has
         * something queued and isn't already being progressed */
        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled &&          /* not being progressed */
            !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
                list_add_tail (&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);

        EXIT;
}

/*
 * Identify the wire protocol from a hello message, accepting both native
 * and byte-swapped V2 magics as well as the little-endian V1 TCP magic.
 * Returns the matching protocol table, or NULL if the magic/version pair
 * is not recognised (or V2 is rejected by the protocol tunable when
 * SOCKNAL_VERSION_DEBUG is set).
 */
ksock_proto_t *
ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
{
        if ((hello->kshm_magic == LNET_PROTO_MAGIC &&
             hello->kshm_version == KSOCK_PROTO_V2) ||
            (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC) &&
             hello->kshm_version == __swab32(KSOCK_PROTO_V2))) {
#if SOCKNAL_VERSION_DEBUG
                if (*ksocknal_tunables.ksnd_protocol != 2)
                        return NULL;
#endif
                return &ksocknal_protocol_v2x;
        }

        if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
                lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello;

                /* V1 hello begins with a lnet_magicversion_t */
                CLASSERT (sizeof (lnet_magicversion_t) ==
                          offsetof (ksock_hello_msg_t, kshm_src_nid));

                if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) &&
                    hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR))
                        return &ksocknal_protocol_v1x;
        }

        return NULL;
}

/*
 * Send a hello handshake in the V1.x wire format.
 * (The body of this function continues past this point in the file.)
 */
static int
ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
{
        cfs_socket_t *sock = conn->ksnc_sock;
        lnet_hdr_t *hdr;
        lnet_magicversion_t *hmv;
        int rc;
        int i;
        /* NOTE(review): interior of ksocknal_send_hello_v1(); the source is
         * truncated mid-statement at the end of this excerpt. */
        CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));

        LIBCFS_ALLOC(hdr, sizeof(*hdr));
        if (hdr == NULL) {
                CERROR("Can't allocate lnet_hdr_t\n");
                return -ENOMEM;
        }

        hmv = (lnet_magicversion_t *)&hdr->dest_nid;

        /* Re-organize V2.x message header to V1.x (lnet_hdr_t)
         * header and send out */
        hmv->magic = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
        hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
        hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto check */
                LNET_LOCK();
                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        hmv->version_major++;   /* just different! */
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        hmv->magic = LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
                LNET_UNLOCK();
        }

        hdr->src_nid = c
/* (removed: web code-viewer keyboard-shortcut help text accidentally
 * captured into this file; it was never part of the source) */