⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mbus.c

📁 网络MPEG4IP流媒体开发源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
                curr = next;        }	*queue = NULL;}void mbus_exit(struct mbus *m) {        int i;        ASSERT(m != NULL);	mbus_validate(m);	mbus_qmsg(m, "()", "mbus.bye", "", FALSE);	mbus_send(m);	/* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */	/*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */        mbus_flush_msgs(&m->cmd_queue);        mbus_flush_msgs(&m->waiting_ack);        if (m->encrkey != NULL) {                xfree(m->encrkey);        }        if (m->hashkey != NULL) {        	xfree(m->hashkey);	}        udp_exit(m->s);	/* Clean up other_* */	for (i=m->num_other_addr-1; i>=0; i--){	    remove_other_addr(m, m->other_addr[i]);	}        xfree(m->addr);	xfree(m->other_addr);	xfree(m->other_hello);	xfree(m->cfg);        xfree(m);}void mbus_send(struct mbus *m){	/* Send one, or more, messages previosly queued with mbus_qmsg(). */	/* Messages for the same destination are batched together. Stops  */	/* when a reliable message is sent, until the ACK is received.    */	struct mbus_msg	*curr = m->cmd_queue;	int		 i;	mbus_validate(m);	if (m->waiting_ack != NULL) {		return;	}	while (curr != NULL) {		mbus_msg_validate(curr);		/* It's okay for us to send messages which haven't been marked as complete - */		/* that just means we're sending something which has the potential to have   */		/* more data piggybacked. However, if it's not complete it MUST be the last  */		/* in the list, or something has been reordered - which is bad.              */		if (!curr->complete) {			ASSERT(curr->next == NULL);		}		if (curr->reliable) {		        if (!mbus_addr_valid(m, curr->dest)) {			    debug_msg("Trying to send reliably to an unknown address...\n");			    if (m->err_handler == NULL) {				abort();			    }			    m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);			}		        if (!mbus_addr_unique(m, curr->dest)) {			    debug_msg("Trying to send reliably but address is not unique...\n");			    if (m->err_handler == NULL) {				abort();			    }			    m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);			}		}		/* Create the message... */		mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);		for (i = 0; i < curr->num_cmds; i++) {			ASSERT(m->index_sent == (curr->idx_list[i] - 1));			m->index_sent = curr->idx_list[i];			mb_add_command(curr->cmd_list[i], curr->arg_list[i]);		}		mb_send(m);				m->cmd_queue = curr->next;		if (curr->reliable) {			/* Reliable message, wait for the ack... */			gettimeofday(&(curr->send_time), NULL);			m->waiting_ack = curr;			curr->next = NULL;			return;		} else {			while (curr->num_cmds > 0) {				curr->num_cmds--;				xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;				xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;			}			xfree(curr->dest);			xfree(curr);		}		curr = m->cmd_queue;	}}void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable){	/* Queue up a message for sending. The message is not */	/* actually sent until mbus_send() is called.         */	struct mbus_msg	*curr = m->cmd_queue;	struct mbus_msg	*prev = NULL;	int		 alen = strlen(cmnd) + strlen(args) + 4;	int		 i;	mbus_validate(m);	while (curr != NULL) {		mbus_msg_validate(curr);		if (!curr->complete) {			/* This message is still open for new commands. It MUST be the last in the */			/* cmd_queue, else commands will be reordered.                             */			ASSERT(curr->next == NULL);			if (mbus_addr_identical(curr->dest, dest) &&		            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {				curr->num_cmds++;				curr->reliable |= reliable;				curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);				curr->arg_list[curr->num_cmds-1] = xstrdup(args);				curr->idx_list[curr->num_cmds-1] = ++(m->index);				curr->message_size += alen;				mbus_msg_validate(curr);				return;			} else {				curr->complete = TRUE;			}		}		prev = curr;		curr = curr->next;	}	/* If we get here, we've not found an open message in the cmd_queue.  We */	/* have to create a new message, and add it to the end of the cmd_queue. */	curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));	curr->magic            = MBUS_MSG_MAGIC;	curr->next             = NULL;	curr->dest             = xstrdup(dest);	curr->retransmit_count = 0;	curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);	curr->seqnum           = ++m->seqnum;	curr->reliable         = reliable;	curr->complete         = FALSE;	curr->num_cmds         = 1;	curr->cmd_list[0]      = xstrdup(cmnd);	curr->arg_list[0]      = xstrdup(args);	curr->idx_list[curr->num_cmds-1] = ++(m->index);	for (i = 1; i < MBUS_MAX_QLEN; i++) {		curr->cmd_list[i] = NULL;		curr->arg_list[i] = NULL;	}	if (prev == NULL) {		m->cmd_queue = curr;	} else {		prev->next = curr;	}	gettimeofday(&(curr->send_time), NULL);	gettimeofday(&(curr->comp_time), NULL);	mbus_msg_validate(curr);}#define mbus_qmsgf(m, dest, reliable, cmnd, format, var) \{ \char buffer[MBUS_BUF_SIZE]; \mbus_validate(m); \snprintf(buffer, MBUS_BUF_SIZE, format, var); \mbus_qmsg(m, dest, cmnd, buffer, reliable); \}#if 0void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...){	/* This is a wrapper around mbus_qmsg() which does a printf() style format into  */	/* a buffer. Saves the caller from having to a a malloc(), write the args string */	/* and then do a free(), and also saves worring about overflowing the buffer, so */	/* removing a common source of bugs!                                             */	char	buffer[MBUS_BUF_SIZE];	va_list	ap;	mbus_validate(m);	va_start(ap, format);#ifdef WIN32        _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);#else        vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);#endif	va_end(ap);	mbus_qmsg(m, dest, cmnd, buffer, reliable);}#endifint mbus_recv(struct mbus *m, void *data, struct timeval *timeout){	char			*auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;	char	 		 buffer[MBUS_BUF_SIZE];	int	 		 buffer_len, seq, a, rx, ts, authlen, loop_count;	char	 		 ackbuf[MBUS_ACK_BUF_SIZE];	char	 		 digest[16];	unsigned char		 initVec[8] = {0,0,0,0,0,0,0,0};	struct timeval		 t;	struct mbus_parser	*mp, *mp2;	mbus_validate(m);	rx = FALSE;	loop_count = 0;	while (loop_count++ < 10) {		memset(buffer, 0, MBUS_BUF_SIZE);                ASSERT(m->s != NULL);		udp_fd_zero();		udp_fd_set(m->s);		t.tv_sec  = timeout->tv_sec;		t.tv_usec = timeout->tv_usec;                if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) {			buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);			if (buffer_len > 0) {				rx = TRUE;			} else {				return rx;			}		} else {			return FALSE;		}		if (m->encrkey != NULL) {			/* Decrypt the message... */			if ((buffer_len % 8) != 0) {				debug_msg("Encrypted message not a multiple of 8 bytes in length\n");				continue;			}			memcpy(mb_cryptbuf, buffer, buffer_len);			memset(initVec, 0, 8);			qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec);			memcpy(buffer, mb_cryptbuf, buffer_len);		}		/* Sanity check that this is a vaguely sensible format message... Should prevent */		/* problems if we're fed complete garbage, but won't prevent determined hackers. */		if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {			continue;		}		mp = mbus_parse_init(buffer);		/* remove trailing 0 bytes */		npos = (char *) strchr(buffer,'\0');		if(npos!=NULL) {			buffer_len=npos-buffer;		}		/* Parse the authentication header */		if (!mbus_parse_sym(mp, &auth)) {			debug_msg("Failed to parse authentication header\n");			mbus_parse_done(mp);			continue;		}		/* Check that the packet authenticates correctly... */		authlen = strlen(auth);		hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);		base64encode(digest, 12, ackbuf, 16);		if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {			debug_msg("Failed to authenticate message...\n");			mbus_parse_done(mp);			continue;		}		/* Parse the header */		if (!mbus_parse_sym(mp, &ver)) {			mbus_parse_done(mp);			debug_msg("Parser failed version (1): %s\n",ver);			continue;		}		if (strcmp(ver, "mbus/1.0") != 0) {			mbus_parse_done(mp);			debug_msg("Parser failed version (2): %s\n",ver);			continue;		}		if (!mbus_parse_int(mp, &seq)) {			mbus_parse_done(mp);			debug_msg("Parser failed seq\n");			continue;		}		if (!mbus_parse_int(mp, &ts)) {			mbus_parse_done(mp);			debug_msg("Parser failed ts\n");			continue;		}		if (!mbus_parse_sym(mp, &r)) {			mbus_parse_done(mp);			debug_msg("Parser failed reliable\n");			continue;		}		if (!mbus_parse_lst(mp, &src)) {			mbus_parse_done(mp);			debug_msg("Parser failed src\n");			continue;		}		if (!mbus_parse_lst(mp, &dst)) {			mbus_parse_done(mp);			debug_msg("Parser failed dst\n");			continue;		}		if (!mbus_parse_lst(mp, &ack)) {			mbus_parse_done(mp);			debug_msg("Parser failed ack\n");			continue;		}		store_other_addr(m, src);		/* Check if the message was addressed to us... */		if (mbus_addr_match(m->addr, dst)) {			/* ...if so, process any ACKs received... */			mp2 = mbus_parse_init(ack);			while (mbus_parse_int(mp2, &a)) {				if (mbus_waiting_ack(m)) {					if (m->waiting_ack->seqnum == a) {						while (m->waiting_ack->num_cmds > 0) {							m->waiting_ack->num_cmds--;							xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);							xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);						}						xfree(m->waiting_ack->dest);						xfree(m->waiting_ack);						m->waiting_ack = NULL;					} else {						debug_msg("Got ACK %d but wanted %d\n", a, m->waiting_ack->seqnum);					}				} else {					debug_msg("Got ACK %d but wasn't expecting it\n", a);				}			}			mbus_parse_done(mp2);			/* ...if an ACK was requested, send one... */			if (strcmp(r, "R") == 0) {				char 		*newsrc = (char *) xmalloc(strlen(src) + 3);				struct timeval	 t;				sprintf(newsrc, "(%s)", src);	/* Yes, this is a kludge. */				gettimeofday(&t, NULL);				mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq);				mb_send(m);				xfree(newsrc);			} else if (strcmp(r, "U") == 0) {				/* Unreliable message.... not need to do anything */			} else {				debug_msg("Message with invalid reliability field \"%s\" ignored\n", r);			}			/* ...and process the commands contained in the message */			while (mbus_parse_sym(mp, &cmd)) {				if (mbus_parse_lst(mp, &param)) {					char 		*newsrc = (char *) xmalloc(strlen(src) + 3);					sprintf(newsrc, "(%s)", src);	/* Yes, this is a kludge. */					/* Finally, we snoop on the message we just passed to the application, */					/* to do housekeeping of our list of known mbus sources...             */					if (strcmp(cmd, "mbus.bye") == 0) {						remove_other_addr(m, newsrc);					} 					if (strcmp(cmd, "mbus.hello") == 0) {						/* Mark this source as activ. We remove dead sources in mbus_heartbeat */						store_other_addr(m, newsrc);					}					m->cmd_handler(newsrc, cmd, param, data);					xfree(newsrc);				} else {					debug_msg("Unable to parse mbus command:\n");					debug_msg("cmd = %s\n", cmd);					debug_msg("arg = %s\n", param);					break;				}			}		}		mbus_parse_done(mp);	}	return rx;}#define RZ_HANDLE_WAITING 1#define RZ_HANDLE_GO      2struct mbus_rz {	char		*peer;	char		*token;	struct mbus	*m;	void		*data;	int		 mode;	void (*cmd_handler)(char *src, char *cmd, char *args, void *data);};static void rz_handler(char *src, char *cmd, char *args, void *data){	struct mbus_rz		*r = (struct mbus_rz *) data;	struct mbus_parser	*mp;	if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) {		char	*t;		mp = mbus_parse_init(args);		mbus_parse_str(mp, &t);		if (strcmp(mbus_decode_str(t), r->token) == 0) {                        if (r->peer != NULL) xfree(r->peer);			r->peer = xstrdup(src);		}		mbus_parse_done(mp);	} else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) {		char	*t;		mp = mbus_parse_init(args);		mbus_parse_str(mp, &t);		if (strcmp(mbus_decode_str(t), r->token) == 0) {                        if (r->peer != NULL) xfree(r->peer);			r->peer = xstrdup(src);		}		mbus_parse_done(mp);	} else {		r->cmd_handler(src, cmd, args, r->data);	}}char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data){	/* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */	/* back from our peer. Any other mbus commands received whilst waiting are  */	/* processed in the normal manner, as if mbus_recv() had been called.       */	char		*token_e, *peer;	struct timeval	 timeout;	struct mbus_rz	*r;	mbus_validate(m);	r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));	r->peer        = NULL;	r->token       = token;	r->m           = m;	r->data        = data;	r->mode        = RZ_HANDLE_GO;	r->cmd_handler = m->cmd_handler;	m->cmd_handler = rz_handler;	token_e        = mbus_encode_str(token);	while (r->peer == NULL) {		timeout.tv_sec  = 0;		timeout.tv_usec = 100000;		mbus_heartbeat(m, 1);		mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e);		mbus_send(m);		mbus_recv(m, r, &timeout);		mbus_retransmit(m);	}	m->cmd_handler = r->cmd_handler;	peer = r->peer;	xfree(r);	xfree(token_e);	return peer;}char *mbus_rendezvous_go(struct mbus *m, char *token, void *data){	/* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to   */	/* the sender of that message. Whilst waiting, other mbus commands are processed */	/* in the normal manner as if mbus_recv() had been called.                       */	char		*token_e, *peer;	struct timeval	 timeout;	struct mbus_rz	*r;	mbus_validate(m);	r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));	r->peer        = NULL;	r->token       = token;	r->m           = m;	r->data        = data;	r->mode        = RZ_HANDLE_WAITING;	r->cmd_handler = m->cmd_handler;	m->cmd_handler = rz_handler;	token_e        = mbus_encode_str(token);	while (r->peer == NULL) {		timeout.tv_sec  = 0;		timeout.tv_usec = 100000;		mbus_heartbeat(m, 1);		mbus_send(m);		mbus_recv(m, r, &timeout);		mbus_retransmit(m);	}	mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e);	do {		mbus_heartbeat(m, 1);		mbus_retransmit(m);		mbus_send(m);		timeout.tv_sec  = 0;		timeout.tv_usec = 100000;		mbus_recv(m, r, &timeout);	} while (!mbus_sent_all(m));	m->cmd_handler = r->cmd_handler;	peer = r->peer;	xfree(r);	xfree(token_e);	return peer;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -