⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk 的7号信令处理模块
💻 C
📖 第 1 页 / 共 3 页
字号:
/* cluster.c - chan_ss7 clustering/redundancy * * Copyright (C) 2006, Sifira A/S. * * Author: Anders Baekgaard <ab@sifira.dk> * * This file is part of chan_ss7. * * chan_ss7 is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * chan_ss7 is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with chan_ss7; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */#include <errno.h>#include <stdlib.h>#include <string.h>#include <stdio.h>#include <time.h>#include <sys/param.h>#include <arpa/inet.h>#include <sys/socket.h>#include <unistd.h>#include <fcntl.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <pthread.h>#include <signal.h>#include <sys/poll.h>#include <sys/ioctl.h>#include <sys/time.h>#include "asterisk/options.h"#include "asterisk/logger.h"#include "asterisk/config.h"#include "asterisk/sched.h"#include "asterisk/utils.h"#include "asterisk/cli.h"#include "asterisk/lock.h"#include "asterisk/channel.h"#include "astversion.h"#include "config.h"#include "lffifo.h"#include "utils.h"#include "mtp.h"#include "cluster.h"/* Delay between cluster thread wakeups. */#define CLUSTER_WAKEUP_INTERVAL 500#define CLUSTER_KEEP_ALIVE_INTERVAL 500#define CLUSTER_ALIVE_TIMEOUT 1000#define CLUSTER_CONNECT_RETRY_INTERVAL 2000#define CLUSTER_CONNECT_TIMEOUT 10000static int receivepipe[2];static struct lffifo *receivebuf;static struct sched_context *cluster_sched = NULL;static pthread_t cluster_thread = AST_PTHREADT_NULL;static int cluster_running = 0;static struct receiver_stat {  struct {    int connected;    int inprogress;    int fails; /* statistics */    unsigned long forwards; /* statistics */    int fd;    struct timeval lasttry;    int reported;  } target[2*MAX_HOSTS];} receiver_stats[MAX_LINKS_PER_HOST];int n_accepted = 0;static struct {  int fd;  struct in_addr addr;  int senderix;} accepted[2*MAX_HOSTS+2];static int receiver_socket = -1;static enum{FD_PIPE, FD_LISTEN, FD_ACCEPTED, FD_RECEIVER, FD_INPROGRESS } fds_type[2*MAX_HOSTS+2];int n_fds = 0;static struct pollfd fds[2*MAX_HOSTS+2];static struct receiver fds_receivers[2*MAX_HOSTS+2];static int fds_targetix[2*MAX_HOSTS+2];static int rebuild_fds = 1;int n_senders = 0;static struct {  struct host* host;  struct in_addr addr;  int hostix;  struct timeval last;  alivestate state;  int up;  int down;} senders[2*MAX_HOSTS];static struct timeval host_last_event_stamp[MAX_HOSTS] = {{0, 0}, };static unsigned long host_last_seq_no[MAX_HOSTS] = {0, };static struct timeval now;static unsigned long sequence_number = 0;static void disconnect_receiver(struct receiver* receiver, int targetix);void (*isup_event_handler)(struct mtp_event*) = NULL;void (*isup_block_handler)(struct link*) = NULL;static void set_socket_opt(int s, int l, int o, int v){  int err;  int len = sizeof(v);  if ((err = setsockopt(s, l, o, &v, len)) < 0) {    ast_log(LOG_WARNING, "Cannot set socket option, %s\n", strerror(errno));  }}static void declare_host_state(struct host* host, alivestate state){  if (host->state != state) {    host->state = state;    if (state == STATE_DEAD) {      int i;      int receiverix, targetix;      for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {	struct receiver* receiver = &this_host->receivers[receiverix];	for (targetix = 0; targetix < receiver->n_targets; targetix++) {	  if (receiver->targets[targetix].host == host)	    disconnect_receiver(receiver, targetix);	}      }      if (isup_block_handler) {	for (i = 0; i < host->n_spans; i++) {	  struct link* link = host->spans[i].link;	  if (link->enabled)	    (*isup_block_handler)(link);	}      }      ast_log(LOG_WARNING, "No alive signal from %s, assumed down.\n", host->name);    }    else if (state == STATE_ALIVE) {      ast_log(LOG_WARNING, "Alive signal from %s, now up.\n", host->name);    }  }}static int find_sender(struct host* host, struct in_addr addr){  int i;  for (i = 0; i < n_senders; i++)    if ((senders[i].host == host) && (memcmp(&senders[i].addr, &addr, sizeof(addr)) == 0))      return i;  return -1;}static void add_sender(struct host* host, struct in_addr addr, int hostix){  if (find_sender(host, addr) != -1) {    ast_log(LOG_NOTICE, "Cluster has multiple identical entries: host %s %s\n", host->name, inaddr2s(addr));    return;  }  senders[n_senders].host = host;  senders[n_senders].hostix = hostix;  senders[n_senders].addr = addr;  senders[n_senders].last.tv_sec = 0;  senders[n_senders].last.tv_usec = 0;  senders[n_senders].state = STATE_UNKNOWN;  senders[n_senders].up = 0;  senders[n_senders].down = 0;  ast_log(LOG_DEBUG, "Added host %s %s, hostix %d, id %d\n", host->name, inaddr2s(addr), senders[n_senders].hostix, n_senders);  n_senders++;}static void set_sender_last(int senderix, struct timeval last){  struct host* host = senders[senderix].host;  senders[senderix].last = last;  if (senders[senderix].state != STATE_ALIVE) {    senders[senderix].up += 1;    ast_log(LOG_WARNING, "Alive signal from %s %s\n", senders[senderix].host->name, inaddr2s(senders[senderix].addr));  }  senders[senderix].state = STATE_ALIVE;  host_last_event_stamp[senders[senderix].hostix] = last;  declare_host_state(host, STATE_ALIVE);}static void check_senders(void){  int i;  int hostix = 0;  struct host* host;  for (i = 0; i < n_senders; i++) {    int tdiff = timediff_msec(now, senders[i].last);    if ((tdiff > CLUSTER_ALIVE_TIMEOUT) && (senders[i].state == STATE_ALIVE)) {      ast_log(LOG_WARNING, "No alive signal from %s %s, for %d msec\n", senders[i].host->name, inaddr2s(senders[i].addr), tdiff);      senders[i].state = STATE_DEAD;      senders[i].down += 1;    }  }  while ((host = lookup_host_by_id(hostix)) != NULL) {    if (host != this_host) {      int alive = 0;      int dead = 0;      for (i = 0; i < n_senders; i++) {	if (senders[i].host == host) {	  alive = alive || (senders[i].state == STATE_ALIVE);	  dead = dead || (senders[i].state == STATE_DEAD);	}      }      if (dead && !alive) {	int tdiff = timediff_msec(now, host_last_event_stamp[hostix]);	if (tdiff > CLUSTER_ALIVE_TIMEOUT) {	  declare_host_state(host, STATE_DEAD);	}      }    }    hostix++;  }}static void cluster_put(int linkix, unsigned char* buf, int len){  int res = 0;  if (!cluster_running)    return;  res = lffifo_put(receivebuf, (unsigned char *)buf, len);  if(res) {    ast_log(LOG_ERROR, "Cluster receive buffer full, packet lost.\n");    /* Todo FIFO full ... */  } else {    res = write(receivepipe[1], &linkix, sizeof(linkix));    if (res < 0) {      ast_log(LOG_NOTICE, "Could not write cluster event pipe: %s.\n", strerror(errno));    }  }}void cluster_mtp_received(struct link* link, struct mtp_event* event){  if (!cluster_running || !this_host->n_receivers)    return;  ast_log(LOG_DEBUG, "cluster mtp received on link '%s', typ=%d\n", link ? link->name : "?", event->typ);  cluster_put(link ? link->linkix : -1, (unsigned char *)event, sizeof(*event) + event->len);}void cluster_mtp_sent(struct link* link, struct mtp_req* req){  if (!cluster_running || !this_host->n_receivers)    return;  ast_log(LOG_DEBUG, "cluster mtp sent on link '%s', typ=%d\n", link ? link->name : "?", req->typ);  cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len);}void cluster_mtp_forward(struct mtp_req* req){  int typ = req->typ;  struct link* link = req->isup.link;  if (!cluster_running)    return;  ast_log(LOG_DEBUG, "cluster mtp forward, link %s, typ=%d, len=%d\n", link ? link->name : "?", req->typ, req->len);  req->typ = MTP_REQ_ISUP_FORWARD;  cluster_put(link ? link->linkix : -1, (unsigned char *)req, sizeof(*req) + req->len);  req->typ = typ;}int cluster_receivers_alive(struct linkset* linkset){  int i, j;  if (this_host->has_signalling_receivers) {    for (i = 0; i < this_host->n_receivers; i++) {      for (j = 0; j < this_host->receivers[i].n_targets; j++) {	struct host* host = this_host->receivers[i].targets[j].host;	int l;	if (host->state != STATE_ALIVE)	  continue;	for (l = 0; l < host->n_spans; l++) {	  struct link* link = host->spans[l].link;	  if (link->schannel >= 0)	    return 1;	}      }    }  }  return 0;}static int setup_receiver_socket(void){  struct sockaddr_in sock;  in_addr_t addr = INADDR_ANY;  memset(&sock, 0, sizeof(struct sockaddr_in));  sock.sin_family = AF_INET;  sock.sin_port = htons(clusterlistenport);  memcpy(&sock.sin_addr, &addr, sizeof(addr));  receiver_socket = socket(PF_INET, SOCK_STREAM, 0);  if (receiver_socket < 0) {    ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));    return -1;  }  set_socket_opt(receiver_socket, SOL_SOCKET, SO_REUSEADDR, 1);  if (bind(receiver_socket, &sock, sizeof(sock)) < 0) {    ast_log(LOG_ERROR, "Cannot bind receiver socket, errno=%d: %s\n", errno, strerror(errno));    close(receiver_socket);    receiver_socket = -1;    return -1;  }  if (listen(receiver_socket, MAX_HOSTS) < 0) {    ast_log(LOG_ERROR, "Cannot listen on receiver socket, errno=%d: %s\n", errno, strerror(errno));    close(receiver_socket);    receiver_socket = -1;    return -1;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -