📄 cluster.c
字号:
} return 0;}static void disconnect_receiver(struct receiver* receiver, int targetix){ struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix]; if (receiver_stat->target[targetix].connected || receiver_stat->target[targetix].inprogress) { ast_log(LOG_DEBUG, "Disconnect receiver %s %d\n", receiver->targets[targetix].host->name, targetix); if (receiver_stat->target[targetix].fd != -1) { close(receiver_stat->target[targetix].fd); receiver_stat->target[targetix].fd = -1; } receiver_stat->target[targetix].connected = 0; receiver_stat->target[targetix].inprogress = 0; receiver_stat->target[targetix].fails += 1; }}static void connect_receiver(int receiverix, int targetix){ struct sockaddr_in sock; struct in_addr addr = this_host->receivers[receiverix].targets[targetix].inf->addr; char* host_name = this_host->receivers[receiverix].targets[targetix].host->name; int s; int flags; int res; receiver_stats[receiverix].target[targetix].fd = -1; receiver_stats[receiverix].target[targetix].connected = 0; receiver_stats[receiverix].target[targetix].inprogress = 0; gettimeofday(&receiver_stats[receiverix].target[targetix].lasttry, NULL); s = socket(PF_INET, SOCK_STREAM, 0); if (s < 0) { ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno)); return; } memset(&sock, 0, sizeof(struct sockaddr_in)); sock.sin_family = AF_INET; sock.sin_port = htons(clusterlistenport); memcpy(&sock.sin_addr, &addr, sizeof(addr)); res = fcntl(s, F_GETFL); if(res < 0) { ast_log(LOG_WARNING, "SS7: Could not obtain flags for socket fd: %s.\n", strerror(errno)); return; } flags = res | O_NONBLOCK; res = fcntl(s, F_SETFL, flags); if(res < 0) { ast_log(LOG_WARNING, "SS7: Could not set socket fd non-blocking: %s.\n", strerror(errno)); return; } ast_log(LOG_DEBUG, "Trying to connect to %s %s\n", host_name, inaddr2s(sock.sin_addr)); if (connect(s, &sock, sizeof(sock)) < 0) { if (errno != EINPROGRESS) { ast_log(LOG_ERROR, "Cannot connect receiver socket %s, %s\n", inaddr2s(sock.sin_addr), strerror(errno)); close(s); return; } // set_socket_opt(s, SOL_TCP, TCP_NODELAY, 1); } receiver_stats[receiverix].target[targetix].fd = s; receiver_stats[receiverix].target[targetix].inprogress = 1;}static void connect_receivers(void){ int receiverix, targetix; for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) { for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) { connect_receiver(receiverix, targetix); } }}static int check_receiver_connections(void){ int receiverix, targetix; int any = 0; for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) { for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) { int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry); if (!receiver_stats[receiverix].target[targetix].connected && !receiver_stats[receiverix].target[targetix].inprogress) { if (tdiff > CLUSTER_CONNECT_RETRY_INTERVAL) { any++; connect_receiver(receiverix, targetix); } } else if (receiver_stats[receiverix].target[targetix].inprogress) { if (tdiff > CLUSTER_CONNECT_TIMEOUT) { close(receiver_stats[receiverix].target[targetix].fd); receiver_stats[receiverix].target[targetix].inprogress = 0; any++; ast_log(LOG_NOTICE, "Timed out on receiver connection to %s, receiverix %d targetix %d, tdiff %d\n", inaddr2s(this_host->receivers[receiverix].targets[targetix].inf->addr), receiverix, targetix, tdiff); } } } } return any;}static void cluster_send_packet(struct receiver* receiver, int targetix, unsigned char* buf, int len){ int res; struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix]; // ast_log(LOG_DEBUG, "send packet %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected); if (receiver_stats[receiver->receiverix].target[targetix].connected) { gettimeofday(&receiver_stat->target[targetix].lasttry, NULL); res = write(receiver_stat->target[targetix].fd, buf, len); if (res < 0) { close(receiver_stat->target[targetix].fd); receiver_stat->target[targetix].connected = 0; receiver_stat->target[targetix].fails += 1; rebuild_fds = 1; ast_log(LOG_ERROR, "Write socket to host '%s' target %d, errno=%d: %s\n", receiver->targets[targetix].host->name, targetix, errno, strerror(errno)); } }}static void cluster_send_packets(struct receiver* receiver, unsigned char* buf, int len){ int targetix, firstsendix = -1; struct mtp_event* event = (struct mtp_event*) buf; struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix]; event->seq_no = sequence_number++; for (targetix = 0; targetix < receiver->n_targets; targetix++) { ast_log(LOG_DEBUG, "send packets %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected); if (receiver_stat->target[targetix].connected) { if (firstsendix == -1) firstsendix = targetix; if ((event->typ != MTP_REQ_ISUP_FORWARD) || ((event->typ == MTP_REQ_ISUP_FORWARD) && /* Only one other host should forward ISUP packet */ (receiver->targets[targetix].host == receiver->targets[firstsendix].host))) if (event->typ == MTP_REQ_ISUP_FORWARD) receiver_stat->target[targetix].forwards += 1; cluster_send_packet(receiver, targetix, buf, len); } }}static void cluster_send_keep_alive(void){ struct mtp_event event; int receiverix, targetix; event.typ = MTP_EVENT_ALIVE; event.len = 0; event.seq_no = sequence_number++; for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) { for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) { int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry); if (tdiff > CLUSTER_KEEP_ALIVE_INTERVAL) cluster_send_packet(&this_host->receivers[receiverix], targetix, (char*) &event, sizeof(event)); } }}static int find_next_timeout(void){ int receiverix, targetix; int maxwait = CLUSTER_KEEP_ALIVE_INTERVAL; for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) { for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) { if (receiver_stats[receiverix].target[targetix].connected) { int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry); int wait = CLUSTER_KEEP_ALIVE_INTERVAL - tdiff; if (wait < maxwait) maxwait = wait; } } } if (maxwait < 0) maxwait = 0; return maxwait;}static int cluster_receive_packet(int senderix, int fd){ int res; int hostix = senders[senderix].hostix; char buf[MTP_EVENT_MAX_SIZE]; struct mtp_event* event = (struct mtp_event*) &buf; struct mtp_req* req = (struct mtp_req*) &buf; int sz = sizeof(event->typ); res = read(fd, buf, sz); if (res <= 0) { ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno)); return -1; } else if (res == 0) { ast_log(LOG_NOTICE, "Received 0 bytes, closing socket: %s.\n", strerror(errno)); shutdown(fd, SHUT_RDWR); return -1; } if (event->typ < MTP_EVENT_ALIVE) { res = read(fd, &buf[sz], sizeof(*req)-sz); if (res > 0) res = read(fd, req->buf, req->len); // ast_log(LOG_DEBUG, "Received mtp req %d, buff len %d, res %d\n", req->typ, req->len, res); } else { res = read(fd, &buf[sz], sizeof(*event)-sz); if (res > 0) res = read(fd, event->buf, event->len); } if (host_last_seq_no[hostix] >= event->seq_no) { return 0; } host_last_seq_no[hostix] = event->seq_no; if (res > 0) { ast_log(LOG_DEBUG, "Received event, senderix=%d, hostix=%d, lastseq=%ld, seqno=%ld, typ=%d\n", senderix, hostix, host_last_seq_no[hostix], event->seq_no, event->typ); if ((event->typ == MTP_EVENT_ISUP) || (event->typ == MTP_REQ_ISUP_FORWARD)) { if (isup_event_handler) (*isup_event_handler)(event); } } if (res < 0) ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno)); return res;}static void *cluster_thread_main(void *data){ int i, j; int res; fds[0].fd = receivepipe[0]; fds[0].events = POLLIN; fds_type[0] = FD_PIPE; fds[1].fd = receiver_socket; if (receiver_socket > 0) fds[1].events = POLLIN; else fds[1].events = 0; fds_type[1] = FD_LISTEN; ast_verbose(VERBOSE_PREFIX_3 "Starting cluster thread, pid=%d.\n", getpid()); while (cluster_running) { int timeout; int maxtimeout; gettimeofday(&now, NULL); timeout = ast_sched_wait(cluster_sched); maxtimeout = find_next_timeout(); if(timeout <= 0 || timeout > CLUSTER_WAKEUP_INTERVAL) { timeout = CLUSTER_WAKEUP_INTERVAL; } if (timeout > maxtimeout) timeout = maxtimeout; if (rebuild_fds) { n_fds = 2; for (i = 0; i < n_accepted; i++) { fds[n_fds].fd = accepted[i].fd; fds[n_fds].events = POLLIN|POLLERR|POLLHUP; fds_type[n_fds++] = FD_ACCEPTED; } for (i = 0; i < this_host->n_receivers; i++) { for (j = 0; j < this_host->receivers[i].n_targets; j++) { fds_receivers[n_fds] = this_host->receivers[i]; fds_targetix[n_fds] = j; if (receiver_stats[i].target[j].connected) { fds[n_fds].fd = receiver_stats[i].target[j].fd; fds[n_fds].events = POLLERR|POLLHUP; fds_type[n_fds++] = FD_RECEIVER; } else if (receiver_stats[i].target[j].inprogress) { fds[n_fds].fd = receiver_stats[i].target[j].fd; fds[n_fds].events = POLLOUT|POLLERR|POLLHUP; fds_type[n_fds++] = FD_INPROGRESS; } } } rebuild_fds = 0; } res = poll(fds, n_fds, timeout); gettimeofday(&now, NULL); if(res < 0) { if(errno == EINTR) { /* Just try again. */ } else { ast_log(LOG_ERROR, "poll() failure, errno=%d: %s\n", errno, strerror(errno)); } } else if(res > 0) { for (i = 0; i < n_fds; i++) { if(!(fds[i].revents & (POLLERR|POLLNVAL|POLLHUP|POLLIN|POLLOUT))) continue; switch (fds_type[i]) { case FD_PIPE: { if(fds[i].revents & POLLIN) { int linkix; unsigned char fifobuf[1024]; struct mtp_req* req = (struct mtp_req*) &fifobuf; res = read(fds[i].fd, &linkix, sizeof(linkix)); if (res < 0) { ast_log(LOG_NOTICE, "Could not read cluster event pipe: %s.\n", strerror(errno)); continue; } if ((res = lffifo_get(receivebuf, fifobuf, sizeof(fifobuf))) != 0) { if(res < 0) { ast_log(LOG_ERROR, "Got oversize packet in cluster receive buffer.\n"); continue; } } ast_log(LOG_DEBUG, "fifo get res %d, typ %d, linkix %d, link %s\n", res, req->typ, linkix, links[linkix].name); if (res > 0) { if ((req->typ == MTP_REQ_ISUP) || (req->typ == MTP_REQ_ISUP_FORWARD) || (req->typ == MTP_EVENT_ISUP)) { if (links[linkix].receiver) { cluster_send_packets(links[linkix].receiver, fifobuf, res); } else { ast_log(LOG_WARNING, "No way to send packet to cluster, link='%s', reqtype=%d\n", links[linkix].name, req->typ); } } } } break; } case FD_LISTEN: { if(fds[i].revents & POLLIN) { struct sockaddr_in from_addr; unsigned int len = sizeof(struct sockaddr_in); int afd = accept(receiver_socket, (struct sockaddr *)&from_addr, &len); if (afd != -1) { struct host* host = lookup_host_by_addr(from_addr.sin_addr); if (host) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -