⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk1.4.6版本下 7#信令驱动 源码
💻 C
📖 第 1 页 / 共 3 页
字号:
/* cluster.c - chan_ss7 clustering/redundancy
 *
 * Copyright (C) 2006, Sifira A/S.
 *
 * Author: Anders Baekgaard <ab@sifira.dk>
 *
 * This file is part of chan_ss7.
 *
 * chan_ss7 is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * chan_ss7 is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with chan_ss7; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */


#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <sys/param.h>
#include <arpa/inet.h>

#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <signal.h>
#include <sys/poll.h>
#include <sys/ioctl.h>
#include <sys/time.h>

#include "asterisk/options.h"
#include "asterisk/logger.h"
#include "asterisk/config.h"
#include "asterisk/sched.h"
#include "asterisk/utils.h"
#include "asterisk/cli.h"
#include "asterisk/lock.h"
#include "asterisk/channel.h"


#include "astversion.h"
#include "config.h"
#include "lffifo.h"
#include "utils.h"
#include "mtp.h"
#include "cluster.h"

/* Delay between cluster thread wakeups. */
#define CLUSTER_WAKEUP_INTERVAL 500
#define CLUSTER_KEEP_ALIVE_INTERVAL 500
#define CLUSTER_ALIVE_TIMEOUT 1000
#define CLUSTER_CONNECT_RETRY_INTERVAL 2000
#define CLUSTER_CONNECT_TIMEOUT 10000

static int receivepipe[2] = {-1, -1};
static struct lffifo *receivebuf;

static struct sched_context *cluster_sched = NULL;
static pthread_t cluster_thread = AST_PTHREADT_NULL;
static int cluster_running = 0;

static struct receiver_stat {
  struct {
    int connected;
    int inprogress;
    int fails; /* statistics */
    unsigned long forwards; /* statistics */
    int fd;
    struct timeval lasttry;
    int reported;
  } target[2*MAX_HOSTS];
} receiver_stats[MAX_LINKS_PER_HOST];

int n_accepted = 0;
static struct {
  int fd;
  struct in_addr addr;
  int senderix;
} accepted[2*MAX_HOSTS+2];

static int receiver_socket = -1;

static enum{FD_PIPE, FD_LISTEN, FD_ACCEPTED, FD_RECEIVER, FD_INPROGRESS } fds_type[2*MAX_HOSTS+2];
int n_fds = 0;
static struct pollfd fds[2*MAX_HOSTS+2];
static struct receiver fds_receivers[2*MAX_HOSTS+2];
static int fds_targetix[2*MAX_HOSTS+2];
static int rebuild_fds = 1;

int n_senders = 0;
static struct {
  struct host* host;
  struct in_addr addr;
  int hostix;
  struct timeval last;
  alivestate state;
  int up;
  int down;
} senders[2*MAX_HOSTS];

static struct timeval host_last_event_stamp[MAX_HOSTS] = {{0, 0}, };
static unsigned long host_last_seq_no[MAX_HOSTS] = {0, };

static struct timeval now;

static unsigned long sequence_number = 0;


static void disconnect_receiver(struct receiver* receiver, int targetix);

void (*isup_event_handler)(struct mtp_event*) = NULL;
void (*isup_block_handler)(struct link*) = NULL;

static void set_socket_opt(int s, int l, int o, int v)
{
  int err;
  int len = sizeof(v);

  if ((err = setsockopt(s, l, o, &v, len)) < 0) {
    ast_log(LOG_WARNING, "Cannot set socket option, %s\n", strerror(errno));
  }
}

static void declare_host_state(struct host* host, alivestate state)
{
  if (host->state != state) {
    host->state = state;
    if (state == STATE_DEAD) {
      int i;
      int receiverix, targetix;

      for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
	struct receiver* receiver = &this_host->receivers[receiverix];
	for (targetix = 0; targetix < receiver->n_targets; targetix++) {
	  if (receiver->targets[targetix].host == host)
	    disconnect_receiver(receiver, targetix);
	}
      }
      if (isup_block_handler) {
	for (i = 0; i < host->n_spans; i++) {
	  struct link* link = host->spans[i].link;
	  if (link->enabled)
	    (*isup_block_handler)(link);
	}
      }
      ast_log(LOG_WARNING, "No alive signal from %s, assumed down.\n", host->name);
    }
    else if (state == STATE_ALIVE) {
      ast_log(LOG_WARNING, "Alive signal from %s, now up.\n", host->name);
    }
  }
}

static int find_sender(struct host* host, struct in_addr addr)
{
  int i;
  for (i = 0; i < n_senders; i++)
    if ((senders[i].host == host) && (memcmp(&senders[i].addr, &addr, sizeof(addr)) == 0))
      return i;
  return -1;
}

static void add_sender(struct host* host, struct in_addr addr, int hostix)
{
  if (find_sender(host, addr) != -1) {
    ast_log(LOG_NOTICE, "Cluster has multiple identical entries: host %s %s\n", host->name, inaddr2s(addr));
    return;
  }
  senders[n_senders].host = host;
  senders[n_senders].hostix = hostix;
  senders[n_senders].addr = addr;
  senders[n_senders].last.tv_sec = 0;
  senders[n_senders].last.tv_usec = 0;
  senders[n_senders].state = STATE_UNKNOWN;
  senders[n_senders].up = 0;
  senders[n_senders].down = 0;
  ast_log(LOG_DEBUG, "Added host %s %s, hostix %d, id %d\n", host->name, inaddr2s(addr), senders[n_senders].hostix, n_senders);
  n_senders++;
}

static void set_sender_last(int senderix, struct timeval last)
{
  struct host* host = senders[senderix].host;
  senders[senderix].last = last;
  if (senders[senderix].state != STATE_ALIVE) {
    senders[senderix].up += 1;
    ast_log(LOG_WARNING, "Alive signal from %s %s\n", senders[senderix].host->name, inaddr2s(senders[senderix].addr));
  }
  senders[senderix].state = STATE_ALIVE;
  host_last_event_stamp[senders[senderix].hostix] = last;
  declare_host_state(host, STATE_ALIVE);
}

static void check_senders(void)
{
  int i;
  int hostix = 0;
  struct host* host;

  for (i = 0; i < n_senders; i++) {
    int tdiff = timediff_msec(now, senders[i].last);
    if ((tdiff > CLUSTER_ALIVE_TIMEOUT) && (senders[i].state == STATE_ALIVE)) {
      ast_log(LOG_WARNING, "No alive signal from %s %s, for %d msec\n", senders[i].host->name, inaddr2s(senders[i].addr), tdiff);
      senders[i].state = STATE_DEAD;
      senders[i].down += 1;
    }
  }
  while ((host = lookup_host_by_id(hostix)) != NULL) {
    if (host != this_host) {
      int alive = 0;
      int dead = 0;
      for (i = 0; i < n_senders; i++) {
	if (senders[i].host == host) {
	  alive = alive || (senders[i].state == STATE_ALIVE);
	  dead = dead || (senders[i].state == STATE_DEAD);
	}
      }
      if (dead && !alive) {
	int tdiff = timediff_msec(now, host_last_event_stamp[hostix]);
	if (tdiff > CLUSTER_ALIVE_TIMEOUT) {
	  declare_host_state(host, STATE_DEAD);
	}
      }
    }
    hostix++;
  }
}

static void cluster_put(int linkix, unsigned char* buf, int len)
{
  int res = 0;

  if (!cluster_running)
    return;
  res = lffifo_put(receivebuf, (unsigned char *)buf, len);
  if(res) {
    ast_log(LOG_ERROR, "Cluster receive buffer full, packet lost.\n");
    /* Todo FIFO full ... */
  } else {
    res = write(receivepipe[1], &linkix, sizeof(linkix));
    if (res < 0) {
      ast_log(LOG_NOTICE, "Could not write cluster event pipe: %s.\n", strerror(errno));
    }
  }
}

void cluster_mtp_received(struct link* link, struct mtp_event* event)
{
  if (!cluster_running || !this_host->n_receivers)
    return;
  ast_log(LOG_DEBUG, "cluster mtp received on link '%s', typ=%d\n", link ? link->name : "?", event->typ);
  cluster_put(link ? link->linkix : -1, (unsigned char *)event, sizeof(*event) + event->len);
}

void cluster_mtp_sent(struct link* link, struct mtp_req* req)
{
  if (!cluster_running || !this_host->n_receivers)
    return;
  ast_log(LOG_DEBUG, "cluster mtp sent on link '%s', typ=%d\n", link ? link->name : "?", req->typ);
  cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len);
}

void cluster_mtp_forward(struct mtp_req* req)
{
  int typ = req->typ;
  struct link* link = req->isup.link;
  if (!cluster_running)
    return;
  ast_log(LOG_DEBUG, "cluster mtp forward, link %s, typ=%d, len=%d\n", link ? link->name : "?", req->typ, req->len);
  req->typ = MTP_REQ_ISUP_FORWARD;
  cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len);
  req->typ = typ;
}

int cluster_receivers_alive(struct linkset* linkset)
{
  int i, j;

  if (this_host->has_signalling_receivers) {
    for (i = 0; i < this_host->n_receivers; i++) {
      for (j = 0; j < this_host->receivers[i].n_targets; j++) {
	struct host* host = this_host->receivers[i].targets[j].host;
	int l;
	if (host->state != STATE_ALIVE)
	  continue;
	for (l = 0; l < host->n_spans; l++) {
	  struct link* link = host->spans[l].link;
	  if (link->schannel >= 0)
	    return 1;
	}
      }
    }
  }
  return 0;
}

static int setup_receiver_socket(void)
{
  struct sockaddr_in sock;
  in_addr_t addr = INADDR_ANY;

  memset(&sock, 0, sizeof(struct sockaddr_in));
  sock.sin_family = AF_INET;
  sock.sin_port = htons(clusterlistenport);
  memcpy(&sock.sin_addr, &addr, sizeof(addr));

  receiver_socket = socket(PF_INET, SOCK_STREAM, 0);
  if (receiver_socket < 0) {
    ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));
    return -1;
  }
  set_socket_opt(receiver_socket, SOL_SOCKET, SO_REUSEADDR, 1);
  if (bind(receiver_socket, &sock, sizeof(sock)) < 0) {
    ast_log(LOG_ERROR, "Cannot bind receiver socket, errno=%d: %s\n", errno, strerror(errno));
    close(receiver_socket);
    receiver_socket = -1;
    return -1;
  }
  if (listen(receiver_socket, MAX_HOSTS) < 0) {
    ast_log(LOG_ERROR, "Cannot listen on receiver socket, errno=%d: %s\n", errno, strerror(errno));
    close(receiver_socket);
    receiver_socket = -1;
    return -1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -