varp_socket.c
来自「xen虚拟机源代码安装包」· C语言 代码 · 共 768 行 · 第 1/2 页
C
768 行
int sock; struct sockaddr_in addr_in; struct sockaddr *addr = (struct sockaddr *)&addr_in; int addr_n = sizeof(addr_in); int sockproto = 0; //dprintf(">\n"); addr_in.sin_family = AF_INET; addr_in.sin_addr.s_addr = saddr; addr_in.sin_port = port; dprintf("> flags=%s addr=%u.%u.%u.%u port=%d\n", socket_flags(flags), NIPQUAD(saddr), ntohs(port)); switch(socktype){ case SOCK_DGRAM: sockproto = IPPROTO_UDP; break; case SOCK_STREAM: sockproto = IPPROTO_TCP; break; } sock = socket(AF_INET, socktype, sockproto); if(sock < 0) goto exit; if(flags & VSOCK_REUSE){ err = setsock_reuse(sock, 1); if(err < 0) goto exit; } if(flags & VSOCK_BROADCAST){ err = setsock_broadcast(sock, 1); if(err < 0) goto exit; } if(flags & VSOCK_MULTICAST){ err = setsock_multicast(sock, saddr); if(err < 0) goto exit; } if(flags & VSOCK_CONNECT){ err = connect(sock, addr, addr_n); if(err < 0) goto exit; } if(flags & VSOCK_BIND){ err = bind(sock, addr, addr_n); if(err < 0) goto exit; } if(flags & VSOCK_NONBLOCK){ err = fcntl(sock, F_SETFL, O_NONBLOCK); if(err < 0) goto exit; } exit: *val = (err ? -1 : sock); if(err) eprintf("> err=%d errno=%d\n", err, errno); return err;}/** Open the varp multicast socket. * * @param mcaddr multicast address * @param port port * @param val return parameter for the socket * @return 0 on success, error code otherwise */int varp_mcast_open(uint32_t mcaddr, uint16_t port, int *val){ int err = 0; int flags = VSOCK_REUSE; int sock = 0; dprintf(">\n"); flags |= VSOCK_MULTICAST; flags |= VSOCK_BROADCAST; err = create_socket(SOCK_DGRAM, mcaddr, port, flags, &sock); if(err < 0) goto exit; if(MULTICAST(mcaddr)){ err = setsock_multicast_ttl(sock, 1); if(err < 0) goto exit; } exit: if(err){ shutdown(sock, 2); } *val = (err ? -1 : sock); dprintf("< err=%d val=%d\n", err, *val); return err;}/** Open the varp unicast socket. * * @param addr address * @param port port * @param val return parameter for the socket * @return 0 on success, error code otherwise */int varp_ucast_open(uint32_t addr, u16 port, int *val){ int err = 0; int flags = (VSOCK_BIND | VSOCK_REUSE); dprintf(">\n"); err = create_socket(SOCK_DGRAM, addr, port, flags, val); dprintf("< err=%d val=%d\n", err, *val); return err;}/** * Return code > 0 means the handler owns the packet. * Return code <= 0 means we still own it, with < 0 meaning * an error. */static int handle_varp_skb(struct sk_buff *skb){ int err = 0; switch(skb->pkt_type){ case PACKET_BROADCAST: case PACKET_MULTICAST: vnet_forward_send(skb); /* Fall through. */ case PACKET_HOST: err = varp_handle_message(skb); break; case PACKET_OTHERHOST: dprintf("> PACKET_OTHERHOST\n"); break; case PACKET_OUTGOING: dprintf("> PACKET_OUTGOING\n"); break; case PACKET_FASTROUTE: dprintf("> PACKET_FASTROUTE\n"); break; case PACKET_LOOPBACK: // Outbound mcast/bcast are echoed with this type. Drop. dprintf("> LOOP src=" IPFMT " dst=" IPFMT " dev=%s\n", NIPQUAD(skb->nh.iph->saddr), NIPQUAD(skb->nh.iph->daddr), (skb->dev ? skb->dev->name : "??")); default: // Drop. break; } if(err <= 0){ kfree_skb(skb); } return (err < 0 ? err : 0);}/** Handle some skbs on a varp socket (if any). * * @param fd socket file descriptor * @param n maximum number of skbs to handle * @return number of skbs handled */static int handle_varp_sock(int fd, int n){ int ret = 0; int err = 0; struct sk_buff *skb; struct socket *sock = NULL; sock = sockfd_lookup(fd, &err); if (!sock){ wprintf("> no sock for fd=%d\n", fd); goto exit; } for( ; ret < n; ret++){ if(!sock->sk) break; skb = skb_dequeue(&sock->sk->sk_receive_queue); if(!skb) break; // Call the skb destructor so it isn't charged to the socket anymore. // An skb from a socket receive queue is charged to the socket // by skb_set_owner_r() until its destructor is called. // If the destructor is not called the socket will run out of // receive queue space and be unable to accept incoming skbs. // The destructor used is sock_rfree(), see 'include/net/sock.h'. // Other destructors: sock_wfree, sk_stream_rfree. skb_orphan(skb); handle_varp_skb(skb); } sockfd_put(sock); exit: dprintf("< ret=%d\n", ret); return ret;}/** Add a wait queue to a socket. * * @param fd socket file descriptor * @param waitq queue * @return 0 on success, error code otherwise */int sock_add_wait_queue(int fd, wait_queue_t *waitq){ int err = -EINVAL; struct socket *sock = NULL; if(fd < 0) goto exit; sock = sockfd_lookup(fd, &err); if (!sock) goto exit; add_wait_queue(sock->sk->sk_sleep, waitq); sockfd_put(sock); err = 0; exit: return err;}/** Remove a wait queue from a socket. * * @param fd socket file descriptor * @param waitq queue * @return 0 on success, error code otherwise */int sock_remove_wait_queue(int fd, wait_queue_t *waitq){ int err = -EINVAL; struct socket *sock = NULL; if(fd < 0) goto exit; sock = sockfd_lookup(fd, &err); if (!sock) goto exit; remove_wait_queue(sock->sk->sk_sleep, waitq); sockfd_put(sock); err = 0; exit: return err;}#if 0// Default data ready function on a socket.static void sock_def_readable(struct sock *sk, int len){ read_lock(&sk->sk_callback_lock); if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) wake_up_interruptible(sk->sk_sleep); sk_wake_async(sk,1,POLL_IN); read_unlock(&sk->sk_callback_lock);}#endifstatic void sock_data_ready(struct sock *sk, int len){ struct sk_buff *skb; //read_lock(&sk->sk_callback_lock); skb = skb_dequeue(&sk->sk_receive_queue); if(skb){ skb_orphan(skb); } //read_unlock(&sk->sk_callback_lock); if(skb){ handle_varp_skb(skb); }}/** Set the data ready callback on a socket. */int sock_set_callback(int fd){ int err = -EINVAL; struct socket *sock = NULL; if(fd < 0) goto exit; sock = sockfd_lookup(fd, &err); if (!sock) goto exit; sock->sk->sk_data_ready = sock_data_ready; sockfd_put(sock); err = 0; exit: return err;}/** Open the sockets. */int varp_sockets_open(u32 mcaddr, u16 port){ int err = 0; mm_segment_t oldfs; dprintf("> mcaddr=%u.%u.%u.%u port=%u\n", NIPQUAD(mcaddr), ntohs(port)); oldfs = change_fs(KERNEL_DS); err = varp_mcast_open(mcaddr, port, &varp_mcast_sock); if(err < 0 ) goto exit; err = varp_ucast_open(INADDR_ANY, port, &varp_ucast_sock); if(err < 0 ) goto exit; sock_set_callback(varp_ucast_sock); sock_set_callback(varp_mcast_sock); exit: set_fs(oldfs); dprintf("< err=%d\n", err); return err;} /** Close the sockets. */void varp_sockets_close(void){ mm_segment_t oldfs; oldfs = change_fs(KERNEL_DS); if(varp_mcast_sock >= 0){ shutdown(varp_mcast_sock, 2); varp_mcast_sock = -1; } if(varp_ucast_sock >= 0){ shutdown(varp_ucast_sock, 2); varp_ucast_sock = -1; } set_fs(oldfs);}/** Loop handling the varp sockets. * We use kernel API for this (waitqueue, schedule_timeout) instead * of select because the select syscall was returning EFAULT. Oh well. * * @param arg arguments * @return exit code */int varp_main(void *arg){ int err = 0; long timeout = 1 * HZ; int count = 0; DECLARE_WAITQUEUE(mcast_wait, current); DECLARE_WAITQUEUE(ucast_wait, current); dprintf("> start\n"); snprintf(current->comm, sizeof(current->comm), "varp_main"); err = sock_add_wait_queue(varp_mcast_sock, &mcast_wait); if(err) goto exit_mcast_sock; err = sock_add_wait_queue(varp_ucast_sock, &ucast_wait); if(err) goto exit_ucast_sock; atomic_set(&varp_state, VARP_STATE_RUNNING); for( ; atomic_read(&varp_run); ){ count = 0; count += handle_varp_sock(varp_mcast_sock, 1); count += handle_varp_sock(varp_ucast_sock, 16); if(!count){ if(!atomic_read(&varp_run)) break; // No skbs were handled, go to sleep. set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(timeout); __set_current_state(TASK_RUNNING); } } exit_ucast_sock: sock_remove_wait_queue(varp_ucast_sock, &ucast_wait); exit_mcast_sock: sock_remove_wait_queue(varp_mcast_sock, &mcast_wait); varp_sockets_close(); if(err){ eprintf("%s< err=%d\n", __FUNCTION__, err); } varp_thread_err = err; atomic_set(&varp_state, VARP_STATE_EXITED); //MOD_DEC_USE_COUNT; return err;}/** Close the varp sockets and stop the thread handling them. */void varp_close(void){ int tries = 10; dprintf(">\n"); // Tell the varp thread to stop and wait a while for it. atomic_set(&varp_run, 0); while(atomic_read(&varp_state) == VARP_STATE_RUNNING && tries-- > 0){ set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(HZ / 2); __set_current_state(TASK_RUNNING); } //MOD_DEC_USE_COUNT; dprintf("<\n");} /** Open the varp sockets and start the thread handling them. * * @param mcaddr multicast address * @param port port * @return 0 on success, error code otherwise */int varp_open(u32 mcaddr, u16 port){ int err = 0; //MOD_INC_USE_COUNT; dprintf(">\n"); err = varp_sockets_open(mcaddr, port); if(err) goto exit; atomic_set(&varp_run, 1); atomic_set(&varp_state, VARP_STATE_NONE); kernel_thread(varp_main, NULL, (CLONE_FS | CLONE_FILES | CLONE_SIGHAND));#if 0 while(atomic_read(&varp_state) == VARP_STATE_NONE){ set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(1 * HZ); __set_current_state(TASK_RUNNING); } err = varp_thread_err;#endif exit: if(err){ wprintf("> err=%d\n", err); } return err;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?