varp_socket.c

来自「xen虚拟机源代码安装包」· C语言 代码 · 共 768 行 · 第 1/2 页

C
768
字号
    int sock;    struct sockaddr_in addr_in;    struct sockaddr *addr = (struct sockaddr *)&addr_in;    int addr_n = sizeof(addr_in);    int sockproto = 0;    //dprintf(">\n");    addr_in.sin_family      = AF_INET;    addr_in.sin_addr.s_addr = saddr;    addr_in.sin_port        = port;    dprintf("> flags=%s addr=%u.%u.%u.%u port=%d\n",            socket_flags(flags),            NIPQUAD(saddr), ntohs(port));    switch(socktype){    case SOCK_DGRAM:  sockproto = IPPROTO_UDP; break;    case SOCK_STREAM: sockproto = IPPROTO_TCP; break;    }    sock = socket(AF_INET, socktype, sockproto);    if(sock < 0) goto exit;    if(flags & VSOCK_REUSE){        err = setsock_reuse(sock, 1);        if(err < 0) goto exit;    }    if(flags & VSOCK_BROADCAST){        err = setsock_broadcast(sock, 1);        if(err < 0) goto exit;    }    if(flags & VSOCK_MULTICAST){        err = setsock_multicast(sock, saddr);        if(err < 0) goto exit;    }    if(flags & VSOCK_CONNECT){        err = connect(sock, addr, addr_n);        if(err < 0) goto exit;    }    if(flags & VSOCK_BIND){        err = bind(sock, addr, addr_n);        if(err < 0) goto exit;    }    if(flags & VSOCK_NONBLOCK){        err = fcntl(sock, F_SETFL, O_NONBLOCK);        if(err < 0) goto exit;    }  exit:    *val = (err ? -1 : sock);    if(err) eprintf("> err=%d errno=%d\n", err, errno);    return err;}/** Open the varp multicast socket. * * @param mcaddr multicast address  * @param port port * @param val return parameter for the socket * @return 0 on success, error code otherwise */int varp_mcast_open(uint32_t mcaddr, uint16_t port, int *val){    int err = 0;    int flags = VSOCK_REUSE;    int sock = 0;        dprintf(">\n");    flags |= VSOCK_MULTICAST;    flags |= VSOCK_BROADCAST;        err = create_socket(SOCK_DGRAM, mcaddr, port, flags, &sock);    if(err < 0) goto exit;    if(MULTICAST(mcaddr)){        err = setsock_multicast_ttl(sock, 1);        if(err < 0) goto exit;    }  exit:    if(err){        shutdown(sock, 2);    }    *val = (err ? -1 : sock);    dprintf("< err=%d val=%d\n", err, *val);    return err;}/** Open the varp unicast socket. * * @param addr address  * @param port port * @param val return parameter for the socket * @return 0 on success, error code otherwise */int varp_ucast_open(uint32_t addr, u16 port, int *val){    int err = 0;    int flags = (VSOCK_BIND | VSOCK_REUSE);    dprintf(">\n");    err = create_socket(SOCK_DGRAM, addr, port, flags, val);    dprintf("< err=%d val=%d\n", err, *val);    return err;}/** * Return code > 0 means the handler owns the packet. * Return code <= 0 means we still own it, with < 0 meaning * an error. */static int handle_varp_skb(struct sk_buff *skb){    int err = 0;    switch(skb->pkt_type){    case PACKET_BROADCAST:    case PACKET_MULTICAST:        vnet_forward_send(skb);        /* Fall through. */    case PACKET_HOST:        err = varp_handle_message(skb);        break;    case PACKET_OTHERHOST:        dprintf("> PACKET_OTHERHOST\n");        break;    case PACKET_OUTGOING:        dprintf("> PACKET_OUTGOING\n");        break;    case PACKET_FASTROUTE:        dprintf("> PACKET_FASTROUTE\n");        break;    case PACKET_LOOPBACK:        // Outbound mcast/bcast are echoed with this type. Drop.        dprintf("> LOOP src=" IPFMT " dst=" IPFMT " dev=%s\n",                NIPQUAD(skb->nh.iph->saddr),                NIPQUAD(skb->nh.iph->daddr),                (skb->dev ? skb->dev->name : "??"));      default:        // Drop.        break;    }    if(err <= 0){        kfree_skb(skb);    }    return (err < 0 ? err : 0);}/** Handle some skbs on a varp socket (if any). * * @param fd socket file descriptor * @param n maximum number of skbs to handle * @return number of skbs handled */static int handle_varp_sock(int fd, int n){    int ret = 0;    int err = 0;    struct sk_buff *skb;    struct socket *sock = NULL;    sock = sockfd_lookup(fd, &err);    if (!sock){        wprintf("> no sock for fd=%d\n", fd);        goto exit;    }    for( ; ret < n; ret++){        if(!sock->sk) break;        skb = skb_dequeue(&sock->sk->sk_receive_queue);        if(!skb) break;        // Call the skb destructor so it isn't charged to the socket anymore.        // An skb from a socket receive queue is charged to the socket        // by skb_set_owner_r() until its destructor is called.        // If the destructor is not called the socket will run out of        // receive queue space and be unable to accept incoming skbs.        // The destructor used is sock_rfree(), see 'include/net/sock.h'.        // Other destructors: sock_wfree, sk_stream_rfree.        skb_orphan(skb);        handle_varp_skb(skb);    }    sockfd_put(sock);  exit:    dprintf("< ret=%d\n", ret);    return ret;}/** Add a wait queue to a socket. * * @param fd socket file descriptor * @param waitq queue * @return 0 on success, error code otherwise */int sock_add_wait_queue(int fd, wait_queue_t *waitq){    int err = -EINVAL;    struct socket *sock = NULL;    if(fd < 0) goto exit;    sock = sockfd_lookup(fd, &err);    if (!sock) goto exit;    add_wait_queue(sock->sk->sk_sleep, waitq);    sockfd_put(sock);    err = 0;  exit:    return err;}/** Remove a wait queue from a socket. * * @param fd socket file descriptor * @param waitq queue * @return 0 on success, error code otherwise */int sock_remove_wait_queue(int fd, wait_queue_t *waitq){    int err = -EINVAL;    struct socket *sock = NULL;    if(fd < 0) goto exit;    sock = sockfd_lookup(fd, &err);    if (!sock) goto exit;    remove_wait_queue(sock->sk->sk_sleep, waitq);    sockfd_put(sock);    err = 0;  exit:    return err;}#if 0// Default data ready function on a socket.static void sock_def_readable(struct sock *sk, int len){	read_lock(&sk->sk_callback_lock);	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))		wake_up_interruptible(sk->sk_sleep);	sk_wake_async(sk,1,POLL_IN);	read_unlock(&sk->sk_callback_lock);}#endifstatic void sock_data_ready(struct sock *sk, int len){    struct sk_buff *skb;    //read_lock(&sk->sk_callback_lock);    skb = skb_dequeue(&sk->sk_receive_queue);    if(skb){        skb_orphan(skb);    }    //read_unlock(&sk->sk_callback_lock);    if(skb){        handle_varp_skb(skb);    }}/** Set the data ready callback on a socket. */int sock_set_callback(int fd){    int err = -EINVAL;    struct socket *sock = NULL;    if(fd < 0) goto exit;    sock = sockfd_lookup(fd, &err);    if (!sock) goto exit;    sock->sk->sk_data_ready = sock_data_ready;    sockfd_put(sock);    err = 0;  exit:    return err;}/** Open the sockets. */int varp_sockets_open(u32 mcaddr, u16 port){    int err = 0;    mm_segment_t oldfs;    dprintf("> mcaddr=%u.%u.%u.%u port=%u\n", NIPQUAD(mcaddr), ntohs(port));    oldfs = change_fs(KERNEL_DS);    err = varp_mcast_open(mcaddr, port, &varp_mcast_sock);    if(err < 0 ) goto exit;    err = varp_ucast_open(INADDR_ANY, port, &varp_ucast_sock);    if(err < 0 ) goto exit;    sock_set_callback(varp_ucast_sock);    sock_set_callback(varp_mcast_sock);  exit:    set_fs(oldfs);    dprintf("< err=%d\n", err);    return err;}	/** Close the sockets. */void varp_sockets_close(void){    mm_segment_t oldfs;    oldfs = change_fs(KERNEL_DS);    if(varp_mcast_sock >= 0){        shutdown(varp_mcast_sock, 2);        varp_mcast_sock = -1;    }    if(varp_ucast_sock >= 0){        shutdown(varp_ucast_sock, 2);        varp_ucast_sock = -1;    }    set_fs(oldfs);}/** Loop handling the varp sockets. * We use kernel API for this (waitqueue, schedule_timeout) instead * of select because the select syscall was returning EFAULT. Oh well. * * @param arg arguments * @return exit code */int varp_main(void *arg){    int err = 0;    long timeout = 1 * HZ;    int count = 0;    DECLARE_WAITQUEUE(mcast_wait, current);    DECLARE_WAITQUEUE(ucast_wait, current);    dprintf("> start\n");    snprintf(current->comm, sizeof(current->comm), "varp_main");    err = sock_add_wait_queue(varp_mcast_sock, &mcast_wait);    if(err) goto exit_mcast_sock;    err = sock_add_wait_queue(varp_ucast_sock, &ucast_wait);    if(err) goto exit_ucast_sock;    atomic_set(&varp_state, VARP_STATE_RUNNING);    for( ; atomic_read(&varp_run); ){        count = 0;        count += handle_varp_sock(varp_mcast_sock, 1);        count += handle_varp_sock(varp_ucast_sock, 16);        if(!count){            if(!atomic_read(&varp_run)) break;            // No skbs were handled, go to sleep.            set_current_state(TASK_INTERRUPTIBLE);            schedule_timeout(timeout);            __set_current_state(TASK_RUNNING);        }    }  exit_ucast_sock:    sock_remove_wait_queue(varp_ucast_sock, &ucast_wait);  exit_mcast_sock:    sock_remove_wait_queue(varp_mcast_sock, &mcast_wait);    varp_sockets_close();    if(err){        eprintf("%s< err=%d\n", __FUNCTION__, err);    }    varp_thread_err = err;    atomic_set(&varp_state, VARP_STATE_EXITED);    //MOD_DEC_USE_COUNT;    return err;}/** Close the varp sockets and stop the thread handling them. */void varp_close(void){    int tries = 10;    dprintf(">\n");    // Tell the varp thread to stop and wait a while for it.    atomic_set(&varp_run, 0);    while(atomic_read(&varp_state) == VARP_STATE_RUNNING && tries-- > 0){        set_current_state(TASK_INTERRUPTIBLE);        schedule_timeout(HZ / 2);        __set_current_state(TASK_RUNNING);    }    //MOD_DEC_USE_COUNT;    dprintf("<\n");}    /** Open the varp sockets and start the thread handling them. * * @param mcaddr multicast address * @param port port * @return 0 on success, error code otherwise */int varp_open(u32 mcaddr, u16 port){    int err = 0;        //MOD_INC_USE_COUNT;    dprintf(">\n");    err = varp_sockets_open(mcaddr, port);    if(err) goto exit;    atomic_set(&varp_run, 1);    atomic_set(&varp_state, VARP_STATE_NONE);    kernel_thread(varp_main, NULL, (CLONE_FS | CLONE_FILES | CLONE_SIGHAND));#if 0    while(atomic_read(&varp_state) == VARP_STATE_NONE){        set_current_state(TASK_INTERRUPTIBLE);        schedule_timeout(1 * HZ);        __set_current_state(TASK_RUNNING);    }    err = varp_thread_err;#endif  exit:    if(err){        wprintf("> err=%d\n", err);    }    return err;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?