⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messaging.c

📁 samba最新软件
💻 C
📖 第 1 页 / 共 2 页
字号:
/*    Unix SMB/CIFS implementation.   Samba internal messaging functions   Copyright (C) Andrew Tridgell 2004      This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 3 of the License, or   (at your option) any later version.      This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.      You should have received a copy of the GNU General Public License   along with this program.  If not, see <http://www.gnu.org/licenses/>.*/#include "includes.h"#include "lib/events/events.h"#include "system/filesys.h"#include "messaging/messaging.h"#include "lib/util/dlinklist.h"#include "lib/socket/socket.h"#include "librpc/gen_ndr/ndr_irpc.h"#include "lib/messaging/irpc.h"#include "tdb_wrap.h"#include "lib/util/unix_privs.h"#include "librpc/rpc/dcerpc.h"#include "lib/tdb/include/tdb.h"#include "lib/util/util_tdb.h"#include "lib/util/util_tdb.h"#include "cluster/cluster.h"#include "param/param.h"/* change the message version with any incompatible changes in the protocol */#define MESSAGING_VERSION 1struct messaging_context {	struct server_id server_id;	struct socket_context *sock;	const char *base_path;	const char *path;	struct dispatch_fn **dispatch;	uint32_t num_types;	struct idr_context *dispatch_tree;	struct messaging_rec *pending;	struct messaging_rec *retry_queue;	struct smb_iconv_convenience *iconv_convenience;	struct irpc_list *irpc;	struct idr_context *idr;	const char **names;	struct timeval start_time;	struct timed_event *retry_te;	struct {		struct event_context *ev;		struct fd_event *fde;	} event;};/* we have a linked list of dispatch handlers for each msg_type that   this messaging server can deal with */struct dispatch_fn {	struct dispatch_fn *next, *prev;	uint32_t msg_type;	void *private;	msg_callback_t fn;};/* an individual message */struct messaging_rec {	struct messaging_rec *next, *prev;	struct messaging_context *msg;	const char *path;	struct messaging_header {		uint32_t version;		uint32_t msg_type;		struct server_id from;		struct server_id to;		uint32_t length;	} *header;	DATA_BLOB packet;	uint32_t retries;};static void irpc_handler(struct messaging_context *, void *, 			 uint32_t, struct server_id, DATA_BLOB *);/* A useful function for testing the message system.*/static void ping_message(struct messaging_context *msg, void *private, 			 uint32_t msg_type, struct server_id src, DATA_BLOB *data){	DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",		 (uint_t)src.node, (uint_t)src.id, (int)data->length, 		 data->data?(const char *)data->data:""));	messaging_send(msg, src, MSG_PONG, data);}/*  return uptime of messaging server via irpc*/static NTSTATUS irpc_uptime(struct irpc_message *msg, 			    struct irpc_uptime *r){	struct messaging_context *ctx = talloc_get_type(msg->private, struct messaging_context);	*r->out.start_time = timeval_to_nttime(&ctx->start_time);	return NT_STATUS_OK;}/*    return the path to a messaging socket*/static char *messaging_path(struct messaging_context *msg, struct server_id server_id){	return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, 			       cluster_id_string(msg, server_id));}/*  dispatch a fully received message  note that this deliberately can match more than one message handler  per message. That allows a single messasging context to register  (for example) a debug handler for more than one piece of code*/static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec){	struct dispatch_fn *d, *next;	/* temporary IDs use an idtree, the rest use a array of pointers */	if (rec->header->msg_type >= MSG_TMP_BASE) {		d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 						   rec->header->msg_type);	} else if (rec->header->msg_type < msg->num_types) {		d = msg->dispatch[rec->header->msg_type];	} else {		d = NULL;	}	for (; d; d = next) {		DATA_BLOB data;		next = d->next;		data.data = rec->packet.data + sizeof(*rec->header);		data.length = rec->header->length;		d->fn(msg, d->private, d->msg_type, rec->header->from, &data);	}	rec->header->length = 0;}/*  handler for messages that arrive from other nodes in the cluster*/static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet){	struct messaging_rec *rec;	rec = talloc(msg, struct messaging_rec);	if (rec == NULL) {		smb_panic("Unable to allocate messaging_rec");	}	rec->msg           = msg;	rec->path          = msg->path;	rec->header        = (struct messaging_header *)packet.data;	rec->packet        = packet;	rec->retries       = 0;	if (packet.length != sizeof(*rec->header) + rec->header->length) {		DEBUG(0,("messaging: bad message header size %d should be %d\n", 			 rec->header->length, (int)(packet.length - sizeof(*rec->header))));		talloc_free(rec);		return;	}	messaging_dispatch(msg, rec);	talloc_free(rec);}/*  try to send the message*/static NTSTATUS try_send(struct messaging_rec *rec){	struct messaging_context *msg = rec->msg;	size_t nsent;	void *priv;	NTSTATUS status;	struct socket_address *path;	/* rec->path is the path of the *other* socket, where we want	 * this to end up */	path = socket_address_from_strings(msg, msg->sock->backend_name, 					   rec->path, 0);	if (!path) {		return NT_STATUS_NO_MEMORY;	}	/* we send with privileges so messages work from any context */	priv = root_privileges();	status = socket_sendto(msg->sock, &rec->packet, &nsent, path);	talloc_free(path);	talloc_free(priv);	return status;}/*  retry backed off messages*/static void msg_retry_timer(struct event_context *ev, struct timed_event *te, 			    struct timeval t, void *private){	struct messaging_context *msg = talloc_get_type(private, 							struct messaging_context);	msg->retry_te = NULL;	/* put the messages back on the main queue */	while (msg->retry_queue) {		struct messaging_rec *rec = msg->retry_queue;		DLIST_REMOVE(msg->retry_queue, rec);		DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);	}	EVENT_FD_WRITEABLE(msg->event.fde);	}/*  handle a socket write event*/static void messaging_send_handler(struct messaging_context *msg){	while (msg->pending) {		struct messaging_rec *rec = msg->pending;		NTSTATUS status;		status = try_send(rec);		if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {			rec->retries++;			if (rec->retries > 3) {				/* we're getting continuous write errors -				   backoff this record */				DLIST_REMOVE(msg->pending, rec);				DLIST_ADD_END(msg->retry_queue, rec, 					      struct messaging_rec *);				if (msg->retry_te == NULL) {					msg->retry_te = 						event_add_timed(msg->event.ev, msg, 								timeval_current_ofs(1, 0), 								msg_retry_timer, msg);				}			}			break;		}		rec->retries = 0;		if (!NT_STATUS_IS_OK(status)) {			DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 				 cluster_id_string(debug_ctx(), rec->header->from), 				 cluster_id_string(debug_ctx(), rec->header->to), 				 rec->header->msg_type, 				 nt_errstr(status)));		}		DLIST_REMOVE(msg->pending, rec);		talloc_free(rec);	}	if (msg->pending == NULL) {		EVENT_FD_NOT_WRITEABLE(msg->event.fde);	}}/*  handle a new incoming packet*/static void messaging_recv_handler(struct messaging_context *msg){	struct messaging_rec *rec;	NTSTATUS status;	DATA_BLOB packet;	size_t msize;	/* see how many bytes are in the next packet */	status = socket_pending(msg->sock, &msize);	if (!NT_STATUS_IS_OK(status)) {		DEBUG(0,("socket_pending failed in messaging - %s\n", 			 nt_errstr(status)));		return;	}		packet = data_blob_talloc(msg, NULL, msize);	if (packet.data == NULL) {		/* assume this is temporary and retry */		return;	}	    	status = socket_recv(msg->sock, packet.data, msize, &msize);	if (!NT_STATUS_IS_OK(status)) {		data_blob_free(&packet);		return;	}	if (msize < sizeof(*rec->header)) {		DEBUG(0,("messaging: bad message of size %d\n", (int)msize));		data_blob_free(&packet);		return;	}	rec = talloc(msg, struct messaging_rec);	if (rec == NULL) {		smb_panic("Unable to allocate messaging_rec");	}	talloc_steal(rec, packet.data);	rec->msg           = msg;	rec->path          = msg->path;	rec->header        = (struct messaging_header *)packet.data;	rec->packet        = packet;	rec->retries       = 0;	if (msize != sizeof(*rec->header) + rec->header->length) {		DEBUG(0,("messaging: bad message header size %d should be %d\n", 			 rec->header->length, (int)(msize - sizeof(*rec->header))));		talloc_free(rec);		return;	}	messaging_dispatch(msg, rec);	talloc_free(rec);}/*  handle a socket event*/static void messaging_handler(struct event_context *ev, struct fd_event *fde, 			      uint16_t flags, void *private){	struct messaging_context *msg = talloc_get_type(private, 							struct messaging_context);	if (flags & EVENT_FD_WRITE) {		messaging_send_handler(msg);	}	if (flags & EVENT_FD_READ) {		messaging_recv_handler(msg);	}}/*  Register a dispatch function for a particular message type.*/NTSTATUS messaging_register(struct messaging_context *msg, void *private,			    uint32_t msg_type, msg_callback_t fn){	struct dispatch_fn *d;	/* possibly expand dispatch array */	if (msg_type >= msg->num_types) {		struct dispatch_fn **dp;		int i;		dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);		NT_STATUS_HAVE_NO_MEMORY(dp);		msg->dispatch = dp;		for (i=msg->num_types;i<=msg_type;i++) {			msg->dispatch[i] = NULL;		}		msg->num_types = msg_type+1;	}	d = talloc_zero(msg->dispatch, struct dispatch_fn);	NT_STATUS_HAVE_NO_MEMORY(d);	d->msg_type = msg_type;	d->private = private;	d->fn = fn;	DLIST_ADD(msg->dispatch[msg_type], d);	return NT_STATUS_OK;}/*  register a temporary message handler. The msg_type is allocated  above MSG_TMP_BASE*/NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,				msg_callback_t fn, uint32_t *msg_type){	struct dispatch_fn *d;	int id;	d = talloc_zero(msg->dispatch, struct dispatch_fn);	NT_STATUS_HAVE_NO_MEMORY(d);	d->private = private;	d->fn = fn;	id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);	if (id == -1) {		talloc_free(d);		return NT_STATUS_TOO_MANY_CONTEXT_IDS;	}	d->msg_type = (uint32_t)id;	(*msg_type) = d->msg_type;	return NT_STATUS_OK;}/*  De-register the function for a particular message type.*/void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private){	struct dispatch_fn *d, *next;	if (msg_type >= msg->num_types) {		d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 						   msg_type);		if (!d) return;		idr_remove(msg->dispatch_tree, msg_type);		talloc_free(d);		return;	}	for (d = msg->dispatch[msg_type]; d; d = next) {		next = d->next;		if (d->private == private) {			DLIST_REMOVE(msg->dispatch[msg_type], d);			talloc_free(d);		}	}}/*  Send a message to a particular server*/NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, 			uint32_t msg_type, DATA_BLOB *data){	struct messaging_rec *rec;	NTSTATUS status;	size_t dlength = data?data->length:0;	rec = talloc(msg, struct messaging_rec);	if (rec == NULL) {		return NT_STATUS_NO_MEMORY;	}	rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);	if (rec->packet.data == NULL) {		talloc_free(rec);		return NT_STATUS_NO_MEMORY;	}	rec->retries       = 0;	rec->msg              = msg;	rec->header           = (struct messaging_header *)rec->packet.data;	/* zero padding */	ZERO_STRUCTP(rec->header);	rec->header->version  = MESSAGING_VERSION;	rec->header->msg_type = msg_type;	rec->header->from     = msg->server_id;	rec->header->to       = server;	rec->header->length   = dlength;	if (dlength != 0) {		memcpy(rec->packet.data + sizeof(*rec->header), 		       data->data, dlength);	}	if (!cluster_node_equal(&msg->server_id, &server)) {		/* the destination is on another node - dispatch via		   the cluster layer */		status = cluster_message_send(server, &rec->packet);		talloc_free(rec);		return status;	}	rec->path = messaging_path(msg, server);	talloc_steal(rec, rec->path);	if (msg->pending != NULL) {		status = STATUS_MORE_ENTRIES;	} else {		status = try_send(rec);	}	if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {		if (msg->pending == NULL) {			EVENT_FD_WRITEABLE(msg->event.fde);		}		DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);		return NT_STATUS_OK;	}	talloc_free(rec);	return status;}/*  Send a message to a particular server, with the message containing a single pointer*/NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, 			    uint32_t msg_type, void *ptr){	DATA_BLOB blob;	blob.data = (uint8_t *)&ptr;	blob.length = sizeof(void *);	return messaging_send(msg, server, msg_type, &blob);}/*  destroy the messaging context*/static int messaging_destructor(struct messaging_context *msg){	unlink(msg->path);	while (msg->names && msg->names[0]) {		irpc_remove_name(msg, msg->names[0]);	}	return 0;}/*  create the listening socket and setup the dispatcher*/struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 					 const char *dir,					 struct server_id server_id, 					 struct smb_iconv_convenience *iconv_convenience,					 struct event_context *ev){	struct messaging_context *msg;	NTSTATUS status;	struct socket_address *path;	if (ev == NULL) {		return NULL;	}	msg = talloc_zero(mem_ctx, struct messaging_context);	if (msg == NULL) {		return NULL;	}	/* setup a handler for messages from other cluster nodes, if appropriate */	status = cluster_message_init(msg, server_id, cluster_message_handler);	if (!NT_STATUS_IS_OK(status)) {		talloc_free(msg);		return NULL;	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -