📄 cluster.c
字号:
/* cluster.c - chan_ss7 clustering/redundancy * * Copyright (C) 2006, Sifira A/S. * * Author: Anders Baekgaard <ab@sifira.dk> * * This file is part of chan_ss7. * * chan_ss7 is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * chan_ss7 is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with chan_ss7; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */#include <errno.h>#include <stdlib.h>#include <string.h>#include <stdio.h>#include <time.h>#include <sys/param.h>#include <arpa/inet.h>#include <sys/socket.h>#include <unistd.h>#include <fcntl.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <pthread.h>#include <signal.h>#include <sys/poll.h>#include <sys/ioctl.h>#include <sys/time.h>#include "asterisk/options.h"#include "asterisk/logger.h"#include "asterisk/config.h"#include "asterisk/sched.h"#include "asterisk/utils.h"#include "asterisk/cli.h"#include "asterisk/lock.h"#include "asterisk/channel.h"#include "astversion.h"#include "config.h"#include "lffifo.h"#include "utils.h"#include "mtp.h"#include "cluster.h"/* Delay between cluster thread wakeups. */#define CLUSTER_WAKEUP_INTERVAL 500#define CLUSTER_KEEP_ALIVE_INTERVAL 500#define CLUSTER_ALIVE_TIMEOUT 1000#define CLUSTER_CONNECT_RETRY_INTERVAL 2000#define CLUSTER_CONNECT_TIMEOUT 10000static int receivepipe[2];static struct lffifo *receivebuf;static struct sched_context *cluster_sched = NULL;static pthread_t cluster_thread = AST_PTHREADT_NULL;static int cluster_running = 0;static struct receiver_stat { struct { int connected; int inprogress; int fails; /* statistics */ unsigned long forwards; /* statistics */ int fd; struct timeval lasttry; int reported; } target[2*MAX_HOSTS];} receiver_stats[MAX_LINKS_PER_HOST];int n_accepted = 0;static struct { int fd; struct in_addr addr; int senderix;} accepted[2*MAX_HOSTS+2];static int receiver_socket = -1;static enum{FD_PIPE, FD_LISTEN, FD_ACCEPTED, FD_RECEIVER, FD_INPROGRESS } fds_type[2*MAX_HOSTS+2];int n_fds = 0;static struct pollfd fds[2*MAX_HOSTS+2];static struct receiver fds_receivers[2*MAX_HOSTS+2];static int fds_targetix[2*MAX_HOSTS+2];static int rebuild_fds = 1;int n_senders = 0;static struct { struct host* host; struct in_addr addr; int hostix; struct timeval last; alivestate state; int up; int down;} senders[2*MAX_HOSTS];static struct timeval host_last_event_stamp[MAX_HOSTS] = {{0, 0}, };static unsigned long host_last_seq_no[MAX_HOSTS] = {0, };static struct timeval now;static unsigned long sequence_number = 0;static void disconnect_receiver(struct receiver* receiver, int targetix);void (*isup_event_handler)(struct mtp_event*) = NULL;void (*isup_block_handler)(struct link*) = NULL;static void set_socket_opt(int s, int l, int o, int v){ int err; int len = sizeof(v); if ((err = setsockopt(s, l, o, &v, len)) < 0) { ast_log(LOG_WARNING, "Cannot set socket option, %s\n", strerror(errno)); }}static void declare_host_state(struct host* host, alivestate state){ if (host->state != state) { host->state = state; if (state == STATE_DEAD) { int i; int receiverix, targetix; for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) { struct receiver* receiver = &this_host->receivers[receiverix]; for (targetix = 0; targetix < receiver->n_targets; targetix++) { if (receiver->targets[targetix].host == host) disconnect_receiver(receiver, targetix); } } if (isup_block_handler) { for (i = 0; i < host->n_spans; i++) { struct link* link = host->spans[i].link; if (link->enabled) (*isup_block_handler)(link); } } ast_log(LOG_WARNING, "No alive signal from %s, assumed down.\n", host->name); } else if (state == STATE_ALIVE) { ast_log(LOG_WARNING, "Alive signal from %s, now up.\n", host->name); } }}static int find_sender(struct host* host, struct in_addr addr){ int i; for (i = 0; i < n_senders; i++) if ((senders[i].host == host) && (memcmp(&senders[i].addr, &addr, sizeof(addr)) == 0)) return i; return -1;}static void add_sender(struct host* host, struct in_addr addr, int hostix){ if (find_sender(host, addr) != -1) { ast_log(LOG_NOTICE, "Cluster has multiple identical entries: host %s %s\n", host->name, inaddr2s(addr)); return; } senders[n_senders].host = host; senders[n_senders].hostix = hostix; senders[n_senders].addr = addr; senders[n_senders].last.tv_sec = 0; senders[n_senders].last.tv_usec = 0; senders[n_senders].state = STATE_UNKNOWN; senders[n_senders].up = 0; senders[n_senders].down = 0; ast_log(LOG_DEBUG, "Added host %s %s, hostix %d, id %d\n", host->name, inaddr2s(addr), senders[n_senders].hostix, n_senders); n_senders++;}static void set_sender_last(int senderix, struct timeval last){ struct host* host = senders[senderix].host; senders[senderix].last = last; if (senders[senderix].state != STATE_ALIVE) { senders[senderix].up += 1; ast_log(LOG_WARNING, "Alive signal from %s %s\n", senders[senderix].host->name, inaddr2s(senders[senderix].addr)); } senders[senderix].state = STATE_ALIVE; host_last_event_stamp[senders[senderix].hostix] = last; declare_host_state(host, STATE_ALIVE);}static void check_senders(void){ int i; int hostix = 0; struct host* host; for (i = 0; i < n_senders; i++) { int tdiff = timediff_msec(now, senders[i].last); if ((tdiff > CLUSTER_ALIVE_TIMEOUT) && (senders[i].state == STATE_ALIVE)) { ast_log(LOG_WARNING, "No alive signal from %s %s, for %d msec\n", senders[i].host->name, inaddr2s(senders[i].addr), tdiff); senders[i].state = STATE_DEAD; senders[i].down += 1; } } while ((host = lookup_host_by_id(hostix)) != NULL) { if (host != this_host) { int alive = 0; int dead = 0; for (i = 0; i < n_senders; i++) { if (senders[i].host == host) { alive = alive || (senders[i].state == STATE_ALIVE); dead = dead || (senders[i].state == STATE_DEAD); } } if (dead && !alive) { int tdiff = timediff_msec(now, host_last_event_stamp[hostix]); if (tdiff > CLUSTER_ALIVE_TIMEOUT) { declare_host_state(host, STATE_DEAD); } } } hostix++; }}static void cluster_put(int linkix, unsigned char* buf, int len){ int res = 0; if (!cluster_running) return; res = lffifo_put(receivebuf, (unsigned char *)buf, len); if(res) { ast_log(LOG_ERROR, "Cluster receive buffer full, packet lost.\n"); /* Todo FIFO full ... */ } else { res = write(receivepipe[1], &linkix, sizeof(linkix)); if (res < 0) { ast_log(LOG_NOTICE, "Could not write cluster event pipe: %s.\n", strerror(errno)); } }}void cluster_mtp_received(struct link* link, struct mtp_event* event){ if (!cluster_running || !this_host->n_receivers) return; ast_log(LOG_DEBUG, "cluster mtp received on link '%s', typ=%d\n", link ? link->name : "?", event->typ); cluster_put(link ? link->linkix : -1, (unsigned char *)event, sizeof(*event) + event->len);}void cluster_mtp_sent(struct link* link, struct mtp_req* req){ if (!cluster_running || !this_host->n_receivers) return; ast_log(LOG_DEBUG, "cluster mtp sent on link '%s', typ=%d\n", link ? link->name : "?", req->typ); cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len);}void cluster_mtp_forward(struct mtp_req* req){ int typ = req->typ; struct link* link = req->isup.link; if (!cluster_running) return; ast_log(LOG_DEBUG, "cluster mtp forward, link %s, typ=%d, len=%d\n", link ? link->name : "?", req->typ, req->len); req->typ = MTP_REQ_ISUP_FORWARD; cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len); req->typ = typ;}int cluster_receivers_alive(struct linkset* linkset){ int i, j; if (this_host->has_signalling_receivers) { for (i = 0; i < this_host->n_receivers; i++) { for (j = 0; j < this_host->receivers[i].n_targets; j++) { struct host* host = this_host->receivers[i].targets[j].host; int l; if (host->state != STATE_ALIVE) continue; for (l = 0; l < host->n_spans; l++) { struct link* link = host->spans[l].link; if (link->schannel >= 0) return 1; } } } } return 0;}static int setup_receiver_socket(void){ struct sockaddr_in sock; in_addr_t addr = INADDR_ANY; memset(&sock, 0, sizeof(struct sockaddr_in)); sock.sin_family = AF_INET; sock.sin_port = htons(clusterlistenport); memcpy(&sock.sin_addr, &addr, sizeof(addr)); receiver_socket = socket(PF_INET, SOCK_STREAM, 0); if (receiver_socket < 0) { ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno)); return -1; } set_socket_opt(receiver_socket, SOL_SOCKET, SO_REUSEADDR, 1); if (bind(receiver_socket, &sock, sizeof(sock)) < 0) { ast_log(LOG_ERROR, "Cannot bind receiver socket, errno=%d: %s\n", errno, strerror(errno)); close(receiver_socket); receiver_socket = -1; return -1; } if (listen(receiver_socket, MAX_HOSTS) < 0) { ast_log(LOG_ERROR, "Cannot listen on receiver socket, errno=%d: %s\n", errno, strerror(errno)); close(receiver_socket); receiver_socket = -1; return -1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -