📄 ip_vs_sync.c
字号:
mreq.imr_ifindex = dev->ifindex; lock_sock(sk); ret = ip_mc_join_group(sk, &mreq); release_sock(sk); return ret;}static int bind_mcastif_addr(struct socket *sock, char *ifname){ struct net_device *dev; u32 addr; struct sockaddr_in sin; if ((dev = __dev_get_by_name(ifname)) == NULL) return -ENODEV; addr = inet_select_addr(dev, 0, RT_SCOPE_UNIVERSE); if (!addr) IP_VS_ERR("You probably need to specify IP address on " "multicast interface.\n"); IP_VS_DBG(7, "binding socket with (%s) %u.%u.%u.%u\n", ifname, NIPQUAD(addr)); /* Now bind the socket with the address of multicast interface */ sin.sin_family = AF_INET; sin.sin_addr.s_addr = addr; sin.sin_port = 0; return sock->ops->bind(sock, (struct sockaddr*)&sin, sizeof(sin));}/* * Set up sending multicast socket over UDP */static struct socket * make_send_sock(void){ struct socket *sock; /* First create a socket */ if (sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock) < 0) { IP_VS_ERR("Error during creation of socket; terminating\n"); return NULL; } if (set_mcast_if(sock->sk, ip_vs_master_mcast_ifn) < 0) { IP_VS_ERR("Error setting outbound mcast interface\n"); goto error; } set_mcast_loop(sock->sk, 0); set_mcast_ttl(sock->sk, 1); if (bind_mcastif_addr(sock, ip_vs_master_mcast_ifn) < 0) { IP_VS_ERR("Error binding address of the mcast interface\n"); goto error; } if (sock->ops->connect(sock, (struct sockaddr*)&mcast_addr, sizeof(struct sockaddr), 0) < 0) { IP_VS_ERR("Error connecting to the multicast addr\n"); goto error; } return sock; error: sock_release(sock); return NULL;}/* * Set up receiving multicast socket over UDP */static struct socket * make_receive_sock(void){ struct socket *sock; /* First create a socket */ if (sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock) < 0) { IP_VS_ERR("Error during creation of socket; terminating\n"); return NULL; } /* it is equivalent to the REUSEADDR option in user-space */ sock->sk->sk_reuse = 1; if (sock->ops->bind(sock, (struct sockaddr*)&mcast_addr, sizeof(struct sockaddr)) < 0) { IP_VS_ERR("Error binding to the multicast addr\n"); goto error; } /* join the multicast group */ if (join_mcast_group(sock->sk, (struct in_addr*)&mcast_addr.sin_addr, ip_vs_backup_mcast_ifn) < 0) { IP_VS_ERR("Error joining to the multicast group\n"); goto error; } return sock; error: sock_release(sock); return NULL;}static intip_vs_send_async(struct socket *sock, const char *buffer, const size_t length){ struct msghdr msg = {.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL}; struct kvec iov; int len; EnterFunction(7); iov.iov_base = (void *)buffer; iov.iov_len = length; len = kernel_sendmsg(sock, &msg, &iov, 1, (size_t)(length)); LeaveFunction(7); return len;}static voidip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg){ int msize; msize = msg->size; /* Put size in network byte order */ msg->size = htons(msg->size); if (ip_vs_send_async(sock, (char *)msg, msize) != msize) IP_VS_ERR("ip_vs_send_async error\n");}static intip_vs_receive(struct socket *sock, char *buffer, const size_t buflen){ struct msghdr msg = {NULL,}; struct kvec iov; int len; EnterFunction(7); /* Receive a packet */ iov.iov_base = buffer; iov.iov_len = (size_t)buflen; len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, 0); if (len < 0) return -1; LeaveFunction(7); return len;}static DECLARE_WAIT_QUEUE_HEAD(sync_wait);static pid_t sync_master_pid = 0;static pid_t sync_backup_pid = 0;static DECLARE_WAIT_QUEUE_HEAD(stop_sync_wait);static int stop_master_sync = 0;static int stop_backup_sync = 0;static void sync_master_loop(void){ struct socket *sock; struct ip_vs_sync_buff *sb; /* create the sending multicast socket */ sock = make_send_sock(); if (!sock) return; IP_VS_INFO("sync thread started: state = MASTER, mcast_ifn = %s, " "syncid = %d\n", ip_vs_master_mcast_ifn, ip_vs_master_syncid); for (;;) { while ((sb=sb_dequeue())) { ip_vs_send_sync_msg(sock, sb->mesg); ip_vs_sync_buff_release(sb); } /* check if entries stay in curr_sb for 2 seconds */ if ((sb = get_curr_sync_buff(2*HZ))) { ip_vs_send_sync_msg(sock, sb->mesg); ip_vs_sync_buff_release(sb); } if (stop_master_sync) break; ssleep(1); } /* clean up the sync_buff queue */ while ((sb=sb_dequeue())) { ip_vs_sync_buff_release(sb); } /* clean up the current sync_buff */ if ((sb = get_curr_sync_buff(0))) { ip_vs_sync_buff_release(sb); } /* release the sending multicast socket */ sock_release(sock);}static void sync_backup_loop(void){ struct socket *sock; char *buf; int len; if (!(buf = kmalloc(sync_recv_mesg_maxlen, GFP_ATOMIC))) { IP_VS_ERR("sync_backup_loop: kmalloc error\n"); return; } /* create the receiving multicast socket */ sock = make_receive_sock(); if (!sock) goto out; IP_VS_INFO("sync thread started: state = BACKUP, mcast_ifn = %s, " "syncid = %d\n", ip_vs_backup_mcast_ifn, ip_vs_backup_syncid); for (;;) { /* do you have data now? */ while (!skb_queue_empty(&(sock->sk->sk_receive_queue))) { if ((len = ip_vs_receive(sock, buf, sync_recv_mesg_maxlen)) <= 0) { IP_VS_ERR("receiving message error\n"); break; } /* disable bottom half, because it accessed the data shared by softirq while getting/creating conns */ local_bh_disable(); ip_vs_process_message(buf, len); local_bh_enable(); } if (stop_backup_sync) break; ssleep(1); } /* release the sending multicast socket */ sock_release(sock); out: kfree(buf);}static void set_sync_pid(int sync_state, pid_t sync_pid){ if (sync_state == IP_VS_STATE_MASTER) sync_master_pid = sync_pid; else if (sync_state == IP_VS_STATE_BACKUP) sync_backup_pid = sync_pid;}static void set_stop_sync(int sync_state, int set){ if (sync_state == IP_VS_STATE_MASTER) stop_master_sync = set; else if (sync_state == IP_VS_STATE_BACKUP) stop_backup_sync = set; else { stop_master_sync = set; stop_backup_sync = set; }}static int sync_thread(void *startup){ DECLARE_WAITQUEUE(wait, current); mm_segment_t oldmm; int state; const char *name; /* increase the module use count */ ip_vs_use_count_inc(); if (ip_vs_sync_state & IP_VS_STATE_MASTER && !sync_master_pid) { state = IP_VS_STATE_MASTER; name = "ipvs_syncmaster"; } else if (ip_vs_sync_state & IP_VS_STATE_BACKUP && !sync_backup_pid) { state = IP_VS_STATE_BACKUP; name = "ipvs_syncbackup"; } else { IP_VS_BUG(); ip_vs_use_count_dec(); return -EINVAL; } daemonize(name); oldmm = get_fs(); set_fs(KERNEL_DS); /* Block all signals */ spin_lock_irq(¤t->sighand->siglock); siginitsetinv(¤t->blocked, 0); recalc_sigpending(); spin_unlock_irq(¤t->sighand->siglock); /* set the maximum length of sync message */ set_sync_mesg_maxlen(state); /* set up multicast address */ mcast_addr.sin_family = AF_INET; mcast_addr.sin_port = htons(IP_VS_SYNC_PORT); mcast_addr.sin_addr.s_addr = htonl(IP_VS_SYNC_GROUP); add_wait_queue(&sync_wait, &wait); set_sync_pid(state, current->pid); complete((struct completion *)startup); /* processing master/backup loop here */ if (state == IP_VS_STATE_MASTER) sync_master_loop(); else if (state == IP_VS_STATE_BACKUP) sync_backup_loop(); else IP_VS_BUG(); remove_wait_queue(&sync_wait, &wait); /* thread exits */ set_sync_pid(state, 0); IP_VS_INFO("sync thread stopped!\n"); set_fs(oldmm); /* decrease the module use count */ ip_vs_use_count_dec(); set_stop_sync(state, 0); wake_up(&stop_sync_wait); return 0;}static int fork_sync_thread(void *startup){ pid_t pid; /* fork the sync thread here, then the parent process of the sync thread is the init process after this thread exits. */ repeat: if ((pid = kernel_thread(sync_thread, startup, 0)) < 0) { IP_VS_ERR("could not create sync_thread due to %d... " "retrying.\n", pid); ssleep(1); goto repeat; } return 0;}int start_sync_thread(int state, char *mcast_ifn, __u8 syncid){ DECLARE_COMPLETION(startup); pid_t pid; if ((state == IP_VS_STATE_MASTER && sync_master_pid) || (state == IP_VS_STATE_BACKUP && sync_backup_pid)) return -EEXIST; IP_VS_DBG(7, "%s: pid %d\n", __FUNCTION__, current->pid); IP_VS_DBG(7, "Each ip_vs_sync_conn entry need %Zd bytes\n", sizeof(struct ip_vs_sync_conn)); ip_vs_sync_state |= state; if (state == IP_VS_STATE_MASTER) { strlcpy(ip_vs_master_mcast_ifn, mcast_ifn, sizeof(ip_vs_master_mcast_ifn)); ip_vs_master_syncid = syncid; } else { strlcpy(ip_vs_backup_mcast_ifn, mcast_ifn, sizeof(ip_vs_backup_mcast_ifn)); ip_vs_backup_syncid = syncid; } repeat: if ((pid = kernel_thread(fork_sync_thread, &startup, 0)) < 0) { IP_VS_ERR("could not create fork_sync_thread due to %d... " "retrying.\n", pid); ssleep(1); goto repeat; } wait_for_completion(&startup); return 0;}int stop_sync_thread(int state){ DECLARE_WAITQUEUE(wait, current); if ((state == IP_VS_STATE_MASTER && !sync_master_pid) || (state == IP_VS_STATE_BACKUP && !sync_backup_pid)) return -ESRCH; IP_VS_DBG(7, "%s: pid %d\n", __FUNCTION__, current->pid); IP_VS_INFO("stopping sync thread %d ...\n", (state == IP_VS_STATE_MASTER) ? sync_master_pid : sync_backup_pid); __set_current_state(TASK_UNINTERRUPTIBLE); add_wait_queue(&stop_sync_wait, &wait); set_stop_sync(state, 1); ip_vs_sync_state -= state; wake_up(&sync_wait); schedule(); __set_current_state(TASK_RUNNING); remove_wait_queue(&stop_sync_wait, &wait); /* Note: no need to reap the sync thread, because its parent process is the init process */ if ((state == IP_VS_STATE_MASTER && stop_master_sync) || (state == IP_VS_STATE_BACKUP && stop_backup_sync)) IP_VS_BUG(); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -