⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk1.4.6版本下 7#信令驱动 源码
💻 C
📖 第 1 页 / 共 3 页
字号:
		int senderix = find_sender(host, from_addr.sin_addr);
		if (senderix >= 0) {
		  set_sender_last(senderix, now);
		  accepted[n_accepted].fd = afd;
		  accepted[n_accepted].addr = from_addr.sin_addr;
		  accepted[n_accepted++].senderix = senderix;
		  rebuild_fds = 1;
		  host_last_seq_no[senders[senderix].hostix] = 0;
		}
		else {
		  ast_log(LOG_NOTICE, "Got socket connection from unexpected sender %s %s\n", host->name, inaddr2s(from_addr.sin_addr));
		}
	      }
	      ast_log(LOG_NOTICE, "Accepted socket connection from %s, fd %d\n", host?host->name : "unknown", afd);
	    }
	    else {
	      ast_log(LOG_WARNING, "Accept of receiver connection failed: %s.\n", strerror(errno));
	    }
	    break;
	  }
	}
	case FD_ACCEPTED: {
	  int ix = i - 2;
	  int err = 0;
	  if(fds[i].revents & POLLIN) {
	    err = cluster_receive_packet(accepted[ix].senderix, fds[i].fd);
	    set_sender_last(accepted[ix].senderix, now);
	  }
	  if((err == -1) || (fds[i].revents & (POLLERR|POLLNVAL))) {
	    int error;
	    unsigned int len = sizeof(error);
	    getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);
	    ast_log(LOG_NOTICE, "Got error on accepted socket: %d %s\n", i, strerror(error));
	    close(fds[i].fd);
	    for (j = ix; j < n_accepted-1; j++)
	      accepted[j] = accepted[j+1];
	    n_accepted--;
	    rebuild_fds = 1;
	  }
	  break;
	}
	case FD_RECEIVER:
	case FD_INPROGRESS: {
	  struct receiver* receiver = &fds_receivers[i];
	  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];
	  int targetix = fds_targetix[i];
	  char* host_name = receiver->targets[targetix].host->name;
	  char* if_name = receiver->targets[targetix].inf->name;

	  rebuild_fds = 1;
	  if(fds[i].revents & (POLLERR|POLLNVAL)) {
	    if (receiver_stat->target[targetix].reported++ % 100 == 0) {
	      int error;
	      unsigned int len = sizeof(error);
	      getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);
	      ast_log(LOG_NOTICE, "Socket connection failed to host: %s, inf %s, addr %s, error: %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr), strerror(error));
	    }
	    disconnect_receiver(receiver, targetix);
	  }
	  else if(fds[i].revents & (POLLHUP)) {
	    ast_log(LOG_NOTICE, "Lost connection to receiver host: %s, inf %s, addr %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));
	    disconnect_receiver(receiver, targetix);
	  }
	  else if(fds[i].revents & (POLLIN|POLLOUT)) {
	    ast_log(LOG_NOTICE, "Connected to receiver host: %s, inf %s, addr %s \n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));
	    receiver_stat->target[targetix].connected = 1;
	    receiver_stat->target[targetix].inprogress = 0;
	    receiver_stat->target[targetix].reported = 0;
	  }
	}
	break;
	}
      }
    }
    cluster_send_keep_alive();
    if (check_receiver_connections())
      rebuild_fds = 1;
    check_senders();
  }
  return NULL;
}

/*
 * Register every remote host that lists this host as a receiver target.
 *
 * Hosts are enumerated by id through lookup_host_by_id() until it returns
 * NULL.  For each receiver target of a remote host that points at this_host,
 * add_sender() is called once per interface of that remote host.  A debug
 * message is logged when no sender ends up registered.
 */
static void build_sender_list(void)
{
  struct host* peer;
  int peerix;

  for (peerix = 0; (peer = lookup_host_by_id(peerix)) != NULL; peerix++) {
    int rcvix, tgtix, ifix;

    /* We never supervise ourselves. */
    if (peer == this_host)
      continue;

    for (rcvix = 0; rcvix < peer->n_receivers; rcvix++) {
      for (tgtix = 0; tgtix < peer->receivers[rcvix].n_targets; tgtix++) {
        if (peer->receivers[rcvix].targets[tgtix].host != this_host)
          continue;
        /* This peer sends to us: register each of its interfaces. */
        for (ifix = 0; ifix < peer->n_ifs; ifix++) {
          add_sender(peer, peer->ifs[ifix].addr, peerix);
        }
      }
    }
  }

  if (n_senders == 0) {
    ast_log(LOG_DEBUG, "Found no senders to supervise\n");
  }
}

/*
 * Give the cluster connections a bounded amount of time to come up.
 *
 * Polls at most 800 times, sleeping 10 ms between polls.  Returns as soon as
 * every configured receiver target is marked connected and the number of
 * accepted connections equals the number of known senders; otherwise it
 * returns silently once the attempts are used up.  Progress is logged at
 * debug level on every 100th attempt.
 */
static void wait_for_connections(void)
{
  int attempt;

  for (attempt = 0; attempt < 800; attempt++) {
    int n_connected = 0;
    int n_targets = 0;
    int rcvix, tgtix;

    for (rcvix = 0; rcvix < this_host->n_receivers; rcvix++) {
      for (tgtix = 0; tgtix < this_host->receivers[rcvix].n_targets; tgtix++) {
        n_targets++;
        if (receiver_stats[rcvix].target[tgtix].connected)
          n_connected++;
      }
    }

    if ((attempt % 100) == 0)
      ast_log(LOG_DEBUG, "wait %d %d %d %d\n", n_connected, n_targets, n_accepted, n_senders);

    /* Everything is up in both directions: no need to wait any longer. */
    if (n_connected == n_targets && n_accepted == n_senders)
      return;

    usleep(10*1000);
  }
}

/*
 * Bring up the cluster subsystem.
 *
 * Stores the two ISUP callbacks, builds the sender list, resets the
 * per-target receiver state, opens the receiver socket when any target host
 * has a span whose link has a signalling channel (schannel >= 0), calls
 * connect_receivers(), allocates the receive fifo, creates the event pipe
 * with both ends non-blocking, creates the scheduling context, starts the
 * cluster thread (requesting SCHED_RR priority 10 for it) and finally calls
 * wait_for_connections().
 *
 * Returns 0 on success.  On any failure cluster_cleanup() is called and -1
 * is returned.  Failing to raise the thread priority is only logged.
 */
int cluster_init(void (*isup_event_handler_callback)(struct mtp_event*),
		 void (*isup_block_handler_callback)(struct link*))
{
  int i, j;

  int res;
  int flags;
  struct sched_param sp;

  isup_event_handler = isup_event_handler_callback;
  isup_block_handler = isup_block_handler_callback;

  /* Mark both pipe ends as "not open" before anything can jump to fail:
     cluster_cleanup() closes every pipe end that is not -1, so this must
     not depend on whatever the array held before this call. */
  receivepipe[0] = receivepipe[1] = -1;

  build_sender_list();
  for (i = 0; i < this_host->n_receivers; i++) {
    for (j = 0; j < this_host->receivers[i].n_targets; j++) {
      receiver_stats[i].target[j].fd = -1;
      receiver_stats[i].target[j].connected = 0;
      receiver_stats[i].target[j].inprogress = 0;
      receiver_stats[i].target[j].reported = 0;
    }
  }

  /* A receiver socket is only needed when at least one target host has a
     link with a signalling channel. */
  for (i = 0; i < this_host->n_receivers; i++) {
    for (j = 0; j < this_host->receivers[i].n_targets; j++) {
      struct host* host = this_host->receivers[i].targets[j].host;
      int l;
      for (l = 0; l < host->n_spans; l++) {
	struct link* link = host->spans[l].link;
	if (link->schannel >= 0)
	  this_host->has_signalling_receivers = 1;
      }
    }
  }

  if (this_host->has_signalling_receivers && setup_receiver_socket())
    goto fail;
  connect_receivers();

  receivebuf = lffifo_alloc(200000);
  if(receivebuf == NULL) {
    /* Do not start the thread without a receive buffer. */
    ast_log(LOG_ERROR, "Unable to allocate cluster receive buffer.\n");
    goto fail;
  }

  res = pipe(receivepipe);
  if(res < 0) {
    ast_log(LOG_ERROR, "Unable to allocate cluster event pipe: %s.\n",
            strerror(errno));
    goto fail;
  }
  res = fcntl(receivepipe[0], F_GETFL);
  if(res < 0) {
    ast_log(LOG_ERROR, "Could not obtain flags for read end of "
            "cluster event pipe: %s.\n", strerror(errno));
    goto fail;
  }
  flags = res | O_NONBLOCK;
  res = fcntl(receivepipe[0], F_SETFL, flags);
  if(res < 0) {
    ast_log(LOG_ERROR, "Could not set read end of cluster event pipe "
            "non-blocking: %s.\n", strerror(errno));
    goto fail;
  }
  res = fcntl(receivepipe[1], F_GETFL);
  if(res < 0) {
    ast_log(LOG_ERROR, "Could not obtain flags for write end of "
            "cluster event pipe: %s.\n", strerror(errno));
    goto fail;
  }
  flags = res | O_NONBLOCK;
  res = fcntl(receivepipe[1], F_SETFL, flags);
  if(res < 0) {
    ast_log(LOG_ERROR, "Could not set write end of cluster event pipe "
            "non-blocking: %s.\n", strerror(errno));
    goto fail;
  }
  cluster_sched = sched_context_create();
  if(cluster_sched == NULL) {
    ast_log(LOG_ERROR, "Unable to create cluster scheduling context.\n");
    goto fail;
  }

  cluster_running = 1;          /* Otherwise there is a race, and
                                   cluster may exit immediately */
  if(ast_pthread_create(&cluster_thread, NULL, cluster_thread_main, NULL) < 0) {
    ast_log(LOG_ERROR, "Unable to start cluster thread.\n");
    /* Clear the flag so cluster_cleanup() does not try to join a thread
       that was never started. */
    cluster_running = 0;
    goto fail;
  }
  memset(&sp, 0, sizeof(sp));
  sp.sched_priority = 10;
  /* pthread_setschedparam() returns the error number directly. */
  res = pthread_setschedparam(cluster_thread, SCHED_RR, &sp);
  if(res != 0) {
    ast_log(LOG_WARNING, "Failed to set cluster thread to realtime priority: %s.\n",
            strerror(res));
  }
  wait_for_connections();
  return 0;
 fail:
  cluster_cleanup();
  return -1;
}

void cluster_cleanup(void)
{
  int i, j;

  if(cluster_running) {
    cluster_running = 0;
    /* Monitor wakes up periodically, so no need to signal it explicitly. */
    pthread_join(cluster_thread, NULL);
  }

  if(cluster_sched) {
    sched_context_destroy(cluster_sched);
    cluster_sched = NULL;
  }
  if(receivebuf) {
    lffifo_free(receivebuf);
    receivebuf = NULL;
  }
  if(receivepipe[0] != -1) {
    close(receivepipe[0]);
    receivepipe[0] = -1;
  }
  if(receivepipe[1] != -1) {
    close(receivepipe[1]);
    receivepipe[1] = -1;
  }
  if (receiver_socket != -1) {
    shutdown(receiver_socket, SHUT_RDWR);
    close(receiver_socket);
    receiver_socket = -1;
  }
  for (i = 0; i < n_accepted; i++) {
    shutdown(accepted[i].fd, SHUT_RDWR);
    close(accepted[i].fd);
  }
  n_accepted = 0;


  if (this_host) {
    for (i = 0; i < this_host->n_receivers; i++) {
      for (j = 0; j < this_host->receivers[i].n_targets; j++) {
	if (receiver_stats[i].target[j].connected || receiver_stats[i].target[j].inprogress) {
	  shutdown(receiver_stats[i].target[j].fd, SHUT_RDWR);
	  close(receiver_stats[i].target[j].fd);
	  receiver_stats[i].target[j].connected = 0;
	  receiver_stats[i].target[j].inprogress = 0;
	}
      }
    }
  }
  n_senders = 0;
}


/*
 * CLI handler: start the cluster subsystem.
 *
 * Does nothing and returns 0 when the cluster is already running; otherwise
 * returns the result of cluster_init() called with the currently stored ISUP
 * callbacks.  The fd/argc/argv arguments are not used.
 */
int cmd_cluster_start(int fd, int argc, char *argv[])
{
  return cluster_running
    ? 0
    : cluster_init(isup_event_handler, isup_block_handler);
}

/*
 * CLI handler: stop the cluster subsystem.
 *
 * Calls cluster_cleanup() only when the cluster is currently running.
 * Always returns 0.  The fd/argc/argv arguments are not used.
 */
int cmd_cluster_stop(int fd, int argc, char *argv[])
{
  if (!cluster_running)
    return 0;

  cluster_cleanup();
  return 0;
}

/*
 * CLI handler: print the state of all senders and receiver targets to fd.
 *
 * For each sender one line is written with its host, address, state, time
 * since it was last heard from and its up/down counters; the time is shown
 * as 0 for senders in the unknown state.  For each receiver target one line
 * is written with host, interface, address, connected / in-progress markers,
 * time since the last connection attempt and the fail / forward counters.
 *
 * Always returns 0.  argc and argv are not used.
 */
int cmd_cluster_status(int fd, int argc, char *argv[])
{
  /* Use a private timestamp: this handler must not overwrite the file-scope
     `now`.  NOTE(review): the poll loop also appears to use that variable,
     possibly from another thread - confirm. */
  struct timeval tnow;
  int i;
  int linkix, targetix;

  gettimeofday(&tnow, NULL);
  for (i = 0; i < n_senders; i++) {
    int tdiff = timediff_msec(tnow, senders[i].last);
    char* s = "";
    switch (senders[i].state) {
    case STATE_UNKNOWN:
      /* No meaningful "last seen" time yet. */
      s = "unknown"; tdiff = 0; break;
    case STATE_ALIVE:
      s = "alive"; break;
    case STATE_DEAD:
      s = "dead"; break;
    }
    ast_cli(fd, "sender %s, addr %s, state %s, last %d msec, up %d, down %d\n", senders[i].host->name, inaddr2s(senders[i].addr), s, tdiff, senders[i].up, senders[i].down);
  }

  /* cluster_cleanup() treats this_host as possibly NULL; do the same here
     instead of dereferencing it blindly. */
  if (this_host == NULL)
    return 0;

  for (linkix = 0; linkix < this_host->n_receivers; linkix++) {
    for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) {
      char* if_name = this_host->receivers[linkix].targets[targetix].inf->name;
      char* host_name = this_host->receivers[linkix].targets[targetix].host->name;
      struct in_addr addr = this_host->receivers[linkix].targets[targetix].inf->addr;
      char* c = (receiver_stats[linkix].target[targetix].connected) ? "connected" : "";
      char* p = (receiver_stats[linkix].target[targetix].inprogress) ? "inprogress" : "";
      int tdiff = timediff_msec(tnow, receiver_stats[linkix].target[targetix].lasttry);

      ast_cli(fd, "receiver %s if %s, addr %s, c:%s, p:%s, last try %d msec, %d fails, %lu forwards\n",
	      host_name, if_name,  inaddr2s(addr), c, p,
	      tdiff, receiver_stats[linkix].target[targetix].fails,
	      receiver_stats[linkix].target[targetix].forwards);
    }
  }
  return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -