⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk 中国七号驱动修改代码可以在ASTERISIK-1。4上编译pa
💻 C
📖 第 1 页 / 共 3 页
字号:
		int senderix = find_sender(host, from_addr.sin_addr);		if (senderix >= 0) {		  set_sender_last(senderix, now);		  accepted[n_accepted].fd = afd;		  accepted[n_accepted].addr = from_addr.sin_addr;		  accepted[n_accepted++].senderix = senderix;		  rebuild_fds = 1;		  host_last_seq_no[senders[senderix].hostix] = 0;		}		else {		  ast_log(LOG_NOTICE, "Got socket connection from unexpected sender %s %s\n", host->name, inaddr2s(from_addr.sin_addr));		}	      }	      ast_log(LOG_NOTICE, "Accepted socket connection from %s, fd %d\n", host?host->name : "unknown", afd);	    }	    else {	      ast_log(LOG_WARNING, "Accept of receiver connection failed: %s.\n", strerror(errno));	    }	    break;	  }	}	case FD_ACCEPTED: {	  int ix = i - 2;	  int err = 0;	  if(fds[i].revents & POLLIN) {	    err = cluster_receive_packet(accepted[ix].senderix, fds[i].fd);	    set_sender_last(accepted[ix].senderix, now);	  }	  if((err == -1) || (fds[i].revents & (POLLERR|POLLNVAL))) {	    int error;	    unsigned int len = sizeof(error);	    getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);	    ast_log(LOG_NOTICE, "Got error on accepted socket: %d %s\n", i, strerror(error));	    close(fds[i].fd);	    for (j = ix; j < n_accepted-1; j++)	      accepted[j] = accepted[j+1];	    n_accepted--;	    rebuild_fds = 1;	  }	  break;	}	case FD_RECEIVER:	case FD_INPROGRESS: {	  struct receiver* receiver = &fds_receivers[i];	  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];	  int targetix = fds_targetix[i];	  char* host_name = receiver->targets[targetix].host->name;	  char* if_name = receiver->targets[targetix].inf->name;	  rebuild_fds = 1;	  if(fds[i].revents & (POLLERR|POLLNVAL)) {	    if (receiver_stat->target[targetix].reported++ % 100 == 0) {	      int error;	      unsigned int len = sizeof(error);	      getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);	      ast_log(LOG_NOTICE, "Socket connection failed to host: %s, inf %s, addr %s, error: %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr), strerror(error));	    }	    disconnect_receiver(receiver, targetix);	  }	  else if(fds[i].revents & (POLLHUP)) {	    ast_log(LOG_NOTICE, "Lost connection to receiver host: %s, inf %s, addr %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));	    disconnect_receiver(receiver, targetix);	  }	  else if(fds[i].revents & (POLLIN|POLLOUT)) {	    ast_log(LOG_NOTICE, "Connected to receiver host: %s, inf %s, addr %s \n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));	    receiver_stat->target[targetix].connected = 1;	    receiver_stat->target[targetix].inprogress = 0;	    receiver_stat->target[targetix].reported = 0;	  }	}	break;	}      }    }    cluster_send_keep_alive();    if (check_receiver_connections())      rebuild_fds = 1;    check_senders();  }  return NULL;}static void build_sender_list(void){  int hostix = 0;  struct host* host = NULL;    while ((host = lookup_host_by_id(hostix)) != NULL) {    if (host != this_host) {      int linkix, targetix;      for (linkix = 0; linkix < host->n_receivers; linkix++) {	for (targetix = 0; targetix < host->receivers[linkix].n_targets; targetix++) {	  if (host->receivers[linkix].targets[targetix].host == this_host) {	    int j;	    for (j = 0; j < host->n_ifs; j++) {	      add_sender(host, host->ifs[j].addr, hostix);	    }	  }	}      }    }    hostix++;  }  if (!n_senders) {    ast_log(LOG_DEBUG, "Found no senders to supervise\n");  }}static void wait_for_connections(void){  int cnt;  int linkix, targetix;  for (cnt = 0; cnt < 800; cnt++) {    int n = 0, c = 0;    for (linkix = 0; linkix < this_host->n_receivers; linkix++) {      for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) {	c += 1;	if (receiver_stats[linkix].target[targetix].connected)	  n += 1;      }    }    if (cnt % 100 == 0)      ast_log(LOG_DEBUG, "wait %d %d %d %d\n", n, c, n_accepted, n_senders);    if ((n == c) && (n_accepted == n_senders))      break;    usleep(10*1000);  }}int cluster_init(void (*isup_event_handler_callback)(struct mtp_event*),		 void (*isup_block_handler_callback)(struct link*)){  int i, j;  int res;  int flags;  struct sched_param sp;  isup_event_handler = isup_event_handler_callback;  isup_block_handler = isup_block_handler_callback;  build_sender_list();  for (i = 0; i < this_host->n_receivers; i++) {    for (j = 0; j < this_host->receivers[i].n_targets; j++) {      receiver_stats[i].target[j].fd = -1;      receiver_stats[i].target[j].connected = 0;      receiver_stats[i].target[j].inprogress = 0;      receiver_stats[i].target[j].reported = 0;    }  }  for (i = 0; i < this_host->n_receivers; i++) {    for (j = 0; j < this_host->receivers[i].n_targets; j++) {      struct host* host = this_host->receivers[i].targets[j].host;      int l;      for (l = 0; l < host->n_spans; l++) {	struct link* link = host->spans[l].link;	if (link->schannel >= 0)	  this_host->has_signalling_receivers = 1;      }    }  }  if (this_host->has_signalling_receivers)    if (setup_receiver_socket())      goto fail;  connect_receivers();  receivepipe[0] = receivepipe[1] = -1;  receivebuf = lffifo_alloc(200000);  res = pipe(receivepipe);  if(res < 0) {    ast_log(LOG_ERROR, "Unable to allocate cluster event pipe: %s.\n",            strerror(errno));    goto fail;  }  res = fcntl(receivepipe[0], F_GETFL);  if(res < 0) {    ast_log(LOG_ERROR, "Could not obtain flags for read end of "            "cluster event pipe: %s.\n", strerror(errno));    goto fail;  }  flags = res | O_NONBLOCK;  res = fcntl(receivepipe[0], F_SETFL, flags);  if(res < 0) {    ast_log(LOG_ERROR, "Could not set read end of cluster event pipe "            "non-blocking: %s.\n", strerror(errno));    goto fail;  }  res = fcntl(receivepipe[1], F_GETFL);  if(res < 0) {    ast_log(LOG_ERROR, "Could not obtain flags for write end of "            "cluster event pipe: %s.\n", strerror(errno));    goto fail;  }  flags = res | O_NONBLOCK;  res = fcntl(receivepipe[1], F_SETFL, flags);  if(res < 0) {    ast_log(LOG_ERROR, "Could not set write end of cluster event pipe "            "non-blocking: %s.\n", strerror(errno));    goto fail;  }  cluster_sched = sched_context_create();  if(cluster_sched == NULL) {    ast_log(LOG_ERROR, "Unable to create cluster scheduling context.\n");    goto fail;  }  cluster_running = 1;          /* Otherwise there is a race, and                                   cluster may exit immediately */  if(ast_pthread_create(&cluster_thread, NULL, cluster_thread_main, NULL) < 0) {    ast_log(LOG_ERROR, "Unable to start cluster thread.\n");    cluster_running = 0;    goto fail;  }  memset(&sp, 0, sizeof(sp));  sp.sched_priority = 10;  res = pthread_setschedparam(cluster_thread, SCHED_RR, &sp);  if(res != 0) {    ast_log(LOG_WARNING, "Failed to set cluster thread to realtime priority: %s.\n",            strerror(res));  }  wait_for_connections();  return 0; fail:  cluster_cleanup();  return -1;}void cluster_cleanup(void){  int i, j;  if(cluster_running) {    cluster_running = 0;    /* Monitor wakes up periodically, so no need to signal it explicitly. */    pthread_join(cluster_thread, NULL);  }  if(cluster_sched) {    sched_context_destroy(cluster_sched);    cluster_sched = NULL;  }  if(receivebuf) {    lffifo_free(receivebuf);    receivebuf = NULL;  }  if(receivepipe[0] != -1) {    close(receivepipe[0]);    receivepipe[0] = -1;  }  if(receivepipe[1] != -1) {    close(receivepipe[1]);    receivepipe[1] = -1;  }  if (receiver_socket != -1) {    shutdown(receiver_socket, SHUT_RDWR);    close(receiver_socket);    receiver_socket = -1;  }  for (i = 0; i < n_accepted; i++) {    shutdown(accepted[i].fd, SHUT_RDWR);    close(accepted[i].fd);  }  n_accepted = 0;  if (this_host) {    for (i = 0; i < this_host->n_receivers; i++) {      for (j = 0; j < this_host->receivers[i].n_targets; j++) {	if (receiver_stats[i].target[j].connected || receiver_stats[i].target[j].inprogress) {	  shutdown(receiver_stats[i].target[j].fd, SHUT_RDWR);	  close(receiver_stats[i].target[j].fd);	  receiver_stats[i].target[j].connected = 0;	  receiver_stats[i].target[j].inprogress = 0;	}      }    }  }  n_senders = 0;}int cmd_cluster_start(int fd, int argc, char *argv[]){  if (!cluster_running)    return cluster_init(isup_event_handler, isup_block_handler);  return 0;}int cmd_cluster_stop(int fd, int argc, char *argv[]){  if (cluster_running)    cluster_cleanup();  return 0;}int cmd_cluster_status(int fd, int argc, char *argv[]){  int i;  int linkix, targetix;  gettimeofday(&now, NULL);  for (i = 0; i < n_senders; i++) {    int tdiff = timediff_msec(now, senders[i].last);    char* s = "";    switch (senders[i].state) {    case STATE_UNKNOWN:      s = "unknown"; tdiff = 0; break;    case STATE_ALIVE:      s = "alive"; break;    case STATE_DEAD:      s = "dead"; break;    }    ast_cli(fd, "sender %s, addr %s, state %s, last %d msec, up %d, down %d\n", senders[i].host->name, inaddr2s(senders[i].addr), s, tdiff, senders[i].up, senders[i].down);  }  for (linkix = 0; linkix < this_host->n_receivers; linkix++) {    for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) {      char* if_name = this_host->receivers[linkix].targets[targetix].inf->name;      char* host_name = this_host->receivers[linkix].targets[targetix].host->name;      struct in_addr addr = this_host->receivers[linkix].targets[targetix].inf->addr;      char* c = (receiver_stats[linkix].target[targetix].connected) ? "connected" : "";      char* p = (receiver_stats[linkix].target[targetix].inprogress) ? "inprogress" : "";      int tdiff = timediff_msec(now, receiver_stats[linkix].target[targetix].lasttry);      ast_cli(fd, "receiver %s if %s, addr %s, c:%s, p:%s, last try %d msec, %d fails, %lu forwards\n",	      host_name, if_name,  inaddr2s(addr), c, p,	      tdiff, receiver_stats[linkix].target[targetix].fails,	      receiver_stats[linkix].target[targetix].forwards);    }  }  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -