⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 zero-copy.6

📁 实现linux平台下零拷贝技术的软件包。
💻 6
📖 第 1 页 / 共 3 页
字号:
/*
 * NOTE(review): this span is a unified diff (a kernel patch against
 * net/socket.c, circa Linux 2.6) that a web scraper has collapsed onto a few
 * very long lines.  The leading '+'/'-' characters and the "@@ ... @@" hunk
 * headers are diff markup, not C.  The patch registers a .sendfile handler in
 * socket_file_ops and adds a receive-side zero-copy path: sock_sendfile()
 * prepares page-cache pages of the destination file, and incoming skb payload
 * is written directly into those pages via zc_sock_alloc_data()/
 * zc_sock_commit_data() hooks hung off struct sock (zc_pages, zc_lock,
 * zc_users, zc_data_ready — fields added elsewhere in this patch series, not
 * visible here).  All original bytes are preserved; only review comments have
 * been inserted between hunks/functions.
 */
+ssize_t sock_sendfile(struct file *file, loff_t *ppos, size_t count, read_actor_t actor, void *target);   /*@@ -136,7 +142,8 @@ static struct file_operations socket_fil 	.fasync =	sock_fasync, 	.readv =	sock_readv, 	.writev =	sock_writev,-	.sendpage =	sock_sendpage+	.sendpage =	sock_sendpage,+	.sendfile =	sock_sendfile, };  /*@@ -726,6 +733,395 @@ static ssize_t sock_aio_write(struct kio 	return __sock_sendmsg(iocb, sock, &x->async_msg, size); } 
/*
 * zc_sock_alloc_data - attach the socket's prepared page-cache pages to a
 * freshly allocated skb as paged fragments, so that payload lands directly in
 * the destination file's page cache.  Runs under sk->zc_lock with IRQs
 * disabled; walks the sk->zc_pages ring starting at sk->zc_page_index, marks
 * filled pages ZC_PAGE_READY, accounts consumed bytes in sk->zc_users, and
 * wakes sk->zc_data_ready so sock_sendfile() below can commit the pages.
 * Returns 0 on success, -ENODEV when the socket has no zero-copy pages set
 * up, -ENOMEM when the skb runs out of fragment slots.
 * NOTE(review): the unconditional printk()s look like leftover debug tracing
 * in a hot path — presumably meant to be removed or made conditional.
 */
+int zc_sock_alloc_data(struct zc_buf *zb)+{+	struct sock *sk = zb->priv;+	unsigned long flags;+	struct zc_page *zp;+	int err = -ENODEV, need_wakeup = 0;+	unsigned int towrite = zb->size;+	struct sk_buff *skb = zb->skb;++	if (!sk || !sk->zc_page_num || !skb)+		goto out;++	spin_lock_irqsave(&sk->zc_lock, flags);+	if (!sk->zc_pages)+		goto out_unlock;++	BUG_ON(sk->zc_page_index + 1 > sk->zc_page_num);++	need_wakeup = 1;+	while (towrite) {+		zp = &sk->zc_pages[sk->zc_page_index];+		if (zp->size == zp->used || test_bit(ZC_PAGE_READY, &zp->flags)) {+			set_bit(ZC_PAGE_READY, &zp->flags);++			if (++sk->zc_page_index == sk->zc_page_num)+				sk->zc_page_index = 0;++			zp = &sk->zc_pages[sk->zc_page_index];+			if (zp->size == zp->used || test_bit(ZC_PAGE_READY, &zp->flags))+				break;+		}+		if (zp->size - zp->used < towrite && !zb->move_data)+			break;++		if (skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS) {+			err = -ENOMEM;+			break;+		}+		/*+		 * Setup fragment with offset to point to the area where+		 * we actually can write without overwriting old data.+		 * Setup fragment size to be equal not to the real data size,+		 * but size of the area where we actually can write data into.+		 */+		skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, zp->page, zp->page_offset+zp->used, zp->size-zp->used);++		printk("%s: [%1d.%1d] data=%p [%p], size=%4u, used=%4u, towrite=%4u, users=%u.\n", +				__func__, sk->zc_page_index, skb_shinfo(skb)->nr_frags-1,+				zp->page, page_address(zp->page) + zp->page_offset, +				zp->size, zp->used, towrite, 
sk->zc_users);++		if (zb->move_data) {+			unsigned int sz = min(zp->size - zp->used, towrite);++			err = zb->move_data(zb, sz);+			if (err <= 0)+				break;++			if (zp->used + err == zp->size) {+				printk("%s: [%1d.%1d] data=%p [%p], size=%4u, used=%4u, towrite=%4u, users=%u, page is ready.\n", +						__func__, sk->zc_page_index, skb_shinfo(skb)->nr_frags-1,+						zp->page, page_address(zp->page) + zp->page_offset, +						zp->size, zp->used, towrite, sk->zc_users);+				set_bit(ZC_PAGE_READY, &zp->flags);+				if (++sk->zc_page_index == sk->zc_page_num)+					sk->zc_page_index = 0;+			}+		} else +			err = zb->size;++		skb->len	+= err;+		skb->data_len	+= err;+		skb->truesize	+= err;++		towrite 	-= err;+		zp->used 	+= err;++		err = 0;+	}++	if (!err)+		sk->zc_users += skb->data_len;++out_unlock:+	spin_unlock_irqrestore(&sk->zc_lock, flags);+out:+	if (need_wakeup)+		wake_up(&sk->zc_data_ready);+	return err;+}+
/*
 * zc_sock_commit_data - called once an skb's payload has actually been placed
 * into the current ring page; if the page is now full, mark it ZC_PAGE_READY,
 * advance the ring index, and wake the committer.  Returns 1 (and does
 * nothing) when zb->size no longer matches the current page's size, 0
 * otherwise.
 */
+int zc_sock_commit_data(struct zc_buf *zb)+{+	struct sock *sk = zb->priv;+	unsigned long flags;+	struct zc_page *zp;++	spin_lock_irqsave(&sk->zc_lock, flags);++	BUG_ON(sk->zc_page_index + 1 > sk->zc_page_num);++	zp = &sk->zc_pages[sk->zc_page_index];++	if (unlikely(zb->size != zp->size)) {+		spin_unlock_irqrestore(&sk->zc_lock, flags);+		return 1;+	}++	if (zp->used == zp->size) {+		set_bit(ZC_PAGE_READY, &zp->flags);+		if (++sk->zc_page_index == sk->zc_page_num)+			sk->zc_page_index = 0;+	}+	spin_unlock_irqrestore(&sk->zc_lock, flags);++	wake_up(&sk->zc_data_ready);++	return 0;+}+
/*
 * NOTE(review): __grab_cache_page is a static helper inside mm/filemap.c in
 * mainline 2.6 — this extern presumably relies on another hunk of the patch
 * series making it non-static/exported; verify against the full series.
 */
+extern struct page * __grab_cache_page(struct address_space *mapping, unsigned long index,+			struct page **cached_page, struct pagevec *lru_pvec);+
/*
 * commit_page - finish a filled page-cache page: flush dcache, call the
 * filesystem's ->commit_write() for the [page_offset, page_offset+used)
 * range, unlock and release the page (and any cached spare page), then
 * throttle via balance_dirty_pages_ratelimited().  Returns ->commit_write()'s
 * result (<0 on error).
 */
+static int commit_page(struct zc_page *zp, struct file *file, struct address_space *mapping)+{+	int err;+	struct address_space_operations *a_ops = mapping->a_ops;++	flush_dcache_page(zp->page);+	err = a_ops->commit_write(file, zp->page, zp->page_offset, zp->page_offset+zp->used);+	unlock_page(zp->page);+	mark_page_accessed(zp->page);+	
page_cache_release(zp->page);+	if (zp->cached_page) {+		page_cache_release(zp->cached_page);+		zp->cached_page = NULL;+	}++	printk("%s: zp=%p, page=%p, page_offset=%u, used=%u, size=%u has been committed: err=%d.\n", +			__func__, zp, zp->page, zp->page_offset, zp->used, zp->size, err);++	if (err < 0)+		goto err_out_exit;++	balance_dirty_pages_ratelimited(mapping);++err_out_exit:+	return err;+}+
/*
 * prepare_page - grab (or allocate) the page-cache page covering *ppos in the
 * destination file, call ->prepare_write() on the byte range this ring slot
 * will fill, record offset/size in the zc_page descriptor, clear its READY
 * bit, and advance *ppos past the reserved bytes.  The page is returned
 * LOCKED (unlocked later by commit_page()).  Returns 0 or a negative errno.
 */
+static int prepare_page(struct zc_page *zp, struct file *file, struct address_space *mapping, +		loff_t *ppos, loff_t count, struct pagevec *lru_pvec)+{+	unsigned long index;+	unsigned long page_offset;+	unsigned long bytes;+	struct address_space_operations *a_ops = mapping->a_ops;+	loff_t pos_allocated = *ppos;+	int err = 0;++	page_offset = (pos_allocated & (PAGE_CACHE_SIZE -1));+	index = pos_allocated >> PAGE_CACHE_SHIFT;+	bytes = PAGE_CACHE_SIZE - page_offset;+	if (bytes > count)+		bytes = count;++	zp->page = __grab_cache_page(mapping, index, &zp->cached_page, lru_pvec);+	if (!zp->page) {+		err = -ENOMEM;+		goto err_out_exit;+	}++	err = a_ops->prepare_write(file, zp->page, page_offset, page_offset+bytes);+	if (unlikely(err)) {+		unlock_page(zp->page);+		page_cache_release(zp->page);+		goto err_out_exit;+	}++	zp->page_offset = page_offset;+	zp->size = bytes;+	zp->used = 0;+	clear_bit(ZC_PAGE_READY, &zp->flags);++	printk("%s: zp=%p, page=%p, page_offset=%u, used=%u, size=%u has been prepared: err=%d.\n", +			__func__, zp, zp->page, zp->page_offset, zp->used, zp->size, err);++	pos_allocated += bytes;++	*ppos = pos_allocated;++err_out_exit:+	return err;+}+
/*
 * NOTE(review): message_buf is a single file-scope buffer shared by all
 * concurrent sock_sendfile() callers — looks racy if two zero-copy receives
 * run at once; presumably acceptable for this prototype, but verify.
 */
+/*+ * This should process all socket's related stuff,+ * for example emit TCP ACKs...+ * Since zero-copy skb can only have valid header,+ * this should process that header at skb->data.+ * skb_copy_datagram_iovec() is changed to not even touch+ * zero-copied skb.+ */++static u8 message_buf[PAGE_SIZE];++
/*
 * receive_message - non-blocking drain of up to ack_size bytes of protocol
 * header/ACK data from the socket into the throwaway message_buf, so the
 * transport (e.g. TCP) sees the data as consumed.  Returns kernel_recvmsg()'s
 * result: bytes read, 0, or a negative errno.
 * NOTE(review): msg.msg_namelen is assigned twice below — the second
 * assignment is redundant (harmless, but presumably a copy/paste slip).
 * NOTE(review): OR-ing GFP_NOIO into sk_allocation accumulates across calls
 * and is never restored — TODO confirm that is intended.
 */
+int receive_message(struct socket *sock, unsigned int ack_size)+{+	struct msghdr msg;+	struct kvec iov;+	int err;+	+	
sock->sk->sk_allocation |= GFP_NOIO;+	iov.iov_base = message_buf;+	iov.iov_len = min(ack_size, sizeof(message_buf));+	msg.msg_name = NULL;+	msg.msg_namelen = 0;+	msg.msg_control = NULL;+	msg.msg_controllen = 0;+	msg.msg_namelen = 0;+	msg.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT;+	+	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);++	printk("%s: kernel_recvmsg returned %d, ack_size=%u.\n", __func__, err, ack_size);+	+	return err;+}+
/*
 * sock_sendfile - the socket ->sendfile() implementation: "sendfile from a
 * socket into a file" with zero copies.  Flow: validate with
 * generic_write_checks(); allocate up to 16 zc_page descriptors and
 * prepare_page() each; publish the ring plus the two zc_* callbacks on the
 * sock under zc_lock; then loop — sleep on zc_data_ready (5s timeout),
 * commit_page() every READY page, immediately re-prepare it for reuse, and
 * drain protocol headers via receive_message(), decrementing sk->zc_users by
 * the bytes acknowledged.  Cleanup unpublishes the callbacks (so no new skb
 * can touch the ring), commits all still-prepared pages, and flushes the
 * receive queue until zc_users drains to zero.  Returns bytes written or a
 * negative errno; *ppos is advanced on success.
 * NOTE(review): 'actor' is unused; 'target' is the destination struct file.
 * NOTE(review): interruptible_sleep_on_timeout() is a known-racy 2.6-era API
 * (wakeup can be lost between the test and the sleep) — the 5*HZ timeout
 * presumably papers over that; mainline replaced it with wait_event_*().
 * NOTE(review): when count < PAGE_CACHE_SIZE, pnum_max becomes 0, making the
 * kzalloc a zero-byte allocation and the ring empty — TODO confirm small
 * transfers are handled by another path.
 */
+ssize_t sock_sendfile(struct file *in_file, loff_t *ppos, size_t count, read_actor_t actor, void *target)+{+	struct socket *sock;+	struct sock *sk;+	int err = 0;+	size_t written = 0;+	struct file *file = target;+	struct address_space *mapping = file->f_mapping;+	struct inode *inode = mapping->host;+	loff_t pos, pos_allocated;+	struct pagevec lru_pvec;+	unsigned long flags;+	int pnum_max = 16, i;+	unsigned int zc_page_index, ack_size;+	struct zc_page *zc_pages, *zp;++	if (!count)+		return 0;++	pos = pos_allocated = *ppos;+	err = generic_write_checks(file, &pos, &count, S_ISBLK(inode->i_mode));+	if (err)+		goto err_out_exit;++	sock = SOCKET_I(in_file->f_dentry->d_inode);++	if (!sock || !sock->sk) {+		err = -ENODEV;+		goto err_out_exit;+	}+	sk = sock->sk;++	pnum_max = ((count >> PAGE_CACHE_SHIFT) > pnum_max)?pnum_max:(count >> PAGE_CACHE_SHIFT);+	zc_pages = kzalloc(sizeof(struct zc_page) * pnum_max, GFP_KERNEL);+	if (!zc_pages) {+		err = -ENOMEM;+		goto err_out_exit;+	}++	pagevec_init(&lru_pvec, 0);+	+	err = 0;+	for (i=0; i<pnum_max; ++i) {+		zp = &zc_pages[i];+		+		err = prepare_page(zp, file, mapping, &pos_allocated, count, &lru_pvec);+		if (unlikely(err))+			goto err_out_release_pages;+	}++	zc_page_index = 0;+	+	spin_lock_irqsave(&sk->zc_lock, flags);+	sk->zc_pages 		= zc_pages;+	sk->zc_page_num 	= pnum_max;+	sk->zc_page_index 	= zc_page_index;+	sk->zc_alloc_data	= &zc_sock_alloc_data;+	sk->zc_commit_data	= &zc_sock_commit_data;+	spin_unlock_irqrestore(&sk->zc_lock, flags);++	printk("%s: sk=%p, %d pages have been set 
up.\n", __func__, sk, pnum_max);++	while (count) {+		struct zc_page *zp;++		interruptible_sleep_on_timeout(&sk->zc_data_ready, 5*HZ);++		printk("%s: wakeup: zc_page_index=%d, sk->zc_page_index=%d, sk->sk_state=%d.\n", +				__func__, zc_page_index, sk->zc_page_index, sk->sk_state);++		ack_size = 0;+		for (i=0; i<pnum_max; ++i) {+			zp = &zc_pages[i];++			if (test_bit(ZC_PAGE_READY, &zp->flags)) {+				printk("%s: checking page %p [%d]: page=%p, flags=%08lx, page_offset=%08x, size=%08x, used=%08x, written=%zx.\n", +						__func__, zp, i, zp->page, zp->flags, zp->page_offset, zp->size, zp->used, written);++				err = commit_page(zp, file, mapping);+				if (err)+					goto err_out_release_all_pages;++				count -= zp->used;+				written += zp->used;+				pos += zp->used;++				if (++zc_page_index >= pnum_max)+					zc_page_index = 0;++				ack_size += zp->used;++				err = prepare_page(zp, file, mapping, &pos_allocated, count, &lru_pvec);+			}+		}++		while (ack_size) {+			err = receive_message(sock, ack_size);+			if (err > 0) {+				spin_lock_irqsave(&sk->zc_lock, flags);+				sk->zc_users -= err;+				spin_unlock_irqrestore(&sk->zc_lock, flags);+				ack_size -= err;+			} else+				break;+		}++		if (signal_pending(current))+			break;+	}++	pagevec_lru_add(&lru_pvec);++	*ppos = pos;+	err = written;++err_out_release_all_pages:+	i = pnum_max;+err_out_release_pages:+	spin_lock_irqsave(&sk->zc_lock, flags);+	sk->zc_pages 		= NULL;+	sk->zc_page_num 	= 0;+	sk->zc_page_index 	= 0;+	sk->zc_alloc_data	= NULL;+	sk->zc_commit_data	= NULL;+	spin_unlock_irqrestore(&sk->zc_lock, flags);++	/*+	 * No new skbs can contribute data into VFS cache after this +	 * condition, so we only must care about those which are +	 * in socket queue already or will be inserted there after+	 * allocation, but allocation itself will always fail+	 * due to above locked changes.+	 */++	for (--i; i>=0; --i)+		commit_page(&zc_pages[i], file, mapping);++	while (sk->zc_users) {+		struct sk_buff *skb;+		+		
interruptible_sleep_on_timeout(&sk->zc_data_ready, 5*HZ);+		+		printk("%s: going to flush receive queue: sk->zc_users=%u.\n",+				__func__, sk->zc_users);++		while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {+			sk->zc_users -= skb->data_len;+			kfree_skb(skb);+		}+	}++	printk("%s: flushed: sk->zc_users=%u.\n", __func__, sk->zc_users);++	kfree(zc_pages);++err_out_exit:++	return err;+}+
/*
 * NOTE(review): the page (and the scrape) is truncated here — only the
 * opening of sock_sendpage()'s signature survives; the remainder is on a
 * later page of the listing ("第 1 页 / 共 3 页" = page 1 of 3).
 */
 static ssize_t sock_sendpage(struct file *file, struct page *page, 			     int offset, size_t size, loff_t *ppos, int more) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -