zero-copy.10
+            *grow = 0;
+            return ZC_OK;
+        }
+        if (size > idx->size) {
+            *grow = diff + size - idx->off - idx->size;
+            return ZC_GROW_UP;
+        }
+    }
+
+    if (diff + size >= idx->off + idx->size) {
+        *grow = diff + size - idx->off - idx->size;
+        return ZC_GROW_BOTH;
+    } else {
+        *grow = idx->off - diff;
+        return ZC_GROW_DOWN;
+    }
+
+    BUG();
+
+    return ZC_NEXT;
+}
+
+static int zc_check_seq(struct zsock *zsk, struct zc_page *zp, u16 diff, u16 size, u16 *grow, struct zc_index **idx_grow)
+{
+    struct zc_index *idx;
+    int ret = ZC_NEXT;
+
+    if (likely(zp->idx_num <= ZC_MAX_IDX)) {
+        int i;
+
+        for (i = 0; i < zp->idx_num; ++i) {
+            idx = &zp->idx[i];
+
+            ret = zc_check_seq_index(idx, diff, size, grow);
+            if (ret != ZC_NEXT) {
+                *idx_grow = idx;
+                return ret;
+            }
+        }
+    } else {
+        struct zc_index_list_entry *e;
+
+        list_for_each_entry(e, &zp->idx_list, entry) {
+            idx = &e->idx;
+
+            ret = zc_check_seq_index(idx, diff, size, grow);
+            if (ret != ZC_NEXT) {
+                *idx_grow = idx;
+                return ret;
+            }
+        }
+    }
+
+    *idx_grow = NULL;
+    *grow = size;
+
+    return ZC_NEXT;
+}
+
+static int zc_commit_seq(struct zsock *zsk, struct zc_page *zp, u16 diff, u16 size, u16 grow, int status, struct zc_index *idx_grow)
+{
+    switch (status) {
+    case ZC_OK:
+        return 0;
+    case ZC_NEXT:
+    {
+        struct zc_index *idx;
+
+        if (likely(zp->idx_num + 1 <= ZC_MAX_IDX)) {
+            idx = &zp->idx[zp->idx_num];
+        } else {
+            struct zc_index_list_entry *e = mempool_alloc(idx_pool, GFP_ATOMIC);
+            if (!e)
+                return -ENOMEM;
+            list_add_tail(&e->entry, &zp->idx_list);
+            idx = &e->idx;
+        }
+
+        idx->off = diff;
+        idx->size = size;
+        zp->idx_num++;
+        return 0;
+    }
+    default:
+        if (!idx_grow)
+            return -EINVAL;
+        idx_grow->off = diff;
+        idx_grow->size = size;
+        return 0;
+    }
+
+    return 0;
+}
+
+int zc_sock_alloc_data(struct zc_buf *zb)
+{
+    struct zsock *zsk = zb->priv;
+    struct zc_page *zp;
+    int err = 0;
+    unsigned int towrite = zb->size, skb_len;
+    struct sk_buff *skb = zb->skb;
+    struct ethhdr *eth;
+    struct iphdr *ip;
+    int index, nocopy, state;
+    u32 seq, ack = 0, hsize, oseq;
+    u16 sz, diff, grow;
+    struct zc_index *idx;
+
+    if (!zsk->zc_pages)
+        goto out;
+
+    eth = (struct ethhdr *)zb->header;
+    ip = (struct iphdr *)(eth + 1);
+
+    if (ip->protocol == IPPROTO_TCP) {
+        struct tcphdr *th = (struct tcphdr *)(((u8 *)ip) + ip->ihl*4);
+
+        hsize = sizeof(struct tcphdr);
+        oseq = seq = ntohl(th->seq);
+        ack = ntohl(th->ack_seq);
+
+        if (!towrite)
+            seq = oseq = seq + 1;
+
+        /*
+         * Is it possible to get here via two different paths?
+         * That would mean skb_alloc_zerocopy() is called from different IRQ handlers
+         * on different CPUs simultaneously for the same zero-copy socket.
+         *
+         * If so, then sequence number setup must be done with the write lock held,
+         * using sock_zc_setup_seq() from the internals of the TCP state machine.
+         */
+        if (!zsk->zc_seq_first)
+            __sock_zc_setup_seq(zsk, seq);
+    } else if (ip->protocol == IPPROTO_UDP) {
+        hsize = 0;
+        oseq = seq = zsk->zc_page_num * zsk->zc_page_index + zsk->zc_seq_first;
+    } else
+        goto out;
+
+    skb_len = skb->len;
+
+    while (towrite > 0) {
+        nocopy = 0;
+        grow = 0;
+
+        index = zc_calc_index(zsk, seq);
+        if (index < 0) {
+            err = -1;
+            break;
+        }
+
+        zp = &zsk->zc_pages[index];
+
+        diff = seq - zp->seq;
+
+        sz = min(zp->size - zp->used, towrite);
+        sz = min(zp->size - (zp->page_offset + diff), (unsigned int)sz);
+
+        spin_lock(&zp->lock);
+
+        state = zc_check_seq(zsk, zp, diff, sz, &grow, &idx);
+        if (state == ZC_OK)
+            nocopy = 1;
+
+        if (test_bit(ZC_PAGE_READY, &zp->flags) || (zp->size == zp->used))
+            nocopy = 1;
+        if (zp->size - zp->used < towrite && !zb->move_data) {
+            err = -1;
+            goto unlock;
+        }
+        if (unlikely(skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS)) {
+            err = -ENOMEM;
+            goto unlock;
+        }
+
+        /*
+         * Set up the fragment offset to point to the area where
+         * we can actually write without overwriting old data.
+         * Set the fragment size not to the real data size,
+         * but to the size of the area we can actually write data into.
+         */
+        skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, zp->page, zp->page_offset + diff, sz);
+
+        if (zb->move_data) {
+            if (nocopy)
+                err = sz;
+            else
+                err = zb->move_data(zb, skb->len - skb_len, sz);
+
+            if (err <= 0)
+                goto unlock;
+        } else
+            err = zb->size;
+
+        if (zc_commit_seq(zsk, zp, diff, sz, grow, state, idx)) {
+            err = -1;
+            goto unlock;
+        }
+
+        skb->len += err;
+        skb->data_len += err;
+        skb->truesize += err;
+
+        towrite -= err;
+
+        zp->used += (state == ZC_OK) ? err : grow;
+        seq += err;
+
+        err = 0;
+
+unlock:
+        spin_unlock(&zp->lock);
+        if (err < 0)
+            break;
+    }
+
+    seq = oseq;
+    /*
+     * An error happens when part of the packet or the whole packet cannot be moved
+     * into some page. That is most likely because sendfile() has not yet committed
+     * the selected pages back to the VFS, or because the sequence number is
+     * completely bogus.
+     *
+     * In the uncommitted-page case we have very likely hit the following problem:
+     * part of the packet has been written into the previous page, but the next page
+     * contains old data which is not committed to the VFS, and we cannot overwrite it.
+     * In this case we must roll back all writes to the previous pages, so we start
+     * from the beginning, select one by one the same pages that were selected for
+     * writing, and decrease their zp->used counters, so the pages look like they did
+     * before.
+     */
+    if (err < 0) {
+        towrite = zb->size - towrite;
+
+        while (towrite) {
+            index = zc_calc_index(zsk, oseq);
+            if (index < 0) {
+                err = -1;
+                break;
+            }
+            zp = &zsk->zc_pages[index];
+
+            spin_lock(&zp->lock);
+            diff = oseq - zp->seq;
+            sz = min(zp->size - (zp->page_offset + diff), towrite);
+            zp->used -= sz;
+            spin_unlock(&zp->lock);
+
+            towrite -= sz;
+            oseq += sz;
+        }
+    }
+
+    for (index = 0; index < zsk->zc_page_num; ++index) {
+        zp = &zsk->zc_pages[index];
+
+        if (zp->used == zp->size) {
+            set_bit(ZC_PAGE_READY, &zp->flags);
+            if (++zsk->zc_page_index == zsk->zc_page_num)
+                zsk->zc_page_index = 0;
+        }
+    }
+
+    set_bit(ZSK_DATA_READY, &zsk->zc_flags);
+
+out:
+    return err;
+}
+
+int zc_sock_commit_data(struct zc_buf *zb)
+{
+    struct zsock *zsk = zb->priv;
+    struct zc_page *zp;
+
+    if (!zsk->zc_pages)
+        return -1;
+
+    zp = &zsk->zc_pages[zsk->zc_page_index];
+
+    if (unlikely(zb->size != zp->size))
+        return 1;
+
+    if (zp->used == zp->size) {
+        set_bit(ZC_PAGE_READY, &zp->flags);
+        if (++zsk->zc_page_index == zsk->zc_page_num)
+            zsk->zc_page_index = 0;
+    }
+
+    set_bit(ZSK_DATA_READY, &zsk->zc_flags);
+
+    return 0;
+}
+
+/*
+ * This should process all socket-related stuff, for example emit TCP ACKs...
+ * Since a zero-copy skb can only have a valid header, this should process
+ * that header at skb->data.
+ * skb_copy_datagram_iovec() is changed to not even touch a zero-copied skb.
+ */
+static u8 message_buf[PAGE_SIZE];
+
+static int receive_message(struct socket *sock, unsigned int ack_size)
+{
+    struct msghdr msg;
+    struct kvec iov;
+    int err;
+
+    sock->sk->sk_allocation |= GFP_NOIO;
+    iov.iov_base = message_buf;
+    iov.iov_len = min(ack_size, (unsigned int)sizeof(message_buf));
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+    msg.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+
+    err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
+
+    return err;
+}
+
+int tcp_udp_v4_sock_zc_init(struct socket *sock, struct tcp_udp_v4_priv *priv)
+{
+    struct file *file;
+    struct zsock *zsk;
+    struct zc_page *zc_pages, *zp;
+    int pnum_max, err, i;
+    unsigned long flags;
+    struct address_space *mapping;
+    struct inode *inode;
+    size_t count;
+
+    /*
+     * Sane setup.
+     */
+    count = INT_MAX;
+    pnum_max = priv->pnum;
+
+    if (!sock->sk)
+        return -EINVAL;
+
+    err = -EBADF;
+    file = fget(priv->fd);
+    if (!file)
+        goto err_out_exit;
+    if (!(file->f_mode & FMODE_WRITE))
+        goto err_out_fput;
+    err = -ETXTBSY;
+    if (file->f_mode & FMODE_ZEROCOPY)
+        goto err_out_fput;
+    err = -EINVAL;
+    if (!file->f_op)
+        goto err_out_fput;
+
+    err = rw_verify_area(WRITE, file, &file->f_pos, count);
+    if (err)
+        goto err_out_fput;
+
+    err = security_file_permission(file, MAY_WRITE);
+    if (err)
+        goto err_out_fput;
+
+    err = -ENOMEM;
+    zsk = zsk_alloc(&tcp_udp_v4_zc_handler, priv, sizeof(*priv), NULL, GFP_KERNEL);
+    if (!zsk)
+        goto err_out_fput;
+
+    mapping = file->f_mapping;
+    inode = mapping->host;
+
+    zc_pages = kzalloc(sizeof(struct zc_page) * pnum_max, GFP_KERNEL);
+    if (!zc_pages) {
+        err = -ENOMEM;
+        goto err_out_zsk_put;
+    }
+
+    pagevec_init(&zsk->zc_lru_pvec, 0);
+
+    err = 0;
+    for (i = 0; i < pnum_max; ++i) {
+        zp = &zc_pages[i];
+
+        spin_lock_init(&zp->lock);
+        err = prepare_page(zp, zsk, file, mapping, &zsk->zc_pos, count, &zsk->zc_lru_pvec);
+        if (unlikely(err))
+            goto err_out_commit_pages;
+    }
+
+    file->f_mode |= FMODE_ZEROCOPY;
+
+    write_lock_irqsave(&zsk->zc_lock, flags);
+    zsk->zc_file = file;
+    zsk->zc_pages = zc_pages;
+    zsk->zc_page_num = pnum_max;
+    zsk->zc_page_index = 0;
+    zsk->zc_alloc_data = &zc_sock_alloc_data;
+    zsk->zc_commit_data = &zc_sock_commit_data;
+    zsk->sk = sock->sk;
+    write_unlock_irqrestore(&zsk->zc_lock, flags);
+
+    err = tcp_udp_v4_zc_sock_insert(zsk);
+    if (err) {
+        i = pnum_max;
+        goto err_out_commit_pages;
+    }
+
+    sock->sk->zsk = zsk;
+
+    return 0;
+
+    write_lock_irqsave(&zsk->zc_lock, flags);
+    zsk->zc_file = NULL;
+    zsk->zc_pages = NULL;
+    zsk->zc_page_num = 0;
+    zsk->zc_page_index = 0;
+    zsk->zc_alloc_data = NULL;
+    zsk->zc_commit_data = NULL;
+    zsk->sk = NULL;
+    write_unlock_irqrestore(&zsk->zc_lock, flags);
+
+err_out_commit_pages:
+    for (--i; i >= 0; --i)
+        commit_page(&zc_pages[i], file, mapping);
+
+    kfree(zc_pages);
+err_out_zsk_put:
+    zsk_put(zsk);
+err_out_fput:
+    file->f_mode &= ~FMODE_ZEROCOPY;
+    fput(file);
+err_out_exit:
+    return err;
+}
+
+static ssize_t sock_sendfile(struct file *in_file, loff_t *ppos, size_t count, read_actor_t actor, void *target)
+{
+    struct socket *sock;
+    struct sock *sk;
+    int err = 0;
+    size_t written = 0;
+    struct file *file = target;
+    struct address_space *mapping = file->f_mapping;
+    struct inode *inode = mapping->host;
+    int i;
+    unsigned int ack_size;
+    struct zsock *zsk;
+
+    if (!count)
+        return 0;
+
+    sock = SOCKET_I(in_file->f_dentry->d_inode);
+
+    if (!sock || !sock->sk || !sock->sk->zsk)
+        return -ENODEV;
+    sk = sock->sk;
+    zsk = sk->zsk;
+    sk->zsk = NULL;
+
+    err = generic_write_checks(file, &zsk->zc_pos, &count, S_ISBLK(inode->i_mode));
+    if (err)
+        goto err_out_exit;
+
+    if (!zsk->zc_pages) {
+        err = -EINVAL;
+        goto err_out_exit;
+    }
+
+    zsk_get(zsk);
+
+    while (count) {
+        struct zc_page *zp;
+
+        wait_event_interruptible_timeout(zsk->zc_data_ready, test_and_clear_bit(ZSK_DATA_READY, &zsk->zc_flags), 5*HZ);
+
+        ack_size = 0;
+        for (i = 0; i < zsk->zc_page_num; ++i) {
+            zp = &zsk->zc_pages[i];
+
+            if (test_bit(ZC_PAGE_READY, &zp->flags)) {
+                err = commit_page(zp, file, mapping);
+                if (err)
+                    goto err_out_release_all_pages;
+
+                count -= zp->used;
+                written += zp->used;
+                ack_size += zp->used;
+
+                err = prepare_page(zp, zsk, file, mapping, &zsk->zc_pos, count, &zsk->zc_lru_pvec);
+            }
+        }
+#if 0
+        if (!ack_size)
+            ack_size = zsk->zc_page_num * sizeof(message_buf);
+#endif
+        while ((err = receive_message(sock, zsk->zc_page_num * sizeof(message_buf))) > 0) {
+            ack_size -= err;
+        }
+
+        if (signal_pending(current))
+            break;
+    }
+
+    pagevec_lru_add(&zsk->zc_lru_pvec);
+
+    *ppos = written;
+    err = written;
+
+err_out_release_all_pages:
+
+err_out_exit:
+    sk_zc_fini(zsk);
+    zc_cleanup(zsk);
+    return err;
+}
+
 static ssize_t sock_sendpage(struct file *file, struct page *page, int offset, size_t size, loff_t *ppos, int more)
 {
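
Note on the index bookkeeping above: zc_check_seq_index() classifies a new write of `size` bytes at page offset `diff` against an already recorded interval (idx->off, idx->size), and zc_sock_alloc_data() uses the result to decide whether the data is already present (ZC_OK, no copy needed), whether the recorded interval has to grow (ZC_GROW_UP / ZC_GROW_DOWN / ZC_GROW_BOTH, with *grow giving the extension), or whether the next index should be tried (ZC_NEXT). The head of zc_check_seq_index() is cut off in this fragment, so the standalone sketch below is only one plausible reading of the visible branches: the leading containment test, the enum values and the helper types are assumptions made for illustration, not part of the patch.

/* Standalone model of the visible zc_check_seq_index() branches.
 * The leading "write starts inside the recorded interval" test is an
 * assumption; it is not present in the fragment above. */
#include <stdio.h>
#include <stdint.h>

enum { ZC_OK, ZC_GROW_UP, ZC_GROW_DOWN, ZC_GROW_BOTH, ZC_NEXT };

struct zc_index_model { uint16_t off, size; };

static int classify(const struct zc_index_model *idx, uint16_t diff,
                    uint16_t size, uint16_t *grow)
{
    /* Assumed missing head: the write begins inside the recorded interval. */
    if (diff >= idx->off && diff < idx->off + idx->size) {
        if (diff + size <= idx->off + idx->size) {
            *grow = 0;
            return ZC_OK;                  /* fully covered: nothing new */
        }
        if (size > idx->size) {
            *grow = diff + size - idx->off - idx->size;
            return ZC_GROW_UP;             /* extends past the end */
        }
    }

    if (diff >= idx->off + idx->size || diff + size < idx->off) {
        *grow = size;
        return ZC_NEXT;                    /* disjoint: try the next index */
    }

    if (diff + size >= idx->off + idx->size) {
        *grow = diff + size - idx->off - idx->size;
        return ZC_GROW_BOTH;               /* overlaps both ends */
    }
    *grow = idx->off - diff;
    return ZC_GROW_DOWN;                   /* extends below the start */
}

int main(void)
{
    static const char *names[] = { "ZC_OK", "ZC_GROW_UP", "ZC_GROW_DOWN",
                                   "ZC_GROW_BOTH", "ZC_NEXT" };
    struct zc_index_model idx = { .off = 100, .size = 200 };
    uint16_t cases[][2] = { {120, 50}, {150, 300}, {50, 100}, {50, 400}, {400, 10} };
    uint16_t grow;
    unsigned int i;

    for (i = 0; i < sizeof(cases) / sizeof(cases[0]); i++) {
        int st = classify(&idx, cases[i][0], cases[i][1], &grow);
        printf("diff=%u size=%u -> %s, grow=%u\n",
               (unsigned)cases[i][0], (unsigned)cases[i][1],
               names[st], (unsigned)grow);
    }
    return 0;
}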
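For orientation, the intended flow of the two halves above is: zc_sock_alloc_data() steers incoming TCP/UDP payload directly into page-cache pages of a destination file that was attached to the socket by tcp_udp_v4_sock_zc_init() (which takes a file descriptor and a page count via priv), and a sendfile() with the socket as the source then runs sock_sendfile(), which waits for filled pages and commits them. The user-space sketch below is only a guess at how this would be driven: the control interface that reaches tcp_udp_v4_sock_zc_init() is not shown in this fragment, so the option level, the SO_ZC_INIT name and the struct zc_init layout are hypothetical placeholders, and the sendfile() call only makes sense with this patch applied.

/* Hypothetical user-space driver for the zero-copy receive path.
 * SOL_ZEROCOPY-style option level, SO_ZC_INIT and struct zc_init are
 * placeholders: the real control interface is not part of the fragment above. */
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/sendfile.h>
#include <netinet/in.h>

struct zc_init {        /* assumed to mirror tcp_udp_v4_priv: fd + page count */
    int fd;
    int pnum;
};

int main(int argc, char *argv[])
{
    struct sockaddr_in addr;
    struct zc_init zc;
    ssize_t n;
    int s, fd;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <output-file>\n", argv[0]);
        return 1;
    }

    fd = open(argv[1], O_RDWR | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) {
        perror("open");
        return 1;
    }

    s = socket(AF_INET, SOCK_STREAM, 0);
    if (s < 0) {
        perror("socket");
        return 1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(5000);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (connect(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("connect");
        return 1;
    }

    /* Ask the kernel to back this socket with pages of the output file
     * (this is the request that would end up in tcp_udp_v4_sock_zc_init()). */
    zc.fd = fd;
    zc.pnum = 16;
    if (setsockopt(s, SOL_SOCKET /* placeholder level */,
                   100 /* SO_ZC_INIT, placeholder */, &zc, sizeof(zc)) < 0)
        perror("setsockopt(SO_ZC_INIT)");

    /* sendfile() from the socket to the file drives sock_sendfile(), which
     * waits for ready pages and commits them to the page cache. */
    n = sendfile(fd, s, NULL, 1 << 20);
    if (n < 0)
        perror("sendfile");
    else
        printf("committed %zd bytes\n", n);

    close(s);
    close(fd);
    return 0;
}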