📄 cluster.c
字号:
}
return 0;
}
static void disconnect_receiver(struct receiver* receiver, int targetix)
{
struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];
if (receiver_stat->target[targetix].connected || receiver_stat->target[targetix].inprogress) {
ast_log(LOG_DEBUG, "Disconnect receiver %s %d\n", receiver->targets[targetix].host->name, targetix);
if (receiver_stat->target[targetix].fd != -1) {
close(receiver_stat->target[targetix].fd);
receiver_stat->target[targetix].fd = -1;
}
receiver_stat->target[targetix].connected = 0;
receiver_stat->target[targetix].inprogress = 0;
receiver_stat->target[targetix].fails += 1;
}
}
static void connect_receiver(int receiverix, int targetix)
{
struct sockaddr_in sock;
struct in_addr addr = this_host->receivers[receiverix].targets[targetix].inf->addr;
char* host_name = this_host->receivers[receiverix].targets[targetix].host->name;
int s;
int flags;
int res;
receiver_stats[receiverix].target[targetix].fd = -1;
receiver_stats[receiverix].target[targetix].connected = 0;
receiver_stats[receiverix].target[targetix].inprogress = 0;
gettimeofday(&receiver_stats[receiverix].target[targetix].lasttry, NULL);
s = socket(PF_INET, SOCK_STREAM, 0);
if (s < 0) {
ast_log(LOG_ERROR, "Cannot create receiver socket, errno=%d: %s\n", errno, strerror(errno));
return;
}
memset(&sock, 0, sizeof(struct sockaddr_in));
sock.sin_family = AF_INET;
sock.sin_port = htons(clusterlistenport);
memcpy(&sock.sin_addr, &addr, sizeof(addr));
res = fcntl(s, F_GETFL);
if(res < 0) {
ast_log(LOG_WARNING, "SS7: Could not obtain flags for socket fd: %s.\n", strerror(errno));
return;
}
flags = res | O_NONBLOCK;
res = fcntl(s, F_SETFL, flags);
if(res < 0) {
ast_log(LOG_WARNING, "SS7: Could not set socket fd non-blocking: %s.\n", strerror(errno));
return;
}
ast_log(LOG_DEBUG, "Trying to connect to %s %s\n", host_name, inaddr2s(sock.sin_addr));
if (connect(s, &sock, sizeof(sock)) < 0) {
if (errno != EINPROGRESS) {
ast_log(LOG_ERROR, "Cannot connect receiver socket %s, %s\n", inaddr2s(sock.sin_addr), strerror(errno));
close(s);
return;
}
// set_socket_opt(s, SOL_TCP, TCP_NODELAY, 1);
}
receiver_stats[receiverix].target[targetix].fd = s;
receiver_stats[receiverix].target[targetix].inprogress = 1;
}
static void connect_receivers(void)
{
int receiverix, targetix;
for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
connect_receiver(receiverix, targetix);
}
}
}
static int check_receiver_connections(void)
{
int receiverix, targetix;
int any = 0;
for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);
if (!receiver_stats[receiverix].target[targetix].connected && !receiver_stats[receiverix].target[targetix].inprogress) {
if (tdiff > CLUSTER_CONNECT_RETRY_INTERVAL) {
any++;
connect_receiver(receiverix, targetix);
}
}
else if (receiver_stats[receiverix].target[targetix].inprogress) {
if (tdiff > CLUSTER_CONNECT_TIMEOUT) {
close(receiver_stats[receiverix].target[targetix].fd);
receiver_stats[receiverix].target[targetix].inprogress = 0;
any++;
ast_log(LOG_NOTICE, "Timed out on receiver connection to %s, receiverix %d targetix %d, tdiff %d\n", inaddr2s(this_host->receivers[receiverix].targets[targetix].inf->addr), receiverix, targetix, tdiff);
}
}
}
}
return any;
}
static void cluster_send_packet(struct receiver* receiver, int targetix, unsigned char* buf, int len)
{
int res;
struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];
// ast_log(LOG_DEBUG, "send packet %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected);
if (receiver_stats[receiver->receiverix].target[targetix].connected) {
gettimeofday(&receiver_stat->target[targetix].lasttry, NULL);
res = write(receiver_stat->target[targetix].fd, buf, len);
if (res < 0) {
close(receiver_stat->target[targetix].fd);
receiver_stat->target[targetix].connected = 0;
receiver_stat->target[targetix].fails += 1;
rebuild_fds = 1;
ast_log(LOG_ERROR, "Write socket to host '%s' target %d, errno=%d: %s\n", receiver->targets[targetix].host->name, targetix, errno, strerror(errno));
}
}
}
static void cluster_send_packets(struct receiver* receiver, unsigned char* buf, int len)
{
int targetix, firstsendix = -1;
struct mtp_event* event = (struct mtp_event*) buf;
struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];
event->seq_no = sequence_number++;
for (targetix = 0; targetix < receiver->n_targets; targetix++) {
ast_log(LOG_DEBUG, "send packets %s, targetix %d, connected %d\n", receiver->targets[targetix].host->name, targetix, receiver_stat->target[targetix].connected);
if (receiver_stat->target[targetix].connected) {
if (firstsendix == -1)
firstsendix = targetix;
if ((event->typ != MTP_REQ_ISUP_FORWARD) ||
((event->typ == MTP_REQ_ISUP_FORWARD) && /* Only one other host should forward ISUP packet */
(receiver->targets[targetix].host == receiver->targets[firstsendix].host)))
if (event->typ == MTP_REQ_ISUP_FORWARD)
receiver_stat->target[targetix].forwards += 1;
cluster_send_packet(receiver, targetix, buf, len);
}
}
}
static void cluster_send_keep_alive(void)
{
struct mtp_event event;
int receiverix, targetix;
event.typ = MTP_EVENT_ALIVE;
event.len = 0;
event.seq_no = sequence_number++;
for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);
if (tdiff > CLUSTER_KEEP_ALIVE_INTERVAL)
cluster_send_packet(&this_host->receivers[receiverix], targetix, (char*) &event, sizeof(event));
}
}
}
static int find_next_timeout(void)
{
int receiverix, targetix;
int maxwait = CLUSTER_KEEP_ALIVE_INTERVAL;
for (receiverix = 0; receiverix < this_host->n_receivers; receiverix++) {
for (targetix = 0; targetix < this_host->receivers[receiverix].n_targets; targetix++) {
if (receiver_stats[receiverix].target[targetix].connected) {
int tdiff = timediff_msec(now, receiver_stats[receiverix].target[targetix].lasttry);
int wait = CLUSTER_KEEP_ALIVE_INTERVAL - tdiff;
if (wait < maxwait)
maxwait = wait;
}
}
}
if (maxwait < 0)
maxwait = 0;
return maxwait;
}
static int cluster_receive_packet(int senderix, int fd)
{
int res;
int hostix = senders[senderix].hostix;
char buf[MTP_EVENT_MAX_SIZE];
struct mtp_event* event = (struct mtp_event*) &buf;
struct mtp_req* req = (struct mtp_req*) &buf;
int sz = sizeof(event->typ);
res = read(fd, buf, sz);
if (res <= 0) {
ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));
return -1;
}
else if (res == 0) {
ast_log(LOG_NOTICE, "Received 0 bytes, closing socket: %s.\n", strerror(errno));
shutdown(fd, SHUT_RDWR);
return -1;
}
if (event->typ < MTP_EVENT_ALIVE) {
res = read(fd, &buf[sz], sizeof(*req)-sz);
if (res > 0)
res = read(fd, req->buf, req->len);
// ast_log(LOG_DEBUG, "Received mtp req %d, buff len %d, res %d\n", req->typ, req->len, res);
}
else {
res = read(fd, &buf[sz], sizeof(*event)-sz);
if (res > 0)
res = read(fd, event->buf, event->len);
}
if (host_last_seq_no[hostix] >= event->seq_no) {
return 0;
}
host_last_seq_no[hostix] = event->seq_no;
if (res > 0) {
ast_log(LOG_DEBUG, "Received event, senderix=%d, hostix=%d, lastseq=%ld, seqno=%ld, typ=%d\n", senderix, hostix, host_last_seq_no[hostix], event->seq_no, event->typ);
if ((event->typ == MTP_EVENT_ISUP) || (event->typ == MTP_REQ_ISUP_FORWARD)) {
if (isup_event_handler)
(*isup_event_handler)(event);
}
}
if (res < 0)
ast_log(LOG_NOTICE, "Could not read received packet: %s.\n", strerror(errno));
return res;
}
static void *cluster_thread_main(void *data)
{
int i, j;
int res;
fds[0].fd = receivepipe[0];
fds[0].events = POLLIN;
fds_type[0] = FD_PIPE;
fds[1].fd = receiver_socket;
if (receiver_socket > 0)
fds[1].events = POLLIN;
else
fds[1].events = 0;
fds_type[1] = FD_LISTEN;
ast_verbose(VERBOSE_PREFIX_3 "Starting cluster thread, pid=%d.\n", getpid());
while (cluster_running) {
int timeout;
int maxtimeout;
gettimeofday(&now, NULL);
timeout = ast_sched_wait(cluster_sched);
maxtimeout = find_next_timeout();
if(timeout <= 0 || timeout > CLUSTER_WAKEUP_INTERVAL) {
timeout = CLUSTER_WAKEUP_INTERVAL;
}
if (timeout > maxtimeout)
timeout = maxtimeout;
if (rebuild_fds) {
n_fds = 2;
for (i = 0; i < n_accepted; i++) {
fds[n_fds].fd = accepted[i].fd;
fds[n_fds].events = POLLIN|POLLERR|POLLHUP;
fds_type[n_fds++] = FD_ACCEPTED;
}
for (i = 0; i < this_host->n_receivers; i++) {
for (j = 0; j < this_host->receivers[i].n_targets; j++) {
fds_receivers[n_fds] = this_host->receivers[i];
fds_targetix[n_fds] = j;
if (receiver_stats[i].target[j].connected) {
fds[n_fds].fd = receiver_stats[i].target[j].fd;
fds[n_fds].events = POLLERR|POLLHUP;
fds_type[n_fds++] = FD_RECEIVER;
}
else if (receiver_stats[i].target[j].inprogress) {
fds[n_fds].fd = receiver_stats[i].target[j].fd;
fds[n_fds].events = POLLOUT|POLLERR|POLLHUP;
fds_type[n_fds++] = FD_INPROGRESS;
}
}
}
rebuild_fds = 0;
}
res = poll(fds, n_fds, timeout);
gettimeofday(&now, NULL);
if(res < 0) {
if(errno == EINTR) {
/* Just try again. */
} else {
ast_log(LOG_ERROR, "poll() failure, errno=%d: %s\n", errno, strerror(errno));
}
} else if(res > 0) {
for (i = 0; i < n_fds; i++) {
if(!(fds[i].revents & (POLLERR|POLLNVAL|POLLHUP|POLLIN|POLLOUT)))
continue;
switch (fds_type[i]) {
case FD_PIPE: {
if(fds[i].revents & POLLIN) {
int linkix;
unsigned char fifobuf[1024];
struct mtp_req* req = (struct mtp_req*) &fifobuf;
res = read(fds[i].fd, &linkix, sizeof(linkix));
if (res < 0) {
ast_log(LOG_NOTICE, "Could not read cluster event pipe: %s.\n", strerror(errno));
continue;
}
if ((res = lffifo_get(receivebuf, fifobuf, sizeof(fifobuf))) != 0) {
if(res < 0) {
ast_log(LOG_ERROR, "Got oversize packet in cluster receive buffer.\n");
continue;
}
}
ast_log(LOG_DEBUG, "fifo get res %d, typ %d, linkix %d, link %s\n", res, req->typ, linkix, links[linkix].name);
if (res > 0) {
if ((req->typ == MTP_REQ_ISUP) || (req->typ == MTP_REQ_ISUP_FORWARD) || (req->typ == MTP_EVENT_ISUP)) {
if (links[linkix].receiver) {
cluster_send_packets(links[linkix].receiver, fifobuf, res);
}
else {
ast_log(LOG_WARNING, "No way to send packet to cluster, link='%s', reqtype=%d\n", links[linkix].name, req->typ);
}
}
}
}
break;
}
case FD_LISTEN: {
if(fds[i].revents & POLLIN) {
struct sockaddr_in from_addr;
unsigned int len = sizeof(struct sockaddr_in);
int afd = accept(receiver_socket, (struct sockaddr *)&from_addr, &len);
if (afd != -1) {
struct host* host = lookup_host_by_addr(from_addr.sin_addr);
if (host) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -