⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 asterisk 的7号信令处理模块
💻 C
📖 第 1 页 / 共 3 页
字号:
  }  return 0;}static void disconnect_receiver(struct receiver* receiver, int targetix){  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];  if (receiver_stat->target[targetix].connected || receiver_stat->target[targetix].inprogress) {    ast_log(LOG_DEBUG, "Disconnect receiver %s %d\n", receiver->targets[targetix].host->name, targetix);    if (receiver_stat->target[targetix].fd != -1) {      close(receiver_stat->target[targetix].fd);      receiver_stat->target[targetix].fd = -1;    }    receiver_stat->target[targetix].connected = 0;    receiver_stat->target[targetix].inprogress = 0;    receiver_stat->target[targetix].fails += 1;  }}static void connect_receiver(int receiverix, int targetix){  struct sockaddr_in sock;  struct in_addr addr = this_host->receivers[receiverix].targets[targetix].inf->addr;  char* host_name = this_host->receivers[receiverix].targets[targetix].host->name;  int s;  int flags;  int res;  receiver_stats[receiverix].target[targetix].fd = -1;  receiver_stats[receiverix].target[targetix].connected = 0;  receiver_stats[receiverix].target[targetix].inprogress = 0;  gettimeofday(&receiver_stats[receiverix].target[targetix].lasttry, NULL);  s = socket(PF_INET, SOCK_STREAM, 0);  if (s < 0) {    ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));    return;  }  memset(&sock, 0, sizeof(struct sockaddr_in));  sock.sin_family = AF_INET;  sock.sin_port = htons(clusterlistenport);  memcpy(&sock.sin_addr, &addr, sizeof(addr));  res = fcntl(s, F_GETFL);  if(res < 0) {    ast_log(LOG_WARNING, "SS7: Could not obtain flags for socket fd: %s.\n", strerror(errno));    return;  }  flags = res | O_NONBLOCK;  res = fcntl(s, F_SETFL, flags);  if(res < 0) {    ast_log(LOG_WARNING, "SS7: Could not set socket fd non-blocking: %s.\n", strerror(errno));    return;  }  ast_log(LOG_DEBUG, "Trying to connect to %s %s\n", host_name, inaddr2s(sock.sin_addr));  if (connect(s, &sock, sizeof(sock)) < 0) {    if (errno != EINPROGRESS) {      ast_log(LOG_ERROR, "Cannot connect receiver socket %s, %s\n", inaddr2s(sock.sin_addr), strerror(errno));      close(s);      return;    }    // set_socket_opt(s, SOL_TCP, TCP_NODELAY, 1);  }  receiver_stats[receiverix].target[targetix].fd = s;  receiver_stats[receiverix].target[targetix].inprogress = 1;}static void connect_receivers(void){  int receiverix, targetix;  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {      connect_receiver(receiverix, targetix);    }  }}static int check_receiver_connections(void){  int receiverix, targetix;  int any = 0;  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {      int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);      if (!receiver_stats[receiverix].target[targetix].connected && !receiver_stats[receiverix].target[targetix].inprogress) {	if (tdiff > CLUSTER_CONNECT_RETRY_INTERVAL) {	  any++;	  connect_receiver(receiverix, targetix);	}      }      else if (receiver_stats[receiverix].target[targetix].inprogress) {	if (tdiff > CLUSTER_CONNECT_TIMEOUT) {	  close(receiver_stats[receiverix].target[targetix].fd);	  receiver_stats[receiverix].target[targetix].inprogress = 0;	  any++;	  ast_log(LOG_NOTICE, "Timed out on receiver connection to %s, receiverix %d targetix %d, tdiff %d\n", inaddr2s(this_host->receivers[receiverix].targets[targetix].inf->addr), receiverix, targetix, tdiff);	}      }    }  }  return any;}static void cluster_send_packet(struct receiver* receiver, int targetix, unsigned char* buf, int len){  int res;  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];  //  ast_log(LOG_DEBUG, "send packet %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected);  if (receiver_stats[receiver->receiverix].target[targetix].connected) {    gettimeofday(&receiver_stat->target[targetix].lasttry, NULL);    res = write(receiver_stat->target[targetix].fd, buf, len);    if (res < 0) {      close(receiver_stat->target[targetix].fd);      receiver_stat->target[targetix].connected = 0;      receiver_stat->target[targetix].fails += 1;      rebuild_fds = 1;      ast_log(LOG_ERROR, "Write socket to host '%s' target %d, errno=%d: %s\n", receiver->targets[targetix].host->name, targetix, errno, strerror(errno));    }  }}static void cluster_send_packets(struct receiver* receiver, unsigned char* buf, int len){  int targetix, firstsendix = -1;  struct mtp_event* event = (struct mtp_event*) buf;  struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];  event->seq_no = sequence_number++;  for (targetix = 0; targetix < receiver->n_targets; targetix++) {    ast_log(LOG_DEBUG, "send packets %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected);    if (receiver_stat->target[targetix].connected) {      if (firstsendix == -1)	firstsendix = targetix;      if ((event->typ != MTP_REQ_ISUP_FORWARD) ||	  ((event->typ == MTP_REQ_ISUP_FORWARD) && /* Only one other host should forward ISUP packet */	   (receiver->targets[targetix].host == receiver->targets[firstsendix].host)))	if (event->typ == MTP_REQ_ISUP_FORWARD)	  receiver_stat->target[targetix].forwards += 1;	cluster_send_packet(receiver, targetix, buf, len);    }  }}static void cluster_send_keep_alive(void){  struct mtp_event event;  int receiverix, targetix;  event.typ = MTP_EVENT_ALIVE;  event.len = 0;  event.seq_no = sequence_number++;  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {      int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);      if (tdiff > CLUSTER_KEEP_ALIVE_INTERVAL)	cluster_send_packet(&this_host->receivers[receiverix], targetix, (char*) &event, sizeof(event));    }  }}static int find_next_timeout(void){  int receiverix, targetix;  int maxwait = CLUSTER_KEEP_ALIVE_INTERVAL;  for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {    for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {      if (receiver_stats[receiverix].target[targetix].connected) {	int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);	int wait = CLUSTER_KEEP_ALIVE_INTERVAL - tdiff;	if (wait < maxwait)	  maxwait = wait;      }    }  }  if (maxwait < 0)    maxwait = 0;  return maxwait;}static int cluster_receive_packet(int senderix, int fd){  int res;  int hostix = senders[senderix].hostix;  char buf[MTP_EVENT_MAX_SIZE];  struct mtp_event* event = (struct mtp_event*) &buf;  struct mtp_req* req = (struct mtp_req*) &buf;  int sz = sizeof(event->typ);  res = read(fd, buf, sz);  if (res <= 0) {    ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));    return -1;  }  else if (res == 0) {    ast_log(LOG_NOTICE, "Received 0 bytes, closing socket: %s.\n", strerror(errno));    shutdown(fd, SHUT_RDWR);    return -1;  }  if (event->typ < MTP_EVENT_ALIVE) {    res = read(fd, &buf[sz], sizeof(*req)-sz);    if (res > 0)      res = read(fd, req->buf, req->len);    //    ast_log(LOG_DEBUG, "Received mtp req %d, buff len %d, res %d\n", req->typ, req->len, res);  }  else {    res = read(fd, &buf[sz], sizeof(*event)-sz);    if (res > 0)      res = read(fd, event->buf, event->len);  }  if (host_last_seq_no[hostix] >= event->seq_no) {    return 0;  }  host_last_seq_no[hostix] = event->seq_no;  if (res > 0) {    ast_log(LOG_DEBUG, "Received event, senderix=%d, hostix=%d, lastseq=%ld, seqno=%ld, typ=%d\n", senderix, hostix, host_last_seq_no[hostix], event->seq_no, event->typ);    if ((event->typ == MTP_EVENT_ISUP) || (event->typ == MTP_REQ_ISUP_FORWARD)) {      if (isup_event_handler)	(*isup_event_handler)(event);    }  }  if (res < 0)    ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));  return res;}static void *cluster_thread_main(void *data){  int i, j;  int res;  fds[0].fd = receivepipe[0];  fds[0].events = POLLIN;  fds_type[0] = FD_PIPE;  fds[1].fd = receiver_socket;  if (receiver_socket > 0)    fds[1].events = POLLIN;  else    fds[1].events = 0;  fds_type[1] = FD_LISTEN;  ast_verbose(VERBOSE_PREFIX_3 "Starting cluster thread, pid=%d.\n", getpid());  while (cluster_running) {    int timeout;    int maxtimeout;    gettimeofday(&now, NULL);    timeout = ast_sched_wait(cluster_sched);    maxtimeout = find_next_timeout();    if(timeout <= 0 || timeout > CLUSTER_WAKEUP_INTERVAL) {      timeout = CLUSTER_WAKEUP_INTERVAL;    }    if (timeout > maxtimeout)      timeout = maxtimeout;    if (rebuild_fds) {      n_fds = 2;      for (i = 0; i < n_accepted; i++) {	fds[n_fds].fd = accepted[i].fd;	fds[n_fds].events = POLLIN|POLLERR|POLLHUP;	fds_type[n_fds++] = FD_ACCEPTED;      }      for (i = 0; i < this_host->n_receivers; i++) {	for (j = 0; j < this_host->receivers[i].n_targets; j++) {	  fds_receivers[n_fds] = this_host->receivers[i];	  fds_targetix[n_fds] = j;	  if (receiver_stats[i].target[j].connected) {	    fds[n_fds].fd = receiver_stats[i].target[j].fd;	    fds[n_fds].events = POLLERR|POLLHUP;	    fds_type[n_fds++] = FD_RECEIVER;	  }	  else if (receiver_stats[i].target[j].inprogress) {	    fds[n_fds].fd = receiver_stats[i].target[j].fd;	    fds[n_fds].events = POLLOUT|POLLERR|POLLHUP;	    fds_type[n_fds++] = FD_INPROGRESS;	  }	}      }      rebuild_fds = 0;    }    res = poll(fds, n_fds, timeout);    gettimeofday(&now, NULL);    if(res < 0) {      if(errno == EINTR) {        /* Just try again. */      } else {        ast_log(LOG_ERROR, "poll() failure, errno=%d: %s\n", errno, strerror(errno));      }    } else if(res > 0) {      for (i = 0; i < n_fds; i++) {	if(!(fds[i].revents & (POLLERR|POLLNVAL|POLLHUP|POLLIN|POLLOUT)))	  continue;	switch (fds_type[i]) {	case FD_PIPE: {	  if(fds[i].revents & POLLIN) {	    int linkix;	    unsigned char fifobuf[1024];	    struct mtp_req* req = (struct mtp_req*) &fifobuf;	    res = read(fds[i].fd, &linkix, sizeof(linkix));	    if (res < 0) {	      ast_log(LOG_NOTICE, "Could not read cluster event pipe: %s.\n", strerror(errno));	      continue;	    }	    if ((res = lffifo_get(receivebuf, fifobuf, sizeof(fifobuf))) != 0) {	      if(res < 0) {		ast_log(LOG_ERROR, "Got oversize packet in cluster receive buffer.\n");		continue;	      }	    }	    ast_log(LOG_DEBUG, "fifo get res %d, typ %d, linkix %d, link %s\n", res, req->typ, linkix, links[linkix].name);	    if (res > 0) {	      if ((req->typ == MTP_REQ_ISUP) || (req->typ == MTP_REQ_ISUP_FORWARD) || (req->typ == MTP_EVENT_ISUP)) {		if (links[linkix].receiver) {		  cluster_send_packets(links[linkix].receiver, fifobuf, res);		}		else {		  ast_log(LOG_WARNING, "No way to send packet to cluster, link='%s', reqtype=%d\n", links[linkix].name, req->typ);		}	      }	    }	    	  }	  break;	}	case FD_LISTEN: {	  if(fds[i].revents & POLLIN) {	    struct sockaddr_in from_addr;	    unsigned int len = sizeof(struct sockaddr_in);	    int afd = accept(receiver_socket, (struct sockaddr *)&from_addr, &len);	    if (afd != -1) {	      struct host* host = lookup_host_by_addr(from_addr.sin_addr);	      if (host) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -