📄 mbus.c
字号:
curr = next; } *queue = NULL;}void mbus_exit(struct mbus *m) { int i; ASSERT(m != NULL); mbus_validate(m); mbus_qmsg(m, "()", "mbus.bye", "", FALSE); mbus_send(m); /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */ /* We will need an mbus_flush() call first though, to ensure nothing is waiting. */ mbus_flush_msgs(&m->cmd_queue); mbus_flush_msgs(&m->waiting_ack); if (m->encrkey != NULL) { xfree(m->encrkey); } if (m->hashkey != NULL) { xfree(m->hashkey); } udp_exit(m->s); /* Clean up other_* */ for (i=m->num_other_addr-1; i>=0; i--){ remove_other_addr(m, m->other_addr[i]); } xfree(m->addr); xfree(m->other_addr); xfree(m->other_hello); xfree(m->cfg); xfree(m);}void mbus_send(struct mbus *m){ /* Send one, or more, messages previosly queued with mbus_qmsg(). */ /* Messages for the same destination are batched together. Stops */ /* when a reliable message is sent, until the ACK is received. */ struct mbus_msg *curr = m->cmd_queue; int i; mbus_validate(m); if (m->waiting_ack != NULL) { return; } while (curr != NULL) { mbus_msg_validate(curr); /* It's okay for us to send messages which haven't been marked as complete - */ /* that just means we're sending something which has the potential to have */ /* more data piggybacked. However, if it's not complete it MUST be the last */ /* in the list, or something has been reordered - which is bad. */ if (!curr->complete) { ASSERT(curr->next == NULL); } if (curr->reliable) { if (!mbus_addr_valid(m, curr->dest)) { debug_msg("Trying to send reliably to an unknown address...\n"); if (m->err_handler == NULL) { abort(); } m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN); } if (!mbus_addr_unique(m, curr->dest)) { debug_msg("Trying to send reliably but address is not unique...\n"); if (m->err_handler == NULL) { abort(); } m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE); } } /* Create the message... */ mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1); for (i = 0; i < curr->num_cmds; i++) { ASSERT(m->index_sent == (curr->idx_list[i] - 1)); m->index_sent = curr->idx_list[i]; mb_add_command(curr->cmd_list[i], curr->arg_list[i]); } mb_send(m); m->cmd_queue = curr->next; if (curr->reliable) { /* Reliable message, wait for the ack... */ gettimeofday(&(curr->send_time), NULL); m->waiting_ack = curr; curr->next = NULL; return; } else { while (curr->num_cmds > 0) { curr->num_cmds--; xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL; xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL; } xfree(curr->dest); xfree(curr); } curr = m->cmd_queue; }}void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable){ /* Queue up a message for sending. The message is not */ /* actually sent until mbus_send() is called. */ struct mbus_msg *curr = m->cmd_queue; struct mbus_msg *prev = NULL; int alen = strlen(cmnd) + strlen(args) + 4; int i; mbus_validate(m); while (curr != NULL) { mbus_msg_validate(curr); if (!curr->complete) { /* This message is still open for new commands. It MUST be the last in the */ /* cmd_queue, else commands will be reordered. */ ASSERT(curr->next == NULL); if (mbus_addr_identical(curr->dest, dest) && (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) { curr->num_cmds++; curr->reliable |= reliable; curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd); curr->arg_list[curr->num_cmds-1] = xstrdup(args); curr->idx_list[curr->num_cmds-1] = ++(m->index); curr->message_size += alen; mbus_msg_validate(curr); return; } else { curr->complete = TRUE; } } prev = curr; curr = curr->next; } /* If we get here, we've not found an open message in the cmd_queue. We */ /* have to create a new message, and add it to the end of the cmd_queue. */ curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg)); curr->magic = MBUS_MSG_MAGIC; curr->next = NULL; curr->dest = xstrdup(dest); curr->retransmit_count = 0; curr->message_size = alen + 60 + strlen(dest) + strlen(m->addr); curr->seqnum = ++m->seqnum; curr->reliable = reliable; curr->complete = FALSE; curr->num_cmds = 1; curr->cmd_list[0] = xstrdup(cmnd); curr->arg_list[0] = xstrdup(args); curr->idx_list[curr->num_cmds-1] = ++(m->index); for (i = 1; i < MBUS_MAX_QLEN; i++) { curr->cmd_list[i] = NULL; curr->arg_list[i] = NULL; } if (prev == NULL) { m->cmd_queue = curr; } else { prev->next = curr; } gettimeofday(&(curr->send_time), NULL); gettimeofday(&(curr->comp_time), NULL); mbus_msg_validate(curr);}#define mbus_qmsgf(m, dest, reliable, cmnd, format, var) \{ \char buffer[MBUS_BUF_SIZE]; \mbus_validate(m); \snprintf(buffer, MBUS_BUF_SIZE, format, var); \mbus_qmsg(m, dest, cmnd, buffer, reliable); \}#if 0void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...){ /* This is a wrapper around mbus_qmsg() which does a printf() style format into */ /* a buffer. Saves the caller from having to a a malloc(), write the args string */ /* and then do a free(), and also saves worring about overflowing the buffer, so */ /* removing a common source of bugs! */ char buffer[MBUS_BUF_SIZE]; va_list ap; mbus_validate(m); va_start(ap, format);#ifdef WIN32 _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);#else vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);#endif va_end(ap); mbus_qmsg(m, dest, cmnd, buffer, reliable);}#endifint mbus_recv(struct mbus *m, void *data, struct timeval *timeout){ char *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos; char buffer[MBUS_BUF_SIZE]; int buffer_len, seq, a, rx, ts, authlen, loop_count; char ackbuf[MBUS_ACK_BUF_SIZE]; char digest[16]; unsigned char initVec[8] = {0,0,0,0,0,0,0,0}; struct timeval t; struct mbus_parser *mp, *mp2; mbus_validate(m); rx = FALSE; loop_count = 0; while (loop_count++ < 10) { memset(buffer, 0, MBUS_BUF_SIZE); ASSERT(m->s != NULL); udp_fd_zero(); udp_fd_set(m->s); t.tv_sec = timeout->tv_sec; t.tv_usec = timeout->tv_usec; if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) { buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE); if (buffer_len > 0) { rx = TRUE; } else { return rx; } } else { return FALSE; } if (m->encrkey != NULL) { /* Decrypt the message... */ if ((buffer_len % 8) != 0) { debug_msg("Encrypted message not a multiple of 8 bytes in length\n"); continue; } memcpy(mb_cryptbuf, buffer, buffer_len); memset(initVec, 0, 8); qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec); memcpy(buffer, mb_cryptbuf, buffer_len); } /* Sanity check that this is a vaguely sensible format message... Should prevent */ /* problems if we're fed complete garbage, but won't prevent determined hackers. */ if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) { continue; } mp = mbus_parse_init(buffer); /* remove trailing 0 bytes */ npos = (char *) strchr(buffer,'\0'); if(npos!=NULL) { buffer_len=npos-buffer; } /* Parse the authentication header */ if (!mbus_parse_sym(mp, &auth)) { debug_msg("Failed to parse authentication header\n"); mbus_parse_done(mp); continue; } /* Check that the packet authenticates correctly... */ authlen = strlen(auth); hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest); base64encode(digest, 12, ackbuf, 16); if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) { debug_msg("Failed to authenticate message...\n"); mbus_parse_done(mp); continue; } /* Parse the header */ if (!mbus_parse_sym(mp, &ver)) { mbus_parse_done(mp); debug_msg("Parser failed version (1): %s\n",ver); continue; } if (strcmp(ver, "mbus/1.0") != 0) { mbus_parse_done(mp); debug_msg("Parser failed version (2): %s\n",ver); continue; } if (!mbus_parse_int(mp, &seq)) { mbus_parse_done(mp); debug_msg("Parser failed seq\n"); continue; } if (!mbus_parse_int(mp, &ts)) { mbus_parse_done(mp); debug_msg("Parser failed ts\n"); continue; } if (!mbus_parse_sym(mp, &r)) { mbus_parse_done(mp); debug_msg("Parser failed reliable\n"); continue; } if (!mbus_parse_lst(mp, &src)) { mbus_parse_done(mp); debug_msg("Parser failed src\n"); continue; } if (!mbus_parse_lst(mp, &dst)) { mbus_parse_done(mp); debug_msg("Parser failed dst\n"); continue; } if (!mbus_parse_lst(mp, &ack)) { mbus_parse_done(mp); debug_msg("Parser failed ack\n"); continue; } store_other_addr(m, src); /* Check if the message was addressed to us... */ if (mbus_addr_match(m->addr, dst)) { /* ...if so, process any ACKs received... */ mp2 = mbus_parse_init(ack); while (mbus_parse_int(mp2, &a)) { if (mbus_waiting_ack(m)) { if (m->waiting_ack->seqnum == a) { while (m->waiting_ack->num_cmds > 0) { m->waiting_ack->num_cmds--; xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]); xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]); } xfree(m->waiting_ack->dest); xfree(m->waiting_ack); m->waiting_ack = NULL; } else { debug_msg("Got ACK %d but wanted %d\n", a, m->waiting_ack->seqnum); } } else { debug_msg("Got ACK %d but wasn't expecting it\n", a); } } mbus_parse_done(mp2); /* ...if an ACK was requested, send one... */ if (strcmp(r, "R") == 0) { char *newsrc = (char *) xmalloc(strlen(src) + 3); struct timeval t; sprintf(newsrc, "(%s)", src); /* Yes, this is a kludge. */ gettimeofday(&t, NULL); mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq); mb_send(m); xfree(newsrc); } else if (strcmp(r, "U") == 0) { /* Unreliable message.... not need to do anything */ } else { debug_msg("Message with invalid reliability field \"%s\" ignored\n", r); } /* ...and process the commands contained in the message */ while (mbus_parse_sym(mp, &cmd)) { if (mbus_parse_lst(mp, ¶m)) { char *newsrc = (char *) xmalloc(strlen(src) + 3); sprintf(newsrc, "(%s)", src); /* Yes, this is a kludge. */ /* Finally, we snoop on the message we just passed to the application, */ /* to do housekeeping of our list of known mbus sources... */ if (strcmp(cmd, "mbus.bye") == 0) { remove_other_addr(m, newsrc); } if (strcmp(cmd, "mbus.hello") == 0) { /* Mark this source as activ. We remove dead sources in mbus_heartbeat */ store_other_addr(m, newsrc); } m->cmd_handler(newsrc, cmd, param, data); xfree(newsrc); } else { debug_msg("Unable to parse mbus command:\n"); debug_msg("cmd = %s\n", cmd); debug_msg("arg = %s\n", param); break; } } } mbus_parse_done(mp); } return rx;}#define RZ_HANDLE_WAITING 1#define RZ_HANDLE_GO 2struct mbus_rz { char *peer; char *token; struct mbus *m; void *data; int mode; void (*cmd_handler)(char *src, char *cmd, char *args, void *data);};static void rz_handler(char *src, char *cmd, char *args, void *data){ struct mbus_rz *r = (struct mbus_rz *) data; struct mbus_parser *mp; if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) { char *t; mp = mbus_parse_init(args); mbus_parse_str(mp, &t); if (strcmp(mbus_decode_str(t), r->token) == 0) { if (r->peer != NULL) xfree(r->peer); r->peer = xstrdup(src); } mbus_parse_done(mp); } else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) { char *t; mp = mbus_parse_init(args); mbus_parse_str(mp, &t); if (strcmp(mbus_decode_str(t), r->token) == 0) { if (r->peer != NULL) xfree(r->peer); r->peer = xstrdup(src); } mbus_parse_done(mp); } else { r->cmd_handler(src, cmd, args, r->data); }}char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data){ /* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */ /* back from our peer. Any other mbus commands received whilst waiting are */ /* processed in the normal manner, as if mbus_recv() had been called. */ char *token_e, *peer; struct timeval timeout; struct mbus_rz *r; mbus_validate(m); r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz)); r->peer = NULL; r->token = token; r->m = m; r->data = data; r->mode = RZ_HANDLE_GO; r->cmd_handler = m->cmd_handler; m->cmd_handler = rz_handler; token_e = mbus_encode_str(token); while (r->peer == NULL) { timeout.tv_sec = 0; timeout.tv_usec = 100000; mbus_heartbeat(m, 1); mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e); mbus_send(m); mbus_recv(m, r, &timeout); mbus_retransmit(m); } m->cmd_handler = r->cmd_handler; peer = r->peer; xfree(r); xfree(token_e); return peer;}char *mbus_rendezvous_go(struct mbus *m, char *token, void *data){ /* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to */ /* the sender of that message. Whilst waiting, other mbus commands are processed */ /* in the normal manner as if mbus_recv() had been called. */ char *token_e, *peer; struct timeval timeout; struct mbus_rz *r; mbus_validate(m); r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz)); r->peer = NULL; r->token = token; r->m = m; r->data = data; r->mode = RZ_HANDLE_WAITING; r->cmd_handler = m->cmd_handler; m->cmd_handler = rz_handler; token_e = mbus_encode_str(token); while (r->peer == NULL) { timeout.tv_sec = 0; timeout.tv_usec = 100000; mbus_heartbeat(m, 1); mbus_send(m); mbus_recv(m, r, &timeout); mbus_retransmit(m); } mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e); do { mbus_heartbeat(m, 1); mbus_retransmit(m); mbus_send(m); timeout.tv_sec = 0; timeout.tv_usec = 100000; mbus_recv(m, r, &timeout); } while (!mbus_sent_all(m)); m->cmd_handler = r->cmd_handler; peer = r->peer; xfree(r); xfree(token_e); return peer;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -