📄 zero-copy.7
字号:
/*
 * NOTE(review): SOURCE is a unified diff (kernel zero-copy receive patch for
 * net/socket.c) mangled onto a few lines by page extraction.  Below is the
 * reconstructed post-patch code; diff '+' markers and hunk headers are
 * extraction artifacts and have been dropped.  Code tokens are unchanged.
 */

/* Hunk context: tail of an ops table — the patch wires in the two new
 * zero-copy entry points.  The enclosing definition starts outside this view. */
	.sendpage =	sock_sendpage,
	.sendfile =	sock_sendfile,
};

/* Hunk context: interior of sock_release() — the patch adds sk_zc_fini() so
 * zero-copy state is torn down (pages committed, file released) with the socket. */
	sock->ops->release(sock);
	sock->ops = NULL;
	sk_zc_fini(sock->sk);
	module_put(owner);
}

/* Hunk context: tail of sock_aio_write(), unchanged by the patch. */
	return __sock_sendmsg(iocb, sock, &x->async_msg, size);
}

/*
 * Fill skb page fragments directly from the socket's ring of pre-prepared
 * pagecache pages (sk->zc_pages), so received data lands in the VFS cache
 * with no intermediate copy.
 *
 * zb->move_data, when set, copies/moves payload into the current page and
 * returns the number of bytes placed (<= requested); when unset, the whole
 * zb->size area is claimed in one step.
 *
 * Returns 0 on success, -ENODEV if zero-copy is not set up on this socket,
 * -ENOMEM when the skb is out of fragment slots, or a negative error from
 * move_data.  A full ring (next page also READY) terminates the loop early.
 */
int zc_sock_alloc_data(struct zc_buf *zb)
{
	struct sock *sk = zb->priv;
	struct zc_page *zp;
	int err = -ENODEV;
	unsigned int towrite = zb->size;
	struct sk_buff *skb = zb->skb;

	if (!sk->zc_pages)
		goto out;

	while (towrite) {
		zp = &sk->zc_pages[sk->zc_page_index];
		/* Current page exhausted or already handed to the writer:
		 * mark it READY and advance to the next ring slot. */
		if (zp->size == zp->used || test_bit(ZC_PAGE_READY, &zp->flags)) {
			set_bit(ZC_PAGE_READY, &zp->flags);

			if (++sk->zc_page_index == sk->zc_page_num)
				sk->zc_page_index = 0;

			zp = &sk->zc_pages[sk->zc_page_index];
			/* Next slot is also full/READY: the ring is exhausted,
			 * the writer side has not drained it yet. */
			if (zp->size == zp->used || test_bit(ZC_PAGE_READY, &zp->flags))
				break;
		}
		/* Without a move_data callback the data must fit contiguously
		 * in the remaining space of this one page. */
		if (zp->size - zp->used < towrite && !zb->move_data)
			break;

		if (skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS) {
			err = -ENOMEM;
			break;
		}
		/*
		 * Setup fragment with offset to point to the area where
		 * we actually can write without overwriting old data.
		 * Setup fragment size to be equal not to the real data size,
		 * but size of the area where we actually can write data into.
		 */
		skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, zp->page, zp->page_offset+zp->used, zp->size-zp->used);

		/* NOTE(review): unconditional debug printk in the data path —
		 * presumably meant to become a pr_debug before merging. */
		printk("%s: [%1d.%1d] data=%p, size=%4u, used=%4u, towrite=%4u.\n", 
				__func__, sk->zc_page_index, skb_shinfo(skb)->nr_frags-1,
				zp->page, zp->size, zp->used, towrite);

		if (zb->move_data) {
			unsigned int sz = min(zp->size - zp->used, towrite);

			err = zb->move_data(zb, sz);
			if (err <= 0)
				break;

			/* Page became exactly full: publish it and advance. */
			if (zp->used + err == zp->size) {
				printk("%s: [%1d.%1d] data=%p, size=%4u, used=%4u, towrite=%4u, page is ready.\n", 
					__func__, sk->zc_page_index, skb_shinfo(skb)->nr_frags-1,
					zp->page, zp->size, zp->used, towrite);
				set_bit(ZC_PAGE_READY, &zp->flags);
				if (++sk->zc_page_index == sk->zc_page_num)
					sk->zc_page_index = 0;
			}
		} else 
			/* No mover: the whole request is accounted in one step
			 * (the fit was checked above). */
			err = zb->size;

		/* Account the newly referenced payload against the skb. */
		skb->len += err;
		skb->data_len += err;
		skb->truesize += err;

		towrite -= err;
		zp->used += err;

		err = 0;
	}

out:
	return err;
}

/*
 * Finalize the current ring page after the protocol has filled it.
 * Returns 0 on success, -1 if zero-copy is not active, 1 if the committed
 * size does not match the page area (caller is expected to fall back).
 */
int zc_sock_commit_data(struct zc_buf *zb)
{
	struct sock *sk = zb->priv;
	struct zc_page *zp;

	if (!sk->zc_pages)
		return -1;

	zp = &sk->zc_pages[sk->zc_page_index];

	if (unlikely(zb->size != zp->size))
		return 1;

	if (zp->used == zp->size) {
		set_bit(ZC_PAGE_READY, &zp->flags);
		if (++sk->zc_page_index == sk->zc_page_num)
			sk->zc_page_index = 0;
	}

	return 0;
}

/* mm/filemap.c helper: look up or allocate a locked pagecache page. */
extern struct page * __grab_cache_page(struct address_space *mapping, unsigned long index,
		struct page **cached_page, struct pagevec *lru_pvec);

/*
 * Complete a write into one prepared pagecache page: commit_write, unlock,
 * and drop our reference.  Returns the a_ops->commit_write() result; on
 * success dirty-page throttling is applied.
 */
static int commit_page(struct zc_page *zp, struct file *file, struct address_space *mapping)
{
	int err;
	struct address_space_operations *a_ops = mapping->a_ops;

	flush_dcache_page(zp->page);
	/* NOTE(review): commits only [page_offset, page_offset+used) — a page
	 * prepared but never written is committed with used == 0. */
	err = a_ops->commit_write(file, zp->page, zp->page_offset, zp->page_offset+zp->used);
	unlock_page(zp->page);
	mark_page_accessed(zp->page);
	page_cache_release(zp->page);

	printk("%s: zp=%p, page=%p, page_offset=%u, used=%u, size=%u has been committed: err=%d.\n", 
			__func__, zp, zp->page, zp->page_offset, zp->used, zp->size, err);

	if (err < 0)
		goto err_out_exit;

	balance_dirty_pages_ratelimited(mapping);

err_out_exit:
	return err;
}

/*
 * Grab and prepare_write() the pagecache page covering *ppos, record its
 * writable window in @zp, and advance *ppos by the window size.
 * Returns 0 on success or a negative error (page is released on failure).
 */
static int prepare_page(struct zc_page *zp, struct sock *sk, struct file *file, struct address_space *mapping, 
		loff_t *ppos, loff_t count, struct pagevec *lru_pvec)
{
	unsigned long index;
	unsigned long page_offset;
	unsigned long bytes;
	struct address_space_operations *a_ops = mapping->a_ops;
	loff_t pos_allocated = *ppos;
	int err = 0;

	/* Split the file position into page index + in-page offset and clamp
	 * the writable window to the remaining byte count. */
	page_offset = (pos_allocated & (PAGE_CACHE_SIZE -1));
	index = pos_allocated >> PAGE_CACHE_SHIFT;
	bytes = PAGE_CACHE_SIZE - page_offset;
	if (bytes > count)
		bytes = count;

	zp->page = __grab_cache_page(mapping, index, &sk->zc_cached_page, lru_pvec);
	if (!zp->page) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	err = a_ops->prepare_write(file, zp->page, page_offset, page_offset+bytes);
	if (unlikely(err)) {
		unlock_page(zp->page);
		page_cache_release(zp->page);
		goto err_out_exit;
	}

	zp->page_offset = page_offset;
	zp->size = bytes;
	zp->used = 0;
	clear_bit(ZC_PAGE_READY, &zp->flags);

	printk("%s: zp=%p, page=%p, page_offset=%u, used=%u, size=%u has been prepared: err=%d.\n", 
			__func__, zp, zp->page, zp->page_offset, zp->used, zp->size, err);

	pos_allocated += bytes;

	*ppos = pos_allocated;

err_out_exit:
	return err;
}

/*
 * This should process all socket's related stuff,
 * for example emit TCP ACKs...
 * Since zero-copy skb can only have valid header,
 * this should process that header at skb->data.
 * skb_copy_datagram_iovec() is changed to not even touch
 * zero-copied skb.
 */

/* Scratch sink for draining protocol headers/ACK bookkeeping.
 * NOTE(review): single static buffer — not safe for concurrent callers. */
static u8 message_buf[PAGE_SIZE];

/*
 * Non-blocking drain of up to @ack_size bytes of header data from the
 * socket so the protocol state machine advances (e.g. TCP ACKs are sent).
 * Returns the kernel_recvmsg() result (bytes drained or negative error).
 */
int receive_message(struct socket *sock, unsigned int ack_size)
{
	struct msghdr msg;
	struct kvec iov;
	int err;

	/* NOTE(review): GFP_NOIO is OR-ed into sk_allocation and never
	 * cleared — confirm this is intended to be sticky. */
	sock->sk->sk_allocation |= GFP_NOIO;
	iov.iov_base = message_buf;
	iov.iov_len = min(ack_size, (unsigned int)sizeof(message_buf));
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	/* NOTE(review): msg_namelen is assigned twice; the second store is
	 * redundant (harmless). */
	msg.msg_namelen = 0;
	msg.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT;

	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);

	printk("%s: kernel_recvmsg returned %d, ack_size=%u.\n", __func__, err, ack_size);

	return err;
}

/*
 * Tear down per-socket zero-copy state: atomically detach the page ring
 * under zc_lock, purge queued skbs, commit every ring page to the file,
 * and drop the file reference.  Safe to call on a socket that never had
 * zero-copy enabled (zc_page_num == 0, zc_pages == NULL).
 */
void sk_zc_fini(struct sock *sk)
{
	if (sk) {
		unsigned int zc_page_num;
		struct zc_page *zc_pages;
		unsigned long flags;

		spin_lock_irqsave(&sk->zc_lock, flags);
		zc_page_num = sk->zc_page_num;
		zc_pages = sk->zc_pages;

		sk->zc_pages = NULL;
		sk->zc_page_num = 0;
		sk->zc_page_index = 0;
		sk->zc_alloc_data = NULL;
		sk->zc_commit_data = NULL;
		spin_unlock_irqrestore(&sk->zc_lock, flags);

		/*
		 * No new skbs can contribute data into VFS cache after this 
		 * condition, so we only must care about those which are 
		 * in socket queue already or will be inserted there after
		 * allocation, but allocation itself will always fail
		 * due to above locked changes.
		 */

		skb_queue_purge(&sk->sk_receive_queue);

		if (zc_page_num) {
			struct address_space *mapping = sk->zc_file->f_mapping;
			int i;

			if (sk->zc_cached_page) {
				page_cache_release(sk->zc_cached_page);
				sk->zc_cached_page = NULL;
			}

			/* Commit every ring page, including partially filled
			 * ones, so prepared pages are not leaked locked. */
			for (i=0; i<zc_page_num; ++i)
				commit_page(&zc_pages[i], sk->zc_file, mapping);

			sk->zc_file->f_mode &= ~FMODE_ZEROCOPY;
			fput(sk->zc_file);
		}

		kfree(zc_pages);
	}
}

/* Initialize the zero-copy fields of a fresh struct sock to the
 * "not enabled" state. */
void sk_zc_init(struct sock *sk)
{
	spin_lock_init(&sk->zc_lock);
	init_waitqueue_head(&sk->zc_data_ready);
	sk->zc_pages = NULL;
	sk->zc_page_num = 0;
	sk->zc_page_index = 0;
	sk->zc_alloc_data = NULL;
	sk->zc_commit_data = NULL;
	sk->zc_file = NULL;
	sk->zc_cached_page = NULL;
}

/*
 * Enable zero-copy receive on @sock, targeting the writable file given by
 * @fd: validate the file, pre-prepare a ring of pnum_max pagecache pages,
 * then publish the ring and callbacks under zc_lock.
 * Returns 0 on success or a negative errno; on failure all prepared pages
 * are committed back and the file reference is dropped.
 */
int sock_zc_init(struct socket *sock, int fd)
{
	struct file *file;
	struct sock *sk;
	struct zc_page *zc_pages, *zp;
	int pnum_max, err, i;
	unsigned long flags;
	struct address_space *mapping;
	struct inode *inode;
	size_t count;

	/*
	 * Sane setup.
	 */
	count = INT_MAX;
	pnum_max = 16;

	err = -EBADF;
	file = fget(fd);
	if (!file)
		goto err_out_exit;
	if (!(file->f_mode & FMODE_WRITE))
		goto err_out_fput;
	err = -ETXTBSY;
	/* Refuse a file that is already a zero-copy target. */
	if (file->f_mode & FMODE_ZEROCOPY)
		goto err_out_fput;
	err = -EINVAL;
	if (!file->f_op)
		goto err_out_fput;

	/* NOTE(review): assumes rw_verify_area() returns 0/-errno here —
	 * later kernels return the verified count; confirm for this tree. */
	err = rw_verify_area(WRITE, file, &file->f_pos, count);
	if (err)
		goto err_out_fput;

	err = security_file_permission(file, MAY_WRITE);
	if (err)
		goto err_out_fput;

	sk = sock->sk;
	mapping = file->f_mapping;
	inode = mapping->host;

	sk->zc_pos = 0;

	zc_pages = kzalloc(sizeof(struct zc_page) * pnum_max, GFP_KERNEL);
	if (!zc_pages) {
		err = -ENOMEM;
		goto err_out_fput;
	}

	pagevec_init(&sk->zc_lru_pvec, 0);

	err = 0;
	for (i=0; i<pnum_max; ++i) {
		zp = &zc_pages[i];

		err = prepare_page(zp, sk, file, mapping, &sk->zc_pos, count, &sk->zc_lru_pvec);
		if (unlikely(err))
			goto err_out_commit_pages;
	}

	file->f_mode |= FMODE_ZEROCOPY;

	/* Publish ring + callbacks atomically w.r.t. sk_zc_fini(). */
	spin_lock_irqsave(&sk->zc_lock, flags);
	sk->zc_file = file;
	sk->zc_pages = zc_pages;
	sk->zc_page_num = pnum_max;
	sk->zc_page_index = 0;
	sk->zc_alloc_data = &zc_sock_alloc_data;
	sk->zc_commit_data = &zc_sock_commit_data;
	spin_unlock_irqrestore(&sk->zc_lock, flags);

	printk("%s: sk=%p, %d pages have been set up.\n", __func__, sk, pnum_max);

	return 0;
#if 0
err_out_release_pages:
	spin_lock_irqsave(&sk->zc_lock, flags);
	sk->zc_pages = NULL;
	sk->zc_page_num = 0;
	sk->zc_page_index = 0;
	sk->zc_alloc_data = NULL;
	sk->zc_commit_data = NULL;
	spin_unlock_irqrestore(&sk->zc_lock, flags);
#endif
err_out_commit_pages:
	/* Unwind only the pages already prepared (index i failed). */
	for (--i; i>=0; --i)
		commit_page(&zc_pages[i], file, mapping);

	kfree(zc_pages);
err_out_fput:
	fput(file);
err_out_exit:
	return err;
}

/*
 * sendfile() entry for a zero-copy socket source: wait for ring pages to
 * become READY, commit each to the target file, re-prepare it, and drain
 * the corresponding protocol bytes via receive_message().  Tears down the
 * socket's zero-copy state before returning.
 *
 * NOTE(review): @actor is unused; the success path deliberately(?) falls
 * through into err_out_release_all_pages, and the prepare_page() return in
 * the ready-scan loop is not checked — all worth confirming upstream.
 */
ssize_t sock_sendfile(struct file *in_file, loff_t *ppos, size_t count, read_actor_t actor, void *target)
{
	struct socket *sock;
	struct sock *sk;
	int err = 0;
	size_t written = 0;
	struct file *file = target;
	struct address_space *mapping = file->f_mapping;
	struct inode *inode = mapping->host;
	unsigned long flags;
	int i;
	unsigned int ack_size, zc_page_index = 0;

	if (!count)
		return 0;

	sock = SOCKET_I(in_file->f_dentry->d_inode);

	if (!sock || !sock->sk) {
		err = -ENODEV;
		goto err_out_exit;
	}
	sk = sock->sk;

	err = generic_write_checks(file, &sk->zc_pos, &count, S_ISBLK(inode->i_mode));
	if (err)
		goto err_out_exit;

	spin_lock_irqsave(&sk->zc_lock, flags);
	if (!sk->zc_pages) {
		spin_unlock_irqrestore(&sk->zc_lock, flags);
		err = -EINVAL;
		goto err_out_exit;
	}
	spin_unlock_irqrestore(&sk->zc_lock, flags);

	while (count) {
		struct zc_page *zp;

		/* Sleep (up to 5s) until the receive path advances the ring. */
		wait_event_interruptible_timeout(sk->zc_data_ready, sk->zc_page_index != zc_page_index, 5*HZ);
		zc_page_index = sk->zc_page_index;

		printk("%s: wakeup: sk->zc_page_index=%d, sk->sk_state=%d.\n", 
				__func__, sk->zc_page_index, sk->sk_state);

		ack_size = 0;
		/* Scan the whole ring: commit every READY page and re-prepare
		 * it for further reception. */
		for (i=0; i<sk->zc_page_num; ++i) {
			zp = &sk->zc_pages[i];

			if (test_bit(ZC_PAGE_READY, &zp->flags)) {
				printk("%s: checking page %p [%d]: page=%p, flags=%08lx, page_offset=%08x, size=%08x, used=%08x, written=%zx.\n", 
						__func__, zp, i, zp->page, zp->flags, zp->page_offset, zp->size, zp->used, written);

				err = commit_page(zp, file, mapping);
				if (err)
					goto err_out_release_all_pages;

				count -= zp->used;
				written += zp->used;
				ack_size += zp->used;

				err = prepare_page(zp, sk, file, mapping, &sk->zc_pos, count, &sk->zc_lru_pvec);
			}
		}

		printk("%s: Going to ack %u bytes.\n", __func__, ack_size);

		/* Drain protocol bytes matching the committed payload so the
		 * peer gets ACKed. */
		while (ack_size > 0) {
			err = receive_message(sock, ack_size);
			if (err > 0) {
				ack_size -= err;
			} else
				break;
		}

		if (signal_pending(current))
			break;
	}

	pagevec_lru_add(&sk->zc_lru_pvec);

	*ppos = written;
	err = written;

err_out_release_all_pages:
	i = sk->zc_page_num;

	printk("%s: Releasing zero-copy socket sk=%p.\n", __func__, sk);

	sk_zc_fini(sk);

err_out_exit:

	return err;
}

/* Hunk context: sock_sendpage() continues beyond this view. */
static ssize_t sock_sendpage(struct file *file, struct page *page, int offset, size_t size, loff_t *ppos, int more)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -