⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mbus.c

📁 linux系统下的音频通信
💻 C
📖 第 1 页 / 共 3 页
字号:
	mb_send(m);	curr->retransmit_count++;}void mbus_retransmit(struct mbus *m){	struct mbus_msg	*curr = m->waiting_ack;	struct timeval	time;	long		diff;	mbus_validate(m);	if (!mbus_waiting_ack(m)) {		return;	}	mbus_msg_validate(curr);	gettimeofday(&time, NULL);	/* diff is time in milliseconds that the message has been awaiting an ACK */	diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));	if (diff > 10000) {		debug_msg("Reliable mbus message failed!\n");		if (m->err_handler == NULL) {			abort();		}		m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);		/* if we don't delete this failed message, the error handler                   gets triggered every time we call mbus_retransmit */		while (m->waiting_ack->num_cmds > 0) {		    m->waiting_ack->num_cmds--;		    xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);		    xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);		}		xfree(m->waiting_ack->dest);		xfree(m->waiting_ack);		m->waiting_ack = NULL;		return;	} 	/* Note: We only send one retransmission each time, to avoid	 * overflowing the receiver with a burst of requests...	 */	if ((diff > 750) && (curr->retransmit_count == 2)) {		resend(m, curr);		return;	} 	if ((diff > 500) && (curr->retransmit_count == 1)) {		resend(m, curr);		return;	} 	if ((diff > 250) && (curr->retransmit_count == 0)) {		resend(m, curr);		return;	}}void mbus_heartbeat(struct mbus *m, int interval){	struct timeval	curr_time;	char	*a = (char *) xmalloc(3);	sprintf(a, "()");	mbus_validate(m);	gettimeofday(&curr_time, NULL);	if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {	    mb_header(++m->seqnum, curr_time, 'U', m->addr, "()", -1);		mb_add_command("mbus.hello", "");		mb_send(m);		m->last_heartbeat = curr_time;		/* Remove dead sources */		remove_inactiv_other_addr(m, curr_time, interval);	}	xfree(a);}int mbus_waiting_ack(struct mbus *m){	mbus_validate(m);	return m->waiting_ack != NULL;}int mbus_sent_all(struct mbus *m){	mbus_validate(m);	return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);}struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat), 		       void  (*err_handler)(int seqnum, int reason),		       char  *addr){	struct mbus		*m;	struct mbus_key	 	 k;	struct mbus_parser	*mp;	int		 	 i;	char            	*net_addr, *tmp;	uint16_t         	 net_port;	int              	 net_scope;	m = (struct mbus *) xmalloc(sizeof(struct mbus));	if (m == NULL) {		debug_msg("Unable to allocate memory for mbus\n");		return NULL;	}	m->cfg = mbus_create_config();	mbus_lock_config_file(m->cfg);	net_addr = (char *) xmalloc(20);	mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);	m->s		  = udp_init(net_addr, net_port, net_port, net_scope);        if (m->s == NULL) {                debug_msg("Unable to initialize mbus address\n");                xfree(m);                return NULL;        }	m->seqnum         = 0;	m->cmd_handler    = cmd_handler;	m->err_handler	  = err_handler;	m->num_other_addr = 0;	m->max_other_addr = 10;	m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);	m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);	for (i = 0; i < 10; i++) {		m->other_addr[i]  = NULL;		m->other_hello[i] = NULL;	}	m->cmd_queue	  = NULL;	m->waiting_ack	  = NULL;	m->magic          = MBUS_MAGIC;	m->index          = 0;	m->index_sent     = 0;	mp = mbus_parse_init(xstrdup(addr));	if (!mbus_parse_lst(mp, &tmp)) {		debug_msg("Invalid mbus address\n");		abort();	}	m->addr = xstrdup(tmp);	mbus_parse_done(mp);	assert(m->addr != NULL);	gettimeofday(&(m->last_heartbeat), NULL);	mbus_get_encrkey(m->cfg, &k);	m->encrkey    = k.key;	m->encrkeylen = k.key_len;	mbus_get_hashkey(m->cfg, &k);	m->hashkey    = k.key;	m->hashkeylen = k.key_len;	mbus_unlock_config_file(m->cfg);	xfree(net_addr);	return m;}void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat)){	mbus_validate(m);	m->cmd_handler = cmd_handler;}static void mbus_flush_msgs(struct mbus_msg **queue){        struct mbus_msg *curr, *next;        int i;	        curr = *queue;        while(curr) {                next = curr->next;                xfree(curr->dest);                for(i = 0; i < curr->num_cmds; i++) {                        xfree(curr->cmd_list[i]);                        xfree(curr->arg_list[i]);                }		xfree(curr);                curr = next;        }	*queue = NULL;}void mbus_exit(struct mbus *m) {        int i;        assert(m != NULL);	mbus_validate(m);	mbus_qmsg(m, "()", "mbus.bye", "", FALSE);	mbus_send(m);	/* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */	/*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */        mbus_flush_msgs(&m->cmd_queue);        mbus_flush_msgs(&m->waiting_ack);        if (m->encrkey != NULL) {                xfree(m->encrkey);        }        if (m->hashkey != NULL) {        	xfree(m->hashkey);	}        udp_exit(m->s);	/* Clean up other_* */	for (i=m->num_other_addr-1; i>=0; i--){	    remove_other_addr(m, m->other_addr[i]);	}        xfree(m->addr);	xfree(m->other_addr);	xfree(m->other_hello);	xfree(m->cfg);        xfree(m);}void mbus_send(struct mbus *m){	/* Send one, or more, messages previosly queued with mbus_qmsg(). */	/* Messages for the same destination are batched together. Stops  */	/* when a reliable message is sent, until the ACK is received.    */	struct mbus_msg	*curr = m->cmd_queue;	int		 i;	mbus_validate(m);	if (m->waiting_ack != NULL) {		return;	}	while (curr != NULL) {		mbus_msg_validate(curr);		/* It's okay for us to send messages which haven't been marked as complete - */		/* that just means we're sending something which has the potential to have   */		/* more data piggybacked. However, if it's not complete it MUST be the last  */		/* in the list, or something has been reordered - which is bad.              */		if (!curr->complete) {			assert(curr->next == NULL);		}		if (curr->reliable) {		        if (!mbus_addr_valid(m, curr->dest)) {			    debug_msg("Trying to send reliably to an unknown address...\n");			    if (m->err_handler == NULL) {				abort();			    }			    m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);			}		        if (!mbus_addr_unique(m, curr->dest)) {			    debug_msg("Trying to send reliably but address is not unique...\n");			    if (m->err_handler == NULL) {				abort();			    }			    m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);			}		}		/* Create the message... */	    mb_header(curr->seqnum, curr->comp_time, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);		for (i = 0; i < curr->num_cmds; i++) {			assert(m->index_sent == (curr->idx_list[i] - 1));			m->index_sent = curr->idx_list[i];			mb_add_command(curr->cmd_list[i], curr->arg_list[i]);		}		mb_send(m);				m->cmd_queue = curr->next;		if (curr->reliable) {			/* Reliable message, wait for the ack... */			gettimeofday(&(curr->send_time), NULL);			m->waiting_ack = curr;			curr->next = NULL;			return;		} else {			while (curr->num_cmds > 0) {				curr->num_cmds--;				xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;				xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;			}			xfree(curr->dest);			xfree(curr);		}		curr = m->cmd_queue;	}}void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable){	/* Queue up a message for sending. The message is not */	/* actually sent until mbus_send() is called.         */	struct mbus_msg	*curr = m->cmd_queue;	struct mbus_msg	*prev = NULL;	int		 alen = strlen(cmnd) + strlen(args) + 4;	int		 i;	mbus_validate(m);	while (curr != NULL) {		mbus_msg_validate(curr);		if (!curr->complete) {			/* This message is still open for new commands. It MUST be the last in the */			/* cmd_queue, else commands will be reordered.                             */			assert(curr->next == NULL);			if (mbus_addr_identical(curr->dest, dest) &&		            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {				curr->num_cmds++;				curr->reliable |= reliable;				curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);				curr->arg_list[curr->num_cmds-1] = xstrdup(args);				curr->idx_list[curr->num_cmds-1] = ++(m->index);				curr->message_size += alen;				mbus_msg_validate(curr);				return;			} else {				curr->complete = TRUE;			}		}		prev = curr;		curr = curr->next;	}	/* If we get here, we've not found an open message in the cmd_queue.  We */	/* have to create a new message, and add it to the end of the cmd_queue. */	curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));	curr->magic            = MBUS_MSG_MAGIC;	curr->next             = NULL;	curr->dest             = xstrdup(dest);	curr->retransmit_count = 0;	curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);	curr->seqnum           = ++m->seqnum;	curr->reliable         = reliable;	curr->complete         = FALSE;	curr->num_cmds         = 1;	curr->cmd_list[0]      = xstrdup(cmnd);	curr->arg_list[0]      = xstrdup(args);	curr->idx_list[curr->num_cmds-1] = ++(m->index);	for (i = 1; i < MBUS_MAX_QLEN; i++) {		curr->cmd_list[i] = NULL;		curr->arg_list[i] = NULL;	}	if (prev == NULL) {		m->cmd_queue = curr;	} else {		prev->next = curr;	}	gettimeofday(&(curr->send_time), NULL);	gettimeofday(&(curr->comp_time), NULL);	mbus_msg_validate(curr);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -