📄 cluster.c
字号:
int senderix = find_sender(host, from_addr.sin_addr); if (senderix >= 0) { set_sender_last(senderix, now); accepted[n_accepted].fd = afd; accepted[n_accepted].addr = from_addr.sin_addr; accepted[n_accepted++].senderix = senderix; rebuild_fds = 1; host_last_seq_no[senders[senderix].hostix] = 0; } else { ast_log(LOG_NOTICE, "Got socket connection from unexpected sender %s %s\n", host->name, inaddr2s(from_addr.sin_addr)); } } ast_log(LOG_NOTICE, "Accepted socket connection from %s, fd %d\n", host?host->name : "unknown", afd); } else { ast_log(LOG_WARNING, "Accept of receiver connection failed: %s.\n", strerror(errno)); } break; } } case FD_ACCEPTED: { int ix = i - 2; int err = 0; if(fds[i].revents & POLLIN) { err = cluster_receive_packet(accepted[ix].senderix, fds[i].fd); set_sender_last(accepted[ix].senderix, now); } if((err == -1) || (fds[i].revents & (POLLERR|POLLNVAL))) { int error; unsigned int len = sizeof(error); getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len); ast_log(LOG_NOTICE, "Got error on accepted socket: %d %s\n", i, strerror(error)); close(fds[i].fd); for (j = ix; j < n_accepted-1; j++) accepted[j] = accepted[j+1]; n_accepted--; rebuild_fds = 1; } break; } case FD_RECEIVER: case FD_INPROGRESS: { struct receiver* receiver = &fds_receivers[i]; struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix]; int targetix = fds_targetix[i]; char* host_name = receiver->targets[targetix].host->name; char* if_name = receiver->targets[targetix].inf->name; rebuild_fds = 1; if(fds[i].revents & (POLLERR|POLLNVAL)) { if (receiver_stat->target[targetix].reported++ % 100 == 0) { int error; unsigned int len = sizeof(error); getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len); ast_log(LOG_NOTICE, "Socket connection failed to host: %s, inf %s, addr %s, error: %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr), strerror(error)); } disconnect_receiver(receiver, targetix); } else if(fds[i].revents & (POLLHUP)) { ast_log(LOG_NOTICE, "Lost connection to receiver host: %s, inf %s, addr %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr)); disconnect_receiver(receiver, targetix); } else if(fds[i].revents & (POLLIN|POLLOUT)) { ast_log(LOG_NOTICE, "Connected to receiver host: %s, inf %s, addr %s \n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr)); receiver_stat->target[targetix].connected = 1; receiver_stat->target[targetix].inprogress = 0; receiver_stat->target[targetix].reported = 0; } } break; } } } cluster_send_keep_alive(); if (check_receiver_connections()) rebuild_fds = 1; check_senders(); } return NULL;}static void build_sender_list(void){ int hostix = 0; struct host* host = NULL; while ((host = lookup_host_by_id(hostix)) != NULL) { if (host != this_host) { int linkix, targetix; for (linkix = 0; linkix < host->n_receivers; linkix++) { for (targetix = 0; targetix < host->receivers[linkix].n_targets; targetix++) { if (host->receivers[linkix].targets[targetix].host == this_host) { int j; for (j = 0; j < host->n_ifs; j++) { add_sender(host, host->ifs[j].addr, hostix); } } } } } hostix++; } if (!n_senders) { ast_log(LOG_DEBUG, "Found no senders to supervise\n"); }}static void wait_for_connections(void){ int cnt; int linkix, targetix; for (cnt = 0; cnt < 800; cnt++) { int n = 0, c = 0; for (linkix = 0; linkix < this_host->n_receivers; linkix++) { for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) { c += 1; if (receiver_stats[linkix].target[targetix].connected) n += 1; } } if (cnt % 100 == 0) ast_log(LOG_DEBUG, "wait %d %d %d %d\n", n, c, n_accepted, n_senders); if ((n == c) && (n_accepted == n_senders)) break; usleep(10*1000); }}int cluster_init(void (*isup_event_handler_callback)(struct mtp_event*), void (*isup_block_handler_callback)(struct link*)){ int i, j; int res; int flags; struct sched_param sp; isup_event_handler = isup_event_handler_callback; isup_block_handler = isup_block_handler_callback; build_sender_list(); for (i = 0; i < this_host->n_receivers; i++) { for (j = 0; j < this_host->receivers[i].n_targets; j++) { receiver_stats[i].target[j].fd = -1; receiver_stats[i].target[j].connected = 0; receiver_stats[i].target[j].inprogress = 0; receiver_stats[i].target[j].reported = 0; } } for (i = 0; i < this_host->n_receivers; i++) { for (j = 0; j < this_host->receivers[i].n_targets; j++) { struct host* host = this_host->receivers[i].targets[j].host; int l; for (l = 0; l < host->n_spans; l++) { struct link* link = host->spans[l].link; if (link->schannel >= 0) this_host->has_signalling_receivers = 1; } } } if (this_host->has_signalling_receivers) if (setup_receiver_socket()) goto fail; connect_receivers(); receivepipe[0] = receivepipe[1] = -1; receivebuf = lffifo_alloc(200000); res = pipe(receivepipe); if(res < 0) { ast_log(LOG_ERROR, "Unable to allocate cluster event pipe: %s.\n", strerror(errno)); goto fail; } res = fcntl(receivepipe[0], F_GETFL); if(res < 0) { ast_log(LOG_ERROR, "Could not obtain flags for read end of " "cluster event pipe: %s.\n", strerror(errno)); goto fail; } flags = res | O_NONBLOCK; res = fcntl(receivepipe[0], F_SETFL, flags); if(res < 0) { ast_log(LOG_ERROR, "Could not set read end of cluster event pipe " "non-blocking: %s.\n", strerror(errno)); goto fail; } res = fcntl(receivepipe[1], F_GETFL); if(res < 0) { ast_log(LOG_ERROR, "Could not obtain flags for write end of " "cluster event pipe: %s.\n", strerror(errno)); goto fail; } flags = res | O_NONBLOCK; res = fcntl(receivepipe[1], F_SETFL, flags); if(res < 0) { ast_log(LOG_ERROR, "Could not set write end of cluster event pipe " "non-blocking: %s.\n", strerror(errno)); goto fail; } cluster_sched = sched_context_create(); if(cluster_sched == NULL) { ast_log(LOG_ERROR, "Unable to create cluster scheduling context.\n"); goto fail; } cluster_running = 1; /* Otherwise there is a race, and cluster may exit immediately */ if(ast_pthread_create(&cluster_thread, NULL, cluster_thread_main, NULL) < 0) { ast_log(LOG_ERROR, "Unable to start cluster thread.\n"); cluster_running = 0; goto fail; } memset(&sp, 0, sizeof(sp)); sp.sched_priority = 10; res = pthread_setschedparam(cluster_thread, SCHED_RR, &sp); if(res != 0) { ast_log(LOG_WARNING, "Failed to set cluster thread to realtime priority: %s.\n", strerror(res)); } wait_for_connections(); return 0; fail: cluster_cleanup(); return -1;}void cluster_cleanup(void){ int i, j; if(cluster_running) { cluster_running = 0; /* Monitor wakes up periodically, so no need to signal it explicitly. */ pthread_join(cluster_thread, NULL); } if(cluster_sched) { sched_context_destroy(cluster_sched); cluster_sched = NULL; } if(receivebuf) { lffifo_free(receivebuf); receivebuf = NULL; } if(receivepipe[0] != -1) { close(receivepipe[0]); receivepipe[0] = -1; } if(receivepipe[1] != -1) { close(receivepipe[1]); receivepipe[1] = -1; } if (receiver_socket != -1) { shutdown(receiver_socket, SHUT_RDWR); close(receiver_socket); receiver_socket = -1; } for (i = 0; i < n_accepted; i++) { shutdown(accepted[i].fd, SHUT_RDWR); close(accepted[i].fd); } n_accepted = 0; if (this_host) { for (i = 0; i < this_host->n_receivers; i++) { for (j = 0; j < this_host->receivers[i].n_targets; j++) { if (receiver_stats[i].target[j].connected || receiver_stats[i].target[j].inprogress) { shutdown(receiver_stats[i].target[j].fd, SHUT_RDWR); close(receiver_stats[i].target[j].fd); receiver_stats[i].target[j].connected = 0; receiver_stats[i].target[j].inprogress = 0; } } } } n_senders = 0;}int cmd_cluster_start(int fd, int argc, char *argv[]){ if (!cluster_running) return cluster_init(isup_event_handler, isup_block_handler); return 0;}int cmd_cluster_stop(int fd, int argc, char *argv[]){ if (cluster_running) cluster_cleanup(); return 0;}int cmd_cluster_status(int fd, int argc, char *argv[]){ int i; int linkix, targetix; gettimeofday(&now, NULL); for (i = 0; i < n_senders; i++) { int tdiff = timediff_msec(now, senders[i].last); char* s = ""; switch (senders[i].state) { case STATE_UNKNOWN: s = "unknown"; tdiff = 0; break; case STATE_ALIVE: s = "alive"; break; case STATE_DEAD: s = "dead"; break; } ast_cli(fd, "sender %s, addr %s, state %s, last %d msec, up %d, down %d\n", senders[i].host->name, inaddr2s(senders[i].addr), s, tdiff, senders[i].up, senders[i].down); } for (linkix = 0; linkix < this_host->n_receivers; linkix++) { for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) { char* if_name = this_host->receivers[linkix].targets[targetix].inf->name; char* host_name = this_host->receivers[linkix].targets[targetix].host->name; struct in_addr addr = this_host->receivers[linkix].targets[targetix].inf->addr; char* c = (receiver_stats[linkix].target[targetix].connected) ? "connected" : ""; char* p = (receiver_stats[linkix].target[targetix].inprogress) ? "inprogress" : ""; int tdiff = timediff_msec(now, receiver_stats[linkix].target[targetix].lasttry); ast_cli(fd, "receiver %s if %s, addr %s, c:%s, p:%s, last try %d msec, %d fails, %lu forwards\n", host_name, if_name, inaddr2s(addr), c, p, tdiff, receiver_stats[linkix].target[targetix].fails, receiver_stats[linkix].target[targetix].forwards); } } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -