📄 cluster.c
字号:
int senderix = find_sender(host, from_addr.sin_addr);
if (senderix >= 0) {
set_sender_last(senderix, now);
accepted[n_accepted].fd = afd;
accepted[n_accepted].addr = from_addr.sin_addr;
accepted[n_accepted++].senderix = senderix;
rebuild_fds = 1;
host_last_seq_no[senders[senderix].hostix] = 0;
}
else {
ast_log(LOG_NOTICE, "Got socket connection from unexpected sender %s %s\n", host->name, inaddr2s(from_addr.sin_addr));
}
}
ast_log(LOG_NOTICE, "Accepted socket connection from %s, fd %d\n", host?host->name : "unknown", afd);
}
else {
ast_log(LOG_WARNING, "Accept of receiver connection failed: %s.\n", strerror(errno));
}
break;
}
}
case FD_ACCEPTED: {
int ix = i - 2;
int err = 0;
if(fds[i].revents & POLLIN) {
err = cluster_receive_packet(accepted[ix].senderix, fds[i].fd);
set_sender_last(accepted[ix].senderix, now);
}
if((err == -1) || (fds[i].revents & (POLLERR|POLLNVAL))) {
int error;
unsigned int len = sizeof(error);
getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);
ast_log(LOG_NOTICE, "Got error on accepted socket: %d %s\n", i, strerror(error));
close(fds[i].fd);
for (j = ix; j < n_accepted-1; j++)
accepted[j] = accepted[j+1];
n_accepted--;
rebuild_fds = 1;
}
break;
}
case FD_RECEIVER:
case FD_INPROGRESS: {
struct receiver* receiver = &fds_receivers[i];
struct receiver_stat* receiver_stat = &receiver_stats[receiver->receiverix];
int targetix = fds_targetix[i];
char* host_name = receiver->targets[targetix].host->name;
char* if_name = receiver->targets[targetix].inf->name;
rebuild_fds = 1;
if(fds[i].revents & (POLLERR|POLLNVAL)) {
if (receiver_stat->target[targetix].reported++ % 100 == 0) {
int error;
unsigned int len = sizeof(error);
getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &error, &len);
ast_log(LOG_NOTICE, "Socket connection failed to host: %s, inf %s, addr %s, error: %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr), strerror(error));
}
disconnect_receiver(receiver, targetix);
}
else if(fds[i].revents & (POLLHUP)) {
ast_log(LOG_NOTICE, "Lost connection to receiver host: %s, inf %s, addr %s\n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));
disconnect_receiver(receiver, targetix);
}
else if(fds[i].revents & (POLLIN|POLLOUT)) {
ast_log(LOG_NOTICE, "Connected to receiver host: %s, inf %s, addr %s \n", host_name, if_name, inaddr2s(receiver->targets[targetix].inf->addr));
receiver_stat->target[targetix].connected = 1;
receiver_stat->target[targetix].inprogress = 0;
receiver_stat->target[targetix].reported = 0;
}
}
break;
}
}
}
cluster_send_keep_alive();
if (check_receiver_connections())
rebuild_fds = 1;
check_senders();
}
return NULL;
}
static void build_sender_list(void)
{
int hostix = 0;
struct host* host = NULL;
while ((host = lookup_host_by_id(hostix)) != NULL) {
if (host != this_host) {
int linkix, targetix;
for (linkix = 0; linkix < host->n_receivers; linkix++) {
for (targetix = 0; targetix < host->receivers[linkix].n_targets; targetix++) {
if (host->receivers[linkix].targets[targetix].host == this_host) {
int j;
for (j = 0; j < host->n_ifs; j++) {
add_sender(host, host->ifs[j].addr, hostix);
}
}
}
}
}
hostix++;
}
if (!n_senders) {
ast_log(LOG_DEBUG, "Found no senders to supervise\n");
}
}
static void wait_for_connections(void)
{
int cnt;
int linkix, targetix;
for (cnt = 0; cnt < 800; cnt++) {
int n = 0, c = 0;
for (linkix = 0; linkix < this_host->n_receivers; linkix++) {
for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) {
c += 1;
if (receiver_stats[linkix].target[targetix].connected)
n += 1;
}
}
if (cnt % 100 == 0)
ast_log(LOG_DEBUG, "wait %d %d %d %d\n", n, c, n_accepted, n_senders);
if ((n == c) && (n_accepted == n_senders))
break;
usleep(10*1000);
}
}
int cluster_init(void (*isup_event_handler_callback)(struct mtp_event*),
void (*isup_block_handler_callback)(struct link*))
{
int i, j;
int res;
int flags;
struct sched_param sp;
isup_event_handler = isup_event_handler_callback;
isup_block_handler = isup_block_handler_callback;
build_sender_list();
for (i = 0; i < this_host->n_receivers; i++) {
for (j = 0; j < this_host->receivers[i].n_targets; j++) {
receiver_stats[i].target[j].fd = -1;
receiver_stats[i].target[j].connected = 0;
receiver_stats[i].target[j].inprogress = 0;
receiver_stats[i].target[j].reported = 0;
}
}
for (i = 0; i < this_host->n_receivers; i++) {
for (j = 0; j < this_host->receivers[i].n_targets; j++) {
struct host* host = this_host->receivers[i].targets[j].host;
int l;
for (l = 0; l < host->n_spans; l++) {
struct link* link = host->spans[l].link;
if (link->schannel >= 0)
this_host->has_signalling_receivers = 1;
}
}
}
if (this_host->has_signalling_receivers)
if (setup_receiver_socket())
goto fail;
connect_receivers();
receivepipe[0] = receivepipe[1] = -1;
receivebuf = lffifo_alloc(200000);
res = pipe(receivepipe);
if(res < 0) {
ast_log(LOG_ERROR, "Unable to allocate cluster event pipe: %s.\n",
strerror(errno));
goto fail;
}
res = fcntl(receivepipe[0], F_GETFL);
if(res < 0) {
ast_log(LOG_ERROR, "Could not obtain flags for read end of "
"cluster event pipe: %s.\n", strerror(errno));
goto fail;
}
flags = res | O_NONBLOCK;
res = fcntl(receivepipe[0], F_SETFL, flags);
if(res < 0) {
ast_log(LOG_ERROR, "Could not set read end of cluster event pipe "
"non-blocking: %s.\n", strerror(errno));
goto fail;
}
res = fcntl(receivepipe[1], F_GETFL);
if(res < 0) {
ast_log(LOG_ERROR, "Could not obtain flags for write end of "
"cluster event pipe: %s.\n", strerror(errno));
goto fail;
}
flags = res | O_NONBLOCK;
res = fcntl(receivepipe[1], F_SETFL, flags);
if(res < 0) {
ast_log(LOG_ERROR, "Could not set write end of cluster event pipe "
"non-blocking: %s.\n", strerror(errno));
goto fail;
}
cluster_sched = sched_context_create();
if(cluster_sched == NULL) {
ast_log(LOG_ERROR, "Unable to create cluster scheduling context.\n");
goto fail;
}
cluster_running = 1; /* Otherwise there is a race, and
cluster may exit immediately */
if(ast_pthread_create(&cluster_thread, NULL, cluster_thread_main, NULL) < 0) {
ast_log(LOG_ERROR, "Unable to start cluster thread.\n");
cluster_running = 0;
goto fail;
}
memset(&sp, 0, sizeof(sp));
sp.sched_priority = 10;
res = pthread_setschedparam(cluster_thread, SCHED_RR, &sp);
if(res != 0) {
ast_log(LOG_WARNING, "Failed to set cluster thread to realtime priority: %s.\n",
strerror(res));
}
wait_for_connections();
return 0;
fail:
cluster_cleanup();
return -1;
}
void cluster_cleanup(void)
{
int i, j;
if(cluster_running) {
cluster_running = 0;
/* Monitor wakes up periodically, so no need to signal it explicitly. */
pthread_join(cluster_thread, NULL);
}
if(cluster_sched) {
sched_context_destroy(cluster_sched);
cluster_sched = NULL;
}
if(receivebuf) {
lffifo_free(receivebuf);
receivebuf = NULL;
}
if(receivepipe[0] != -1) {
close(receivepipe[0]);
receivepipe[0] = -1;
}
if(receivepipe[1] != -1) {
close(receivepipe[1]);
receivepipe[1] = -1;
}
if (receiver_socket != -1) {
shutdown(receiver_socket, SHUT_RDWR);
close(receiver_socket);
receiver_socket = -1;
}
for (i = 0; i < n_accepted; i++) {
shutdown(accepted[i].fd, SHUT_RDWR);
close(accepted[i].fd);
}
n_accepted = 0;
if (this_host) {
for (i = 0; i < this_host->n_receivers; i++) {
for (j = 0; j < this_host->receivers[i].n_targets; j++) {
if (receiver_stats[i].target[j].connected || receiver_stats[i].target[j].inprogress) {
shutdown(receiver_stats[i].target[j].fd, SHUT_RDWR);
close(receiver_stats[i].target[j].fd);
receiver_stats[i].target[j].connected = 0;
receiver_stats[i].target[j].inprogress = 0;
}
}
}
}
n_senders = 0;
}
int cmd_cluster_start(int fd, int argc, char *argv[])
{
if (!cluster_running)
return cluster_init(isup_event_handler, isup_block_handler);
return 0;
}
int cmd_cluster_stop(int fd, int argc, char *argv[])
{
if (cluster_running)
cluster_cleanup();
return 0;
}
int cmd_cluster_status(int fd, int argc, char *argv[])
{
int i;
int linkix, targetix;
gettimeofday(&now, NULL);
for (i = 0; i < n_senders; i++) {
int tdiff = timediff_msec(now, senders[i].last);
char* s = "";
switch (senders[i].state) {
case STATE_UNKNOWN:
s = "unknown"; tdiff = 0; break;
case STATE_ALIVE:
s = "alive"; break;
case STATE_DEAD:
s = "dead"; break;
}
ast_cli(fd, "sender %s, addr %s, state %s, last %d msec, up %d, down %d\n", senders[i].host->name, inaddr2s(senders[i].addr), s, tdiff, senders[i].up, senders[i].down);
}
for (linkix = 0; linkix < this_host->n_receivers; linkix++) {
for (targetix = 0; targetix < this_host->receivers[linkix].n_targets; targetix++) {
char* if_name = this_host->receivers[linkix].targets[targetix].inf->name;
char* host_name = this_host->receivers[linkix].targets[targetix].host->name;
struct in_addr addr = this_host->receivers[linkix].targets[targetix].inf->addr;
char* c = (receiver_stats[linkix].target[targetix].connected) ? "connected" : "";
char* p = (receiver_stats[linkix].target[targetix].inprogress) ? "inprogress" : "";
int tdiff = timediff_msec(now, receiver_stats[linkix].target[targetix].lasttry);
ast_cli(fd, "receiver %s if %s, addr %s, c:%s, p:%s, last try %d msec, %d fails, %lu forwards\n",
host_name, if_name, inaddr2s(addr), c, p,
tdiff, receiver_stats[linkix].target[targetix].fails,
receiver_stats[linkix].target[targetix].forwards);
}
}
return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -