📄 cluster.c
字号:
/* cluster.c - chan_ss7 clustering/redundancy
*
* Copyright (C) 2006, Sifira A/S.
*
* Author: Anders Baekgaard <ab@sifira.dk>
*
* This file is part of chan_ss7.
*
* chan_ss7 is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* chan_ss7 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with chan_ss7; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <sys/param.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <signal.h>
#include <sys/poll.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include "asterisk/options.h"
#include "asterisk/logger.h"
#include "asterisk/config.h"
#include "asterisk/sched.h"
#include "asterisk/utils.h"
#include "asterisk/cli.h"
#include "asterisk/lock.h"
#include "asterisk/channel.h"
#include "astversion.h"
#include "config.h"
#include "lffifo.h"
#include "utils.h"
#include "mtp.h"
#include "cluster.h"
/*
 * Cluster timing constants.
 *
 * CLUSTER_ALIVE_TIMEOUT is compared against timediff_msec() results in
 * check_senders(), so it is expressed in milliseconds.
 * NOTE(review): the other values are presumably milliseconds as well;
 * confirm at their use sites, which are outside this part of the file.
 */
/* Delay between cluster thread wakeups. */
#define CLUSTER_WAKEUP_INTERVAL 500
/* Interval between keep-alive messages (presumably; confirm at use site). */
#define CLUSTER_KEEP_ALIVE_INTERVAL 500
/* A sender, and then a host, silent for longer than this is marked dead. */
#define CLUSTER_ALIVE_TIMEOUT 1000
/* Presumably the pause between connection attempts to a target; confirm. */
#define CLUSTER_CONNECT_RETRY_INTERVAL 2000
/* Presumably the limit for a pending connection attempt; confirm. */
#define CLUSTER_CONNECT_TIMEOUT 10000
/*
 * Event pipe: cluster_put() writes the link index to receivepipe[1] after
 * it has queued a packet in receivebuf.
 */
static int receivepipe[2] = {-1, -1};
/* FIFO that cluster_put() fills with lffifo_put(). */
static struct lffifo *receivebuf;
static struct sched_context *cluster_sched = NULL;
static pthread_t cluster_thread = AST_PTHREADT_NULL;
/* While zero, cluster_put() and cluster_mtp_forward() return immediately. */
static int cluster_running = 0;
/*
 * Per-target connection state and statistics, one entry per link
 * (outer array) and per target (inner array).
 */
static struct receiver_stat {
struct {
int connected;
int inprogress;
int fails; /* statistics */
unsigned long forwards; /* statistics */
int fd;
struct timeval lasttry;
int reported;
} target[2*MAX_HOSTS];
} receiver_stats[MAX_LINKS_PER_HOST];
/* Number of used entries in accepted[] (presumably; maintained elsewhere). */
int n_accepted = 0;
/*
 * NOTE(review): looks like the table of accepted inbound connections;
 * senderix presumably indexes senders[] -- confirm against the code that
 * fills it, which is outside this part of the file.
 */
static struct {
int fd;
struct in_addr addr;
int senderix;
} accepted[2*MAX_HOSTS+2];
/* Listening socket created by setup_receiver_socket(); -1 when not open. */
static int receiver_socket = -1;
/* Kind of descriptor held in the fds[] slot with the same index (presumably). */
static enum{FD_PIPE, FD_LISTEN, FD_ACCEPTED, FD_RECEIVER, FD_INPROGRESS } fds_type[2*MAX_HOSTS+2];
int n_fds = 0;
/* poll() descriptor set and parallel per-slot bookkeeping arrays. */
static struct pollfd fds[2*MAX_HOSTS+2];
static struct receiver fds_receivers[2*MAX_HOSTS+2];
static int fds_targetix[2*MAX_HOSTS+2];
static int rebuild_fds = 1;
/* Number of used entries in senders[]; incremented by add_sender(). */
int n_senders = 0;
/*
 * Known (host, address) pairs that may send to us; filled by add_sender().
 * last/state/up are updated by set_sender_last(), state/down by
 * check_senders().
 */
static struct {
struct host* host;
struct in_addr addr;
int hostix; /* index into host_last_event_stamp[] */
struct timeval last; /* time stamp passed to set_sender_last() */
alivestate state;
int up; /* count of transitions to STATE_ALIVE */
int down; /* count of transitions to STATE_DEAD */
} senders[2*MAX_HOSTS];
/*
 * Per-host time stamp of the last sender event, indexed by hostix; written
 * by set_sender_last() and read by check_senders().
 */
static struct timeval host_last_event_stamp[MAX_HOSTS] = {{0, 0}, };
static unsigned long host_last_seq_no[MAX_HOSTS] = {0, };
/* Reference time used by check_senders(); assigned outside this part of the file. */
static struct timeval now;
static unsigned long sequence_number = 0;
static void disconnect_receiver(struct receiver* receiver, int targetix);
/* Optional callbacks; NULL until set by code outside this part of the file. */
void (*isup_event_handler)(struct mtp_event*) = NULL;
/* Called by declare_host_state() for each enabled link of a host declared dead. */
void (*isup_block_handler)(struct link*) = NULL;
/*
 * Set an integer-valued socket option and log a warning if that fails.
 *
 *   s  socket descriptor
 *   l  protocol level handed to setsockopt()
 *   o  option name
 *   v  integer option value
 *
 * A failure is only reported; it is not propagated to the caller.
 */
static void set_socket_opt(int s, int l, int o, int v)
{
  int optlen = sizeof(v);
  int rc = setsockopt(s, l, o, &v, optlen);

  if (rc >= 0)
    return;
  ast_log(LOG_WARNING, "Cannot set socket option, %s\n", strerror(errno));
}
/*
 * Record a new alive-state for a host and act on the transition.
 *
 * Nothing happens when the host is already in the requested state.
 * On a change to STATE_DEAD, disconnect_receiver() is called for every
 * target of this_host's receivers that refers to the host, then
 * isup_block_handler (if set) is called for each enabled link on the
 * host's spans, and a warning is logged.
 * On a change to STATE_ALIVE only a warning is logged.
 * Any other state is stored without further action.
 */
static void declare_host_state(struct host* host, alivestate state)
{
  int rx, tx, spanix;

  if (host->state == state)
    return;
  host->state = state;

  if (state != STATE_DEAD) {
    if (state == STATE_ALIVE)
      ast_log(LOG_WARNING, "Alive signal from %s, now up.\n", host->name);
    return;
  }

  /* The host went down: disconnect every target of ours that refers to it. */
  for (rx = 0; rx < this_host->n_receivers; rx++) {
    struct receiver* rcv = &this_host->receivers[rx];

    for (tx = 0; tx < rcv->n_targets; tx++) {
      if (rcv->targets[tx].host == host)
        disconnect_receiver(rcv, tx);
    }
  }

  /* Hand each enabled link of the host to the block handler. */
  if (isup_block_handler) {
    for (spanix = 0; spanix < host->n_spans; spanix++) {
      struct link* lnk = host->spans[spanix].link;

      if (lnk->enabled)
        (*isup_block_handler)(lnk);
    }
  }
  ast_log(LOG_WARNING, "No alive signal from %s, assumed down.\n", host->name);
}
/*
 * Look up a (host, address) pair in the senders[] table.
 *
 * Returns the index of the first matching entry, or -1 when there is none.
 * Addresses are compared bytewise.
 */
static int find_sender(struct host* host, struct in_addr addr)
{
  int ix = 0;

  while (ix < n_senders) {
    if (senders[ix].host == host && !memcmp(&senders[ix].addr, &addr, sizeof(addr)))
      return ix;
    ix++;
  }
  return -1;
}
static void add_sender(struct host* host, struct in_addr addr, int hostix)
{
if (find_sender(host, addr) != -1) {
ast_log(LOG_NOTICE, "Cluster has multiple identical entries: host %s %s\n", host->name, inaddr2s(addr));
return;
}
senders[n_senders].host = host;
senders[n_senders].hostix = hostix;
senders[n_senders].addr = addr;
senders[n_senders].last.tv_sec = 0;
senders[n_senders].last.tv_usec = 0;
senders[n_senders].state = STATE_UNKNOWN;
senders[n_senders].up = 0;
senders[n_senders].down = 0;
ast_log(LOG_DEBUG, "Added host %s %s, hostix %d, id %d\n", host->name, inaddr2s(addr), senders[n_senders].hostix, n_senders);
n_senders++;
}
static void set_sender_last(int senderix, struct timeval last)
{
struct host* host = senders[senderix].host;
senders[senderix].last = last;
if (senders[senderix].state != STATE_ALIVE) {
senders[senderix].up += 1;
ast_log(LOG_WARNING, "Alive signal from %s %s\n", senders[senderix].host->name, inaddr2s(senders[senderix].addr));
}
senders[senderix].state = STATE_ALIVE;
host_last_event_stamp[senders[senderix].hostix] = last;
declare_host_state(host, STATE_ALIVE);
}
/*
 * Age the sender table against 'now' and derive host liveness from it.
 *
 * Pass 1: every STATE_ALIVE sender that has been silent for more than
 * CLUSTER_ALIVE_TIMEOUT msec becomes STATE_DEAD; its 'down' counter is
 * bumped and a warning is logged.
 * Pass 2: for every host other than this_host, if at least one of its
 * senders is dead and none is alive, and the host's last event is older
 * than CLUSTER_ALIVE_TIMEOUT msec, the host is declared STATE_DEAD.
 */
static void check_senders(void)
{
  struct host* h;
  int hostix;
  int s;

  for (s = 0; s < n_senders; s++) {
    int silent_ms = timediff_msec(now, senders[s].last);

    if (silent_ms <= CLUSTER_ALIVE_TIMEOUT || senders[s].state != STATE_ALIVE)
      continue;
    ast_log(LOG_WARNING, "No alive signal from %s %s, for %d msec\n", senders[s].host->name, inaddr2s(senders[s].addr), silent_ms);
    senders[s].state = STATE_DEAD;
    senders[s].down++;
  }

  /* Walk host ids upwards until the lookup runs out of hosts. */
  for (hostix = 0; (h = lookup_host_by_id(hostix)) != NULL; hostix++) {
    int any_alive = 0;
    int any_dead = 0;
    int host_silent_ms;

    if (h == this_host)
      continue;
    for (s = 0; s < n_senders; s++) {
      if (senders[s].host != h)
        continue;
      if (senders[s].state == STATE_ALIVE)
        any_alive = 1;
      if (senders[s].state == STATE_DEAD)
        any_dead = 1;
    }
    if (!any_dead || any_alive)
      continue;
    host_silent_ms = timediff_msec(now, host_last_event_stamp[hostix]);
    if (host_silent_ms > CLUSTER_ALIVE_TIMEOUT)
      declare_host_state(h, STATE_DEAD);
  }
}
/*
 * Queue a packet in receivebuf and announce it on the event pipe.
 *
 *   linkix  link index written to the pipe once the packet is queued
 *   buf     packet bytes
 *   len     number of bytes in buf
 *
 * Does nothing while the cluster is not running.  When lffifo_put()
 * reports failure the packet is dropped with an error message and nothing
 * is written to the pipe.  A failing pipe write is only logged.
 */
static void cluster_put(int linkix, unsigned char* buf, int len)
{
  int rc;

  if (!cluster_running)
    return;

  rc = lffifo_put(receivebuf, (unsigned char *)buf, len);
  if (rc) {
    ast_log(LOG_ERROR, "Cluster receive buffer full, packet lost.\n");
    /* Todo FIFO full ... */
    return;
  }

  rc = write(receivepipe[1], &linkix, sizeof(linkix));
  if (rc < 0)
    ast_log(LOG_NOTICE, "Could not write cluster event pipe: %s.\n", strerror(errno));
}
/*
 * Queue a received MTP event for the cluster.
 *
 *   link   link the event arrived on; may be NULL, in which case the
 *          link index -1 is queued and "?" is logged as the name
 *   event  event header followed by event->len payload bytes
 *
 * Ignored unless the cluster is running and this_host has at least one
 * receiver.
 */
void cluster_mtp_received(struct link* link, struct mtp_event* event)
{
  if (cluster_running && this_host->n_receivers) {
    const char* linkname = link ? link->name : "?";
    int linkix = link ? link->linkix : -1;

    ast_log(LOG_DEBUG, "cluster mtp received on link '%s', typ=%d\n", linkname, event->typ);
    cluster_put(linkix, (unsigned char *)event, sizeof(*event) + event->len);
  }
}
/*
 * Queue a sent MTP request for the cluster.
 *
 *   link  link the request was sent on; may be NULL, in which case the
 *         link index -1 is queued and "?" is logged as the name
 *   req   request header followed by req->len payload bytes
 *
 * Ignored unless the cluster is running and this_host has at least one
 * receiver.
 */
void cluster_mtp_sent(struct link* link, struct mtp_req* req)
{
  if (cluster_running && this_host->n_receivers) {
    const char* linkname = link ? link->name : "?";
    int linkix = link ? link->linkix : -1;

    ast_log(LOG_DEBUG, "cluster mtp sent on link '%s', typ=%d\n", linkname, req->typ);
    cluster_put(linkix, (unsigned char *)req, sizeof(*req) + req->len);
  }
}
/*
 * Queue an MTP request for the cluster, marked as an ISUP forward.
 *
 * The request's typ field is set to MTP_REQ_ISUP_FORWARD only while the
 * request is being queued; the caller's value is put back afterwards.
 * The link is taken from req->isup.link; when that is NULL the link index
 * -1 is queued.  Does nothing while the cluster is not running.
 */
void cluster_mtp_forward(struct mtp_req* req)
{
  int saved_typ = req->typ;
  struct link* lnk = req->isup.link;
  int linkix;

  if (!cluster_running)
    return;

  ast_log(LOG_DEBUG, "cluster mtp forward, link %s, typ=%d, len=%d\n", lnk ? lnk->name : "?", req->typ, req->len);
  linkix = lnk ? lnk->linkix : -1;

  req->typ = MTP_REQ_ISUP_FORWARD;
  cluster_put(linkix, (unsigned char *)req, sizeof(*req) + req->len);
  req->typ = saved_typ;
}
/*
 * Tell whether an alive receiver target has a link with a signalling
 * channel.
 *
 * Returns 1 when this_host has signalling receivers and at least one
 * target host of those receivers is in STATE_ALIVE and owns a span whose
 * link has schannel >= 0; returns 0 otherwise.
 *
 * NOTE(review): the linkset argument is not consulted; the answer is the
 * same for every linkset.  Confirm whether a per-linkset filter was
 * intended.
 */
int cluster_receivers_alive(struct linkset* linkset)
{
  int rx, tx, spanix;

  if (!this_host->has_signalling_receivers)
    return 0;

  for (rx = 0; rx < this_host->n_receivers; rx++) {
    for (tx = 0; tx < this_host->receivers[rx].n_targets; tx++) {
      struct host* peer = this_host->receivers[rx].targets[tx].host;

      if (peer->state == STATE_ALIVE) {
        for (spanix = 0; spanix < peer->n_spans; spanix++) {
          struct link* lnk = peer->spans[spanix].link;

          if (lnk->schannel >= 0)
            return 1;
        }
      }
    }
  }
  return 0;
}
static int setup_receiver_socket(void)
{
struct sockaddr_in sock;
in_addr_t addr = INADDR_ANY;
memset(&sock, 0, sizeof(struct sockaddr_in));
sock.sin_family = AF_INET;
sock.sin_port = htons(clusterlistenport);
memcpy(&sock.sin_addr, &addr, sizeof(addr));
receiver_socket = socket(PF_INET, SOCK_STREAM, 0);
if (receiver_socket < 0) {
ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));
return -1;
}
set_socket_opt(receiver_socket, SOL_SOCKET, SO_REUSEADDR, 1);
if (bind(receiver_socket, &sock, sizeof(sock)) < 0) {
ast_log(LOG_ERROR, "Cannot bind receiver socket, errno=%d: %s\n", errno, strerror(errno));
close(receiver_socket);
receiver_socket = -1;
return -1;
}
if (listen(receiver_socket, MAX_HOSTS) < 0) {
ast_log(LOG_ERROR, "Cannot listen on receiver socket, errno=%d: %s\n", errno, strerror(errno));
close(receiver_socket);
receiver_socket = -1;
return -1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -