📄 mbus.c
字号:
mb_send(m); curr->retransmit_count++;}void mbus_retransmit(struct mbus *m){ struct mbus_msg *curr = m->waiting_ack; struct timeval time; long diff; mbus_validate(m); if (!mbus_waiting_ack(m)) { return; } mbus_msg_validate(curr); gettimeofday(&time, NULL); /* diff is time in milliseconds that the message has been awaiting an ACK */ diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000)); if (diff > 10000) { debug_msg("Reliable mbus message failed!\n"); if (m->err_handler == NULL) { abort(); } m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST); /* if we don't delete this failed message, the error handler gets triggered every time we call mbus_retransmit */ while (m->waiting_ack->num_cmds > 0) { m->waiting_ack->num_cmds--; xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]); xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]); } xfree(m->waiting_ack->dest); xfree(m->waiting_ack); m->waiting_ack = NULL; return; } /* Note: We only send one retransmission each time, to avoid * overflowing the receiver with a burst of requests... */ if ((diff > 750) && (curr->retransmit_count == 2)) { resend(m, curr); return; } if ((diff > 500) && (curr->retransmit_count == 1)) { resend(m, curr); return; } if ((diff > 250) && (curr->retransmit_count == 0)) { resend(m, curr); return; }}void mbus_heartbeat(struct mbus *m, int interval){ struct timeval curr_time; char *a = (char *) xmalloc(3); sprintf(a, "()"); mbus_validate(m); gettimeofday(&curr_time, NULL); if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) { mb_header(++m->seqnum, curr_time, 'U', m->addr, "()", -1); mb_add_command("mbus.hello", ""); mb_send(m); m->last_heartbeat = curr_time; /* Remove dead sources */ remove_inactiv_other_addr(m, curr_time, interval); } xfree(a);}int mbus_waiting_ack(struct mbus *m){ mbus_validate(m); return m->waiting_ack != NULL;}int mbus_sent_all(struct mbus *m){ mbus_validate(m); return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);}struct mbus *mbus_init(void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat), void (*err_handler)(int seqnum, int reason), char *addr){ struct mbus *m; struct mbus_key k; struct mbus_parser *mp; int i; char *net_addr, *tmp; uint16_t net_port; int net_scope; m = (struct mbus *) xmalloc(sizeof(struct mbus)); if (m == NULL) { debug_msg("Unable to allocate memory for mbus\n"); return NULL; } m->cfg = mbus_create_config(); mbus_lock_config_file(m->cfg); net_addr = (char *) xmalloc(20); mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope); m->s = udp_init(net_addr, net_port, net_port, net_scope); if (m->s == NULL) { debug_msg("Unable to initialize mbus address\n"); xfree(m); return NULL; } m->seqnum = 0; m->cmd_handler = cmd_handler; m->err_handler = err_handler; m->num_other_addr = 0; m->max_other_addr = 10; m->other_addr = (char **) xmalloc(sizeof(char *) * 10); m->other_hello = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10); for (i = 0; i < 10; i++) { m->other_addr[i] = NULL; m->other_hello[i] = NULL; } m->cmd_queue = NULL; m->waiting_ack = NULL; m->magic = MBUS_MAGIC; m->index = 0; m->index_sent = 0; mp = mbus_parse_init(xstrdup(addr)); if (!mbus_parse_lst(mp, &tmp)) { debug_msg("Invalid mbus address\n"); abort(); } m->addr = xstrdup(tmp); mbus_parse_done(mp); assert(m->addr != NULL); gettimeofday(&(m->last_heartbeat), NULL); mbus_get_encrkey(m->cfg, &k); m->encrkey = k.key; m->encrkeylen = k.key_len; mbus_get_hashkey(m->cfg, &k); m->hashkey = k.key; m->hashkeylen = k.key_len; mbus_unlock_config_file(m->cfg); xfree(net_addr); return m;}void mbus_cmd_handler(struct mbus *m, void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat)){ mbus_validate(m); m->cmd_handler = cmd_handler;}static void mbus_flush_msgs(struct mbus_msg **queue){ struct mbus_msg *curr, *next; int i; curr = *queue; while(curr) { next = curr->next; xfree(curr->dest); for(i = 0; i < curr->num_cmds; i++) { xfree(curr->cmd_list[i]); xfree(curr->arg_list[i]); } xfree(curr); curr = next; } *queue = NULL;}void mbus_exit(struct mbus *m) { int i; assert(m != NULL); mbus_validate(m); mbus_qmsg(m, "()", "mbus.bye", "", FALSE); mbus_send(m); /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */ /* We will need an mbus_flush() call first though, to ensure nothing is waiting. */ mbus_flush_msgs(&m->cmd_queue); mbus_flush_msgs(&m->waiting_ack); if (m->encrkey != NULL) { xfree(m->encrkey); } if (m->hashkey != NULL) { xfree(m->hashkey); } udp_exit(m->s); /* Clean up other_* */ for (i=m->num_other_addr-1; i>=0; i--){ remove_other_addr(m, m->other_addr[i]); } xfree(m->addr); xfree(m->other_addr); xfree(m->other_hello); xfree(m->cfg); xfree(m);}void mbus_send(struct mbus *m){ /* Send one, or more, messages previosly queued with mbus_qmsg(). */ /* Messages for the same destination are batched together. Stops */ /* when a reliable message is sent, until the ACK is received. */ struct mbus_msg *curr = m->cmd_queue; int i; mbus_validate(m); if (m->waiting_ack != NULL) { return; } while (curr != NULL) { mbus_msg_validate(curr); /* It's okay for us to send messages which haven't been marked as complete - */ /* that just means we're sending something which has the potential to have */ /* more data piggybacked. However, if it's not complete it MUST be the last */ /* in the list, or something has been reordered - which is bad. */ if (!curr->complete) { assert(curr->next == NULL); } if (curr->reliable) { if (!mbus_addr_valid(m, curr->dest)) { debug_msg("Trying to send reliably to an unknown address...\n"); if (m->err_handler == NULL) { abort(); } m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN); } if (!mbus_addr_unique(m, curr->dest)) { debug_msg("Trying to send reliably but address is not unique...\n"); if (m->err_handler == NULL) { abort(); } m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE); } } /* Create the message... */ mb_header(curr->seqnum, curr->comp_time, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1); for (i = 0; i < curr->num_cmds; i++) { assert(m->index_sent == (curr->idx_list[i] - 1)); m->index_sent = curr->idx_list[i]; mb_add_command(curr->cmd_list[i], curr->arg_list[i]); } mb_send(m); m->cmd_queue = curr->next; if (curr->reliable) { /* Reliable message, wait for the ack... */ gettimeofday(&(curr->send_time), NULL); m->waiting_ack = curr; curr->next = NULL; return; } else { while (curr->num_cmds > 0) { curr->num_cmds--; xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL; xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL; } xfree(curr->dest); xfree(curr); } curr = m->cmd_queue; }}void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable){ /* Queue up a message for sending. The message is not */ /* actually sent until mbus_send() is called. */ struct mbus_msg *curr = m->cmd_queue; struct mbus_msg *prev = NULL; int alen = strlen(cmnd) + strlen(args) + 4; int i; mbus_validate(m); while (curr != NULL) { mbus_msg_validate(curr); if (!curr->complete) { /* This message is still open for new commands. It MUST be the last in the */ /* cmd_queue, else commands will be reordered. */ assert(curr->next == NULL); if (mbus_addr_identical(curr->dest, dest) && (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) { curr->num_cmds++; curr->reliable |= reliable; curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd); curr->arg_list[curr->num_cmds-1] = xstrdup(args); curr->idx_list[curr->num_cmds-1] = ++(m->index); curr->message_size += alen; mbus_msg_validate(curr); return; } else { curr->complete = TRUE; } } prev = curr; curr = curr->next; } /* If we get here, we've not found an open message in the cmd_queue. We */ /* have to create a new message, and add it to the end of the cmd_queue. */ curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg)); curr->magic = MBUS_MSG_MAGIC; curr->next = NULL; curr->dest = xstrdup(dest); curr->retransmit_count = 0; curr->message_size = alen + 60 + strlen(dest) + strlen(m->addr); curr->seqnum = ++m->seqnum; curr->reliable = reliable; curr->complete = FALSE; curr->num_cmds = 1; curr->cmd_list[0] = xstrdup(cmnd); curr->arg_list[0] = xstrdup(args); curr->idx_list[curr->num_cmds-1] = ++(m->index); for (i = 1; i < MBUS_MAX_QLEN; i++) { curr->cmd_list[i] = NULL; curr->arg_list[i] = NULL; } if (prev == NULL) { m->cmd_queue = curr; } else { prev->next = curr; } gettimeofday(&(curr->send_time), NULL); gettimeofday(&(curr->comp_time), NULL); mbus_msg_validate(curr);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -