⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mbus.c

📁 jpeg and mpeg 编解码技术源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
	mbus_validate(m);

	if (!mbus_waiting_ack(m)) {
		return;
	}

	mbus_msg_validate(curr);

	gettimeofday(&time, NULL);

	/* diff is time in milliseconds that the message has been awaiting an ACK */
	diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
	if (diff > 10000) {
		debug_msg("Reliable mbus message failed!\n");
		if (m->err_handler == NULL) {
			abort();
		}
		m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
		/* if we don't delete this failed message, the error handler
                   gets triggered every time we call mbus_retransmit */
		while (m->waiting_ack->num_cmds > 0) {
		    m->waiting_ack->num_cmds--;
		    xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
		    xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
		}
		xfree(m->waiting_ack->dest);
		xfree(m->waiting_ack);
		m->waiting_ack = NULL;
		return;
	} 
	/* Note: We only send one retransmission each time, to avoid
	 * overflowing the receiver with a burst of requests...
	 */
	if ((diff > 750) && (curr->retransmit_count == 2)) {
		resend(m, curr);
		return;
	} 
	if ((diff > 500) && (curr->retransmit_count == 1)) {
		resend(m, curr);
		return;
	} 
	if ((diff > 250) && (curr->retransmit_count == 0)) {
		resend(m, curr);
		return;
	}
}

/*
 * Emit a periodic "mbus.hello" message and expire silent peers.
 *
 * m        - mbus instance; checked with mbus_validate().
 * interval - minimum number of seconds between two hello messages; it is
 *            also forwarded to remove_inactiv_other_addr().
 *
 * Nothing happens unless at least `interval` seconds have elapsed since
 * m->last_heartbeat. When they have, an unreliable ('U') message addressed
 * to "()" is sent with a fresh sequence number, m->last_heartbeat is
 * updated, and remove_inactiv_other_addr() is asked to drop dead sources.
 */
void mbus_heartbeat(struct mbus *m, int interval)
{
	struct timeval	curr_time;

	mbus_validate(m);

	gettimeofday(&curr_time, NULL);
	if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
		mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
		mb_add_command("mbus.hello", "");
		mb_send(m);

		m->last_heartbeat = curr_time;
		/* Remove dead sources */
		remove_inactiv_other_addr(m, curr_time, interval);
	}
}

/*
 * Report whether a message is currently parked in m->waiting_ack.
 *
 * Returns 1 if m->waiting_ack is set, 0 otherwise.
 */
int mbus_waiting_ack(struct mbus *m)
{
	mbus_validate(m);

	if (m->waiting_ack == NULL) {
		return 0;
	}
	return 1;
}

/*
 * Report whether the instance has nothing left to send.
 *
 * Returns 1 only when both the command queue (m->cmd_queue) and the
 * awaiting-ACK slot (m->waiting_ack) are empty, 0 otherwise.
 */
int mbus_sent_all(struct mbus *m)
{
	mbus_validate(m);

	if (m->cmd_queue != NULL) {
		return 0;
	}
	if (m->waiting_ack != NULL) {
		return 0;
	}
	return 1;
}

/*
 * Allocate and initialise an mbus instance.
 *
 * cmd_handler - callback stored as m->cmd_handler.
 * err_handler - callback stored as m->err_handler; elsewhere in this file
 *               it is invoked with a sequence number and a reason code.
 * addr        - textual mbus address. It is parsed with mbus_parse_lst()
 *               and a private copy of the parsed value becomes m->addr.
 *
 * The network address, port and scope as well as the encryption and hash
 * keys are read from the mbus configuration while the config file is
 * locked; the lock is released before returning on every path.
 *
 * Returns the new instance, or NULL if the instance could not be allocated
 * or udp_init() failed. Calls abort() if addr cannot be parsed.
 */
struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat), 
		       void  (*err_handler)(int seqnum, int reason),
		       char  *addr)
{
	struct mbus		*m;
	struct mbus_key	 	 k;
	struct mbus_parser	*mp;
	int		 	 i;
	char            	*net_addr, *tmp;
	uint16_t         	 net_port;
	int              	 net_scope;

	m = (struct mbus *) xmalloc(sizeof(struct mbus));
	if (m == NULL) {
		debug_msg("Unable to allocate memory for mbus\n");
		return NULL;
	}

	m->cfg = mbus_create_config();
	mbus_lock_config_file(m->cfg);
	net_addr = (char *) xmalloc(20);
	mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
	m->s		  = udp_init(net_addr, net_port, net_port, net_scope);
	if (m->s == NULL) {
		debug_msg("Unable to initialize mbus address\n");
		/* Release everything acquired so far: previously this path  */
		/* left the config file locked and leaked net_addr and cfg.  */
		mbus_unlock_config_file(m->cfg);
		xfree(net_addr);
		xfree(m->cfg);
		xfree(m);
		return NULL;
	}
	m->seqnum         = 0;
	m->cmd_handler    = cmd_handler;
	m->err_handler	  = err_handler;
	m->num_other_addr = 0;
	m->max_other_addr = 10;
	m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
	m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
	for (i = 0; i < 10; i++) {
		m->other_addr[i]  = NULL;
		m->other_hello[i] = NULL;
	}
	m->cmd_queue	  = NULL;
	m->waiting_ack	  = NULL;
	m->magic          = MBUS_MAGIC;
	m->index          = 0;
	m->index_sent     = 0;

	/* NOTE(review): the duplicate handed to mbus_parse_init() is not   */
	/* freed in this function; confirm whether mbus_parse_done() owns it. */
	mp = mbus_parse_init(xstrdup(addr));
	if (!mbus_parse_lst(mp, &tmp)) {
		debug_msg("Invalid mbus address\n");
		abort();
	}
	/* Copy the parsed address before the parser is torn down. */
	m->addr = xstrdup(tmp);
	mbus_parse_done(mp);
	ASSERT(m->addr != NULL);

	gettimeofday(&(m->last_heartbeat), NULL);

	mbus_get_encrkey(m->cfg, &k);
	m->encrkey    = k.key;
	m->encrkeylen = k.key_len;

	mbus_get_hashkey(m->cfg, &k);
	m->hashkey    = k.key;
	m->hashkeylen = k.key_len;

	mbus_unlock_config_file(m->cfg);

	xfree(net_addr);

	return m;
}

/*
 * Install a new command callback on an existing mbus instance.
 *
 * m           - mbus instance; checked with mbus_validate().
 * cmd_handler - callback that replaces the one currently stored in
 *               m->cmd_handler.
 */
void mbus_cmd_handler(struct mbus *m,
		      void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
{
	mbus_validate(m);

	m->cmd_handler = cmd_handler;
}

/*
 * Discard every message on a singly linked message list.
 *
 * queue - address of the list head. For each message the destination
 *         string, the first num_cmds command/argument strings and the
 *         message itself are released with xfree(); the head is then
 *         reset to NULL.
 */
static void mbus_flush_msgs(struct mbus_msg **queue)
{
	struct mbus_msg *msg = *queue;

	while (msg != NULL) {
		/* Remember the successor before the node is released. */
		struct mbus_msg *following = msg->next;
		int              n         = 0;

		xfree(msg->dest);
		while (n < msg->num_cmds) {
			xfree(msg->cmd_list[n]);
			xfree(msg->arg_list[n]);
			n++;
		}
		xfree(msg);

		msg = following;
	}

	*queue = NULL;
}

/*
 * Shut down an mbus instance and release everything it owns.
 *
 * A final unreliable "mbus.bye" addressed to "()" is queued and handed to
 * mbus_send(); afterwards any messages still queued or awaiting an ACK
 * are discarded, the keys, the UDP socket, the table of known peers, the
 * local address, the configuration and finally the instance itself are
 * released. The pointer must not be used after this call.
 */
void mbus_exit(struct mbus *m) 
{
	int idx;

	ASSERT(m != NULL);
	mbus_validate(m);

	/* Announce our departure before anything is torn down. */
	mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
	mbus_send(m);

	/* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
	/*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */
	mbus_flush_msgs(&m->cmd_queue);
	mbus_flush_msgs(&m->waiting_ack);

	/* The keys are only released when they are actually set. */
	if (m->encrkey != NULL) {
		xfree(m->encrkey);
	}
	if (m->hashkey != NULL) {
		xfree(m->hashkey);
	}

	udp_exit(m->s);

	/* Clean up other_*: walk the peer table from the last entry down to the first. */
	idx = m->num_other_addr;
	while (idx-- > 0) {
		remove_other_addr(m, m->other_addr[idx]);
	}

	xfree(m->addr);
	xfree(m->other_addr);
	xfree(m->other_hello);
	xfree(m->cfg);
	xfree(m);
}

/*
 * Send one, or more, messages previously queued with mbus_qmsg().
 *
 * Messages are taken from the head of m->cmd_queue in order. Unreliable
 * messages are transmitted and freed immediately. As soon as a reliable
 * message has been transmitted it is moved to m->waiting_ack and the
 * function returns; nothing further is sent until that slot is cleared.
 * If a message is already awaiting its ACK on entry, nothing is sent.
 *
 * For reliable messages the destination is checked with mbus_addr_valid()
 * and mbus_addr_unique(); on failure m->err_handler is called with
 * MBUS_DESTINATION_UNKNOWN / MBUS_DESTINATION_NOT_UNIQUE, or abort() is
 * called when no error handler is installed.
 */
void mbus_send(struct mbus *m)
{
	struct mbus_msg	*curr;
	int		 i;

	/* Validate the instance before touching any of its fields. */
	mbus_validate(m);
	if (m->waiting_ack != NULL) {
		return;
	}

	curr = m->cmd_queue;
	while (curr != NULL) {
		mbus_msg_validate(curr);
		/* It's okay for us to send messages which haven't been marked as complete - */
		/* that just means we're sending something which has the potential to have   */
		/* more data piggybacked. However, if it's not complete it MUST be the last  */
		/* in the list, or something has been reordered - which is bad.              */
		if (!curr->complete) {
			ASSERT(curr->next == NULL);
		}

		if (curr->reliable) {
			/* NOTE(review): after the error handler returns, the message is */
			/* still transmitted below; confirm that this is intended.       */
			if (!mbus_addr_valid(m, curr->dest)) {
				debug_msg("Trying to send reliably to an unknown address...\n");
				if (m->err_handler == NULL) {
					abort();
				}
				m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
			}
			if (!mbus_addr_unique(m, curr->dest)) {
				debug_msg("Trying to send reliably but address is not unique...\n");
				if (m->err_handler == NULL) {
					abort();
				}
				m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
			}
		}
		/* Create the message... */
		mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
		for (i = 0; i < curr->num_cmds; i++) {
			/* Commands must leave in exactly the order they were queued. */
			ASSERT(m->index_sent == (curr->idx_list[i] - 1));
			m->index_sent = curr->idx_list[i];
			mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
		}
		mb_send(m);
		
		/* Unlink the message from the head of the queue. */
		m->cmd_queue = curr->next;
		if (curr->reliable) {
			/* Reliable message, wait for the ack... */
			gettimeofday(&(curr->send_time), NULL);
			m->waiting_ack = curr;
			curr->next = NULL;
			return;
		} else {
			/* Unreliable message: nothing to wait for, release it now. */
			while (curr->num_cmds > 0) {
				curr->num_cmds--;
				xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
				xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
			}
			xfree(curr->dest);
			xfree(curr);
		}
		curr = m->cmd_queue;
	}
}

/*
 * Queue up a message for sending. The message is not actually sent until
 * mbus_send() is called.
 *
 * m        - mbus instance; checked with mbus_validate().
 * dest     - destination address of the command.
 * cmnd     - command name; copied.
 * args     - command arguments; copied.
 * reliable - non-zero to request reliable delivery.
 *
 * If the last message in m->cmd_queue is still open (not complete), has an
 * identical destination, holds fewer than MBUS_MAX_QLEN commands and stays
 * below the size limit, the command is appended to it and the message
 * becomes reliable if any of its commands is. Otherwise that message is
 * marked complete and a new message holding just this command is appended
 * to the end of the queue.
 */
void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
{
	struct mbus_msg	*curr;
	struct mbus_msg	*prev = NULL;
	int		 alen = strlen(cmnd) + strlen(args) + 4;
	int		 i;

	/* Validate the instance before touching any of its fields. */
	mbus_validate(m);
	curr = m->cmd_queue;
	while (curr != NULL) {
		mbus_msg_validate(curr);
		if (!curr->complete) {
			/* This message is still open for new commands. It MUST be the last in the */
			/* cmd_queue, else commands will be reordered.                             */
			ASSERT(curr->next == NULL);
			if (mbus_addr_identical(curr->dest, dest) &&
		            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
				curr->num_cmds++;
				curr->reliable |= reliable;
				curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
				curr->arg_list[curr->num_cmds-1] = xstrdup(args);
				curr->idx_list[curr->num_cmds-1] = ++(m->index);
				curr->message_size += alen;
				mbus_msg_validate(curr);
				return;
			} else {
				/* Cannot piggyback on this message: close it so that */
				/* the new command goes into a fresh one behind it.   */
				curr->complete = TRUE;
			}
		}
		prev = curr;
		curr = curr->next;
	}
	/* If we get here, we've not found an open message in the cmd_queue.  We */
	/* have to create a new message, and add it to the end of the cmd_queue. */
	curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
	curr->magic            = MBUS_MSG_MAGIC;
	curr->next             = NULL;
	curr->dest             = xstrdup(dest);
	curr->retransmit_count = 0;
	curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);
	curr->seqnum           = ++m->seqnum;
	curr->reliable         = reliable;
	curr->complete         = FALSE;
	curr->num_cmds         = 1;
	curr->cmd_list[0]      = xstrdup(cmnd);
	curr->arg_list[0]      = xstrdup(args);
	curr->idx_list[curr->num_cmds-1] = ++(m->index);
	/* Slots beyond the first command start out empty. */
	for (i = 1; i < MBUS_MAX_QLEN; i++) {
		curr->cmd_list[i] = NULL;
		curr->arg_list[i] = NULL;
	}
	/* prev is the old tail of the queue, or NULL if the queue was empty. */
	if (prev == NULL) {
		m->cmd_queue = curr;
	} else {
		prev->next = curr;
	}
	gettimeofday(&(curr->send_time), NULL);
	gettimeofday(&(curr->comp_time), NULL);
	mbus_msg_validate(curr);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -