⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk1.4.6版本下 7#信令驱动 源码
💻 C
📖 第 1 页 / 共 3 页
字号:
  }
  return 0;
}

/*
 * Tear down the outgoing connection to one target of a receiver.
 *
 * receiver  - receiver whose statistics slot (indexed by receiver->receiverix) is updated.
 * targetix  - index of the target within that receiver.
 *
 * Nothing happens unless the target is marked connected or has a connect
 * in progress. Otherwise the descriptor is closed (when one is recorded),
 * both state flags are cleared and the failure counter is bumped.
 */
static void disconnect_receiver(struct receiver* receiver, int targetix)
{
  struct receiver_stat* stats = &receiver_stats[receiver->receiverix];

  /* Idle targets have nothing to tear down. */
  if (!stats->target[targetix].connected && !stats->target[targetix].inprogress)
    return;

  ast_log(LOG_DEBUG, "Disconnect receiver %s %d\n", receiver->targets[targetix].host->name, targetix);

  if (stats->target[targetix].fd != -1) {
    close(stats->target[targetix].fd);
    stats->target[targetix].fd = -1;
  }

  stats->target[targetix].connected = 0;
  stats->target[targetix].inprogress = 0;
  stats->target[targetix].fails += 1;
}

/*
 * Start a non-blocking TCP connect towards one target of one receiver.
 *
 * receiverix - index into this_host->receivers / receiver_stats.
 * targetix   - index of the target within that receiver.
 *
 * The target's state is first reset (fd = -1, not connected, not in
 * progress) and its lasttry timestamp is set to the current time. On
 * success the new socket is stored in the state table and the target is
 * marked "inprogress"; on any failure the socket is closed and the target
 * is left in the reset state.
 */
static void connect_receiver(int receiverix, int targetix)
{
  struct sockaddr_in sock;
  struct in_addr addr = this_host->receivers[receiverix].targets[targetix].inf->addr;
  char* host_name = this_host->receivers[receiverix].targets[targetix].host->name;
  int s;
  int flags;
  int res;

  receiver_stats[receiverix].target[targetix].fd = -1;
  receiver_stats[receiverix].target[targetix].connected = 0;
  receiver_stats[receiverix].target[targetix].inprogress = 0;
  gettimeofday(&receiver_stats[receiverix].target[targetix].lasttry, NULL);
  s = socket(PF_INET, SOCK_STREAM, 0);
  if (s < 0) {
    ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));
    return;
  }
  memset(&sock, 0, sizeof(struct sockaddr_in));
  sock.sin_family = AF_INET;
  sock.sin_port = htons(clusterlistenport);
  memcpy(&sock.sin_addr, &addr, sizeof(addr));

  res = fcntl(s, F_GETFL);
  if(res < 0) {
    ast_log(LOG_WARNING, "SS7: Could not obtain flags for socket fd: %s.\n", strerror(errno));
    /* Do not leak the freshly created socket on the error path. */
    close(s);
    return;
  }
  flags = res | O_NONBLOCK;
  res = fcntl(s, F_SETFL, flags);
  if(res < 0) {
    ast_log(LOG_WARNING, "SS7: Could not set socket fd non-blocking: %s.\n", strerror(errno));
    /* Do not leak the freshly created socket on the error path. */
    close(s);
    return;
  }
  ast_log(LOG_DEBUG, "Trying to connect to %s %s\n", host_name, inaddr2s(sock.sin_addr));

  /* The socket is non-blocking, so EINPROGRESS is the expected outcome. */
  if (connect(s, (struct sockaddr*) &sock, sizeof(sock)) < 0) {
    if (errno != EINPROGRESS) {
      ast_log(LOG_ERROR, "Cannot connect receiver socket %s, %s\n", inaddr2s(sock.sin_addr), strerror(errno));
      close(s);
      return;
    }
    // set_socket_opt(s, SOL_TCP, TCP_NODELAY, 1);
  }
  receiver_stats[receiverix].target[targetix].fd = s;
  receiver_stats[receiverix].target[targetix].inprogress = 1;
}

/*
 * Kick off a connection attempt for every target of every receiver
 * configured on this host.
 */
static void connect_receivers(void)
{
  int r;

  for (r = 0; r < this_host->n_receivers; r++) {
    int t = 0;

    while (t < this_host->receivers[r].n_targets) {
      connect_receiver(r, t);
      t++;
    }
  }
}

/*
 * Walk all receiver targets and service their connection state:
 *  - a target that is neither connected nor in progress is reconnected once
 *    more than CLUSTER_CONNECT_RETRY_INTERVAL has elapsed since its last try;
 *  - a target whose connect is still in progress after more than
 *    CLUSTER_CONNECT_TIMEOUT is abandoned and its socket closed.
 *
 * Elapsed time is measured from the target's lasttry timestamp to the
 * global 'now'.
 *
 * Returns the number of targets for which a reconnect was started or a
 * pending connect was timed out.
 */
static int check_receiver_connections(void)
{
  int receiverix, targetix;
  int any = 0;

  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
      int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);
      if (!receiver_stats[receiverix].target[targetix].connected && !receiver_stats[receiverix].target[targetix].inprogress) {
        if (tdiff > CLUSTER_CONNECT_RETRY_INTERVAL) {
          any++;
          connect_receiver(receiverix, targetix);
        }
      }
      else if (receiver_stats[receiverix].target[targetix].inprogress) {
        if (tdiff > CLUSTER_CONNECT_TIMEOUT) {
          close(receiver_stats[receiverix].target[targetix].fd);
          /* Forget the closed descriptor so that a stale (possibly reused)
             fd number can never be closed or polled again. */
          receiver_stats[receiverix].target[targetix].fd = -1;
          receiver_stats[receiverix].target[targetix].inprogress = 0;
          any++;
          ast_log(LOG_NOTICE, "Timed out on receiver connection to %s, receiverix %d targetix %d, tdiff %d\n", inaddr2s(this_host->receivers[receiverix].targets[targetix].inf->addr), receiverix, targetix, tdiff);
        }
      }
    }
  }
  return any;
}

/*
 * Write one packet to a single target of a receiver.
 *
 * receiver - receiver owning the target.
 * targetix - index of the target within the receiver.
 * buf, len - bytes to send.
 *
 * Nothing is sent unless the target is marked connected. The target's
 * lasttry timestamp is refreshed before each write. If write() fails the
 * socket is closed, the target is marked disconnected, its failure counter
 * is incremented and rebuild_fds is raised.
 *
 * NOTE(review): a short write (0 <= res < len) is not detected here.
 */
static void cluster_send_packet(struct receiver* receiver, int targetix, unsigned char* buf, int len)
{
  int res;
  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];

  if (!receiver_stat->target[targetix].connected)
    return;

  gettimeofday(&receiver_stat->target[targetix].lasttry, NULL);
  res = write(receiver_stat->target[targetix].fd, buf, len);
  if (res < 0) {
    /* Capture errno before close() gets a chance to overwrite it. */
    int saved_errno = errno;

    close(receiver_stat->target[targetix].fd);
    /* Drop the closed descriptor from the state table. */
    receiver_stat->target[targetix].fd = -1;
    receiver_stat->target[targetix].connected = 0;
    receiver_stat->target[targetix].fails += 1;
    rebuild_fds = 1;
    ast_log(LOG_ERROR, "Write socket to host '%s' target %d, errno=%d: %s\n", receiver->targets[targetix].host->name, targetix, saved_errno, strerror(saved_errno));
  }
}

/*
 * Send one event to the connected targets of a receiver.
 *
 * receiver - receiver whose targets are addressed.
 * buf, len - the packet; buf is interpreted as a struct mtp_event and its
 *            seq_no field is stamped with the next global sequence number.
 *
 * Every event type except MTP_REQ_ISUP_FORWARD goes to all connected
 * targets. An MTP_REQ_ISUP_FORWARD packet is sent only to targets that
 * belong to the same host as the first connected target, so that only one
 * other host forwards the ISUP packet; each such send is counted in the
 * target's 'forwards' statistic.
 */
static void cluster_send_packets(struct receiver* receiver, unsigned char* buf, int len)
{
  int targetix, firstsendix = -1;
  struct mtp_event* event = (struct mtp_event*) buf;
  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];

  event->seq_no = sequence_number++;
  for (targetix = 0; targetix < receiver->n_targets; targetix++) {
    ast_log(LOG_DEBUG, "send packets %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected);
    if (receiver_stat->target[targetix].connected) {
      if (firstsendix == -1)
        firstsendix = targetix;
      /* The braces matter: both the statistics update and the send itself
         must be subject to the "one forwarding host" filter. */
      if ((event->typ != MTP_REQ_ISUP_FORWARD) ||
          (receiver->targets[targetix].host == receiver->targets[firstsendix].host)) {
        if (event->typ == MTP_REQ_ISUP_FORWARD)
          receiver_stat->target[targetix].forwards += 1;
        cluster_send_packet(receiver, targetix, buf, len);
      }
    }
  }
}

/*
 * Send an MTP_EVENT_ALIVE packet to every target whose lasttry timestamp
 * is more than CLUSTER_KEEP_ALIVE_INTERVAL behind the global 'now'.
 *
 * One sequence number is consumed per call, whether or not any packet is
 * actually sent.
 */
static void cluster_send_keep_alive(void)
{
  struct mtp_event event;
  int receiverix, targetix;

  /* Zero the whole structure first: all sizeof(event) bytes go out on the
     wire, and only typ, len and seq_no are assigned below. */
  memset(&event, 0, sizeof(event));
  event.typ = MTP_EVENT_ALIVE;
  event.len = 0;
  event.seq_no = sequence_number++;
  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
      int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);
      if (tdiff > CLUSTER_KEEP_ALIVE_INTERVAL)
        cluster_send_packet(&this_host->receivers[receiverix], targetix, (unsigned char*) &event, sizeof(event));
    }
  }
}

/*
 * Compute how long the caller may wait, in the units returned by
 * timediff_msec(), before the next keep-alive falls due.
 *
 * For every connected target the remaining time is
 * CLUSTER_KEEP_ALIVE_INTERVAL minus the time elapsed since its lasttry
 * timestamp; the smallest such value wins. The result is never larger
 * than CLUSTER_KEEP_ALIVE_INTERVAL and never negative.
 */
static int find_next_timeout(void)
{
  int shortest = CLUSTER_KEEP_ALIVE_INTERVAL;
  int r, t;

  for (r = 0; r < this_host->n_receivers; r++) {
    for (t = 0; t < this_host->receivers[r].n_targets; t++) {
      int elapsed;
      int remaining;

      /* Only connected targets need keep-alives. */
      if (!receiver_stats[r].target[t].connected)
        continue;

      elapsed = timediff_msec(now, receiver_stats[r].target[t].lasttry);
      remaining = CLUSTER_KEEP_ALIVE_INTERVAL - elapsed;
      if (remaining < shortest)
        shortest = remaining;
    }
  }

  /* An overdue keep-alive means "do not wait at all". */
  return (shortest < 0) ? 0 : shortest;
}

static int cluster_receive_packet(int senderix, int fd)
{
  int res;
  int hostix = senders[senderix].hostix;
  char buf[MTP_EVENT_MAX_SIZE];
  struct mtp_event* event = (struct mtp_event*) &buf;
  struct mtp_req* req = (struct mtp_req*) &buf;
  int sz = sizeof(event->typ);

  res = read(fd, buf, sz);
  if (res <= 0) {
    ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));
    return -1;
  }
  else if (res == 0) {
    ast_log(LOG_NOTICE, "Received 0 bytes, closing socket: %s.\n", strerror(errno));
    shutdown(fd, SHUT_RDWR);
    return -1;
  }
  if (event->typ < MTP_EVENT_ALIVE) {
    res = read(fd, &buf[sz], sizeof(*req)-sz);
    if (res > 0)
      res = read(fd, req->buf, req->len);
    //    ast_log(LOG_DEBUG, "Received mtp req %d, buff len %d, res %d\n", req->typ, req->len, res);
  }
  else {
    res = read(fd, &buf[sz], sizeof(*event)-sz);
    if (res > 0)
      res = read(fd, event->buf, event->len);
  }
  if (host_last_seq_no[hostix] >= event->seq_no) {
    return 0;
  }
  host_last_seq_no[hostix] = event->seq_no;
  if (res > 0) {
    ast_log(LOG_DEBUG, "Received event, senderix=%d, hostix=%d, lastseq=%ld, seqno=%ld, typ=%d\n", senderix, hostix, host_last_seq_no[hostix], event->seq_no, event->typ);
    if ((event->typ == MTP_EVENT_ISUP) || (event->typ == MTP_REQ_ISUP_FORWARD)) {
      if (isup_event_handler)
	(*isup_event_handler)(event);
    }
  }
  if (res < 0)
    ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));
  return res;
}

static void *cluster_thread_main(void *data)
{
  int i, j;
  int res;
  fds[0].fd = receivepipe[0];
  fds[0].events = POLLIN;
  fds_type[0] = FD_PIPE;
  fds[1].fd = receiver_socket;
  if (receiver_socket > 0)
    fds[1].events = POLLIN;
  else
    fds[1].events = 0;
  fds_type[1] = FD_LISTEN;

  ast_verbose(VERBOSE_PREFIX_3 "Starting cluster thread, pid=%d.\n", getpid());

  while (cluster_running) {
    int timeout;
    int maxtimeout;

    gettimeofday(&now, NULL);
    timeout = ast_sched_wait(cluster_sched);
    maxtimeout = find_next_timeout();
    if(timeout <= 0 || timeout > CLUSTER_WAKEUP_INTERVAL) {
      timeout = CLUSTER_WAKEUP_INTERVAL;
    }
    if (timeout > maxtimeout)
      timeout = maxtimeout;
    if (rebuild_fds) {
      n_fds = 2;
      for (i = 0; i < n_accepted; i++) {
	fds[n_fds].fd = accepted[i].fd;
	fds[n_fds].events = POLLIN|POLLERR|POLLHUP;
	fds_type[n_fds++] = FD_ACCEPTED;
      }
      for (i = 0; i < this_host->n_receivers; i++) {
	for (j = 0; j < this_host->receivers[i].n_targets; j++) {
	  fds_receivers[n_fds] = this_host->receivers[i];
	  fds_targetix[n_fds] = j;
	  if (receiver_stats[i].target[j].connected) {
	    fds[n_fds].fd = receiver_stats[i].target[j].fd;
	    fds[n_fds].events = POLLERR|POLLHUP;
	    fds_type[n_fds++] = FD_RECEIVER;
	  }
	  else if (receiver_stats[i].target[j].inprogress) {
	    fds[n_fds].fd = receiver_stats[i].target[j].fd;
	    fds[n_fds].events = POLLOUT|POLLERR|POLLHUP;
	    fds_type[n_fds++] = FD_INPROGRESS;
	  }
	}
      }
      rebuild_fds = 0;
    }
    res = poll(fds, n_fds, timeout);
    gettimeofday(&now, NULL);

    if(res < 0) {
      if(errno == EINTR) {
        /* Just try again. */
      } else {
        ast_log(LOG_ERROR, "poll() failure, errno=%d: %s\n", errno, strerror(errno));
      }
    } else if(res > 0) {
      for (i = 0; i < n_fds; i++) {
	if(!(fds[i].revents & (POLLERR|POLLNVAL|POLLHUP|POLLIN|POLLOUT)))
	  continue;
	switch (fds_type[i]) {
	case FD_PIPE: {
	  if(fds[i].revents & POLLIN) {
	    int linkix;
	    unsigned char fifobuf[1024];
	    struct mtp_req* req = (struct mtp_req*) &fifobuf;

	    res = read(fds[i].fd, &linkix, sizeof(linkix));
	    if (res < 0) {
	      ast_log(LOG_NOTICE, "Could not read cluster event pipe: %s.\n", strerror(errno));
	      continue;
	    }
	    if ((res = lffifo_get(receivebuf, fifobuf, sizeof(fifobuf))) != 0) {
	      if(res < 0) {
		ast_log(LOG_ERROR, "Got oversize packet in cluster receive buffer.\n");
		continue;
	      }
	    }
	    ast_log(LOG_DEBUG, "fifo get res %d, typ %d, linkix %d, link %s\n", res, req->typ, linkix, links[linkix].name);
	    if (res > 0) {
	      if ((req->typ == MTP_REQ_ISUP) || (req->typ == MTP_REQ_ISUP_FORWARD) || (req->typ == MTP_EVENT_ISUP)) {
		if (links[linkix].receiver) {
		  cluster_send_packets(links[linkix].receiver, fifobuf, res);
		}
		else {
		  ast_log(LOG_WARNING, "No way to send packet to cluster, link='%s', reqtype=%d\n", links[linkix].name, req->typ);
		}
	      }
	    }
	    
	  }
	  break;
	}
	case FD_LISTEN: {
	  if(fds[i].revents & POLLIN) {
	    struct sockaddr_in from_addr;
	    unsigned int len = sizeof(struct sockaddr_in);
	    int afd = accept(receiver_socket, (struct sockaddr *)&from_addr, &len);
	    if (afd != -1) {
	      struct host* host = lookup_host_by_addr(from_addr.sin_addr);
	      if (host) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -