📄 messaging.c
字号:
/* Unix SMB/CIFS implementation. Samba internal messaging functions Copyright (C) Andrew Tridgell 2004 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.*/#include "includes.h"#include "lib/events/events.h"#include "system/filesys.h"#include "messaging/messaging.h"#include "lib/util/dlinklist.h"#include "lib/socket/socket.h"#include "librpc/gen_ndr/ndr_irpc.h"#include "lib/messaging/irpc.h"#include "tdb_wrap.h"#include "lib/util/unix_privs.h"#include "librpc/rpc/dcerpc.h"#include "lib/tdb/include/tdb.h"#include "lib/util/util_tdb.h"#include "lib/util/util_tdb.h"#include "cluster/cluster.h"#include "param/param.h"/* change the message version with any incompatible changes in the protocol */#define MESSAGING_VERSION 1struct messaging_context { struct server_id server_id; struct socket_context *sock; const char *base_path; const char *path; struct dispatch_fn **dispatch; uint32_t num_types; struct idr_context *dispatch_tree; struct messaging_rec *pending; struct messaging_rec *retry_queue; struct smb_iconv_convenience *iconv_convenience; struct irpc_list *irpc; struct idr_context *idr; const char **names; struct timeval start_time; struct timed_event *retry_te; struct { struct event_context *ev; struct fd_event *fde; } event;};/* we have a linked list of dispatch handlers for each msg_type that this messaging server can deal with */struct dispatch_fn { struct dispatch_fn *next, *prev; uint32_t msg_type; void *private; msg_callback_t fn;};/* an individual message */struct messaging_rec { struct messaging_rec *next, *prev; struct messaging_context *msg; const char *path; struct messaging_header { uint32_t version; uint32_t msg_type; struct server_id from; struct server_id to; uint32_t length; } *header; DATA_BLOB packet; uint32_t retries;};static void irpc_handler(struct messaging_context *, void *, uint32_t, struct server_id, DATA_BLOB *);/* A useful function for testing the message system.*/static void ping_message(struct messaging_context *msg, void *private, uint32_t msg_type, struct server_id src, DATA_BLOB *data){ DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", (uint_t)src.node, (uint_t)src.id, (int)data->length, data->data?(const char *)data->data:"")); messaging_send(msg, src, MSG_PONG, data);}/* return uptime of messaging server via irpc*/static NTSTATUS irpc_uptime(struct irpc_message *msg, struct irpc_uptime *r){ struct messaging_context *ctx = talloc_get_type(msg->private, struct messaging_context); *r->out.start_time = timeval_to_nttime(&ctx->start_time); return NT_STATUS_OK;}/* return the path to a messaging socket*/static char *messaging_path(struct messaging_context *msg, struct server_id server_id){ return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, cluster_id_string(msg, server_id));}/* dispatch a fully received message note that this deliberately can match more than one message handler per message. That allows a single messasging context to register (for example) a debug handler for more than one piece of code*/static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec){ struct dispatch_fn *d, *next; /* temporary IDs use an idtree, the rest use a array of pointers */ if (rec->header->msg_type >= MSG_TMP_BASE) { d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, rec->header->msg_type); } else if (rec->header->msg_type < msg->num_types) { d = msg->dispatch[rec->header->msg_type]; } else { d = NULL; } for (; d; d = next) { DATA_BLOB data; next = d->next; data.data = rec->packet.data + sizeof(*rec->header); data.length = rec->header->length; d->fn(msg, d->private, d->msg_type, rec->header->from, &data); } rec->header->length = 0;}/* handler for messages that arrive from other nodes in the cluster*/static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet){ struct messaging_rec *rec; rec = talloc(msg, struct messaging_rec); if (rec == NULL) { smb_panic("Unable to allocate messaging_rec"); } rec->msg = msg; rec->path = msg->path; rec->header = (struct messaging_header *)packet.data; rec->packet = packet; rec->retries = 0; if (packet.length != sizeof(*rec->header) + rec->header->length) { DEBUG(0,("messaging: bad message header size %d should be %d\n", rec->header->length, (int)(packet.length - sizeof(*rec->header)))); talloc_free(rec); return; } messaging_dispatch(msg, rec); talloc_free(rec);}/* try to send the message*/static NTSTATUS try_send(struct messaging_rec *rec){ struct messaging_context *msg = rec->msg; size_t nsent; void *priv; NTSTATUS status; struct socket_address *path; /* rec->path is the path of the *other* socket, where we want * this to end up */ path = socket_address_from_strings(msg, msg->sock->backend_name, rec->path, 0); if (!path) { return NT_STATUS_NO_MEMORY; } /* we send with privileges so messages work from any context */ priv = root_privileges(); status = socket_sendto(msg->sock, &rec->packet, &nsent, path); talloc_free(path); talloc_free(priv); return status;}/* retry backed off messages*/static void msg_retry_timer(struct event_context *ev, struct timed_event *te, struct timeval t, void *private){ struct messaging_context *msg = talloc_get_type(private, struct messaging_context); msg->retry_te = NULL; /* put the messages back on the main queue */ while (msg->retry_queue) { struct messaging_rec *rec = msg->retry_queue; DLIST_REMOVE(msg->retry_queue, rec); DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); } EVENT_FD_WRITEABLE(msg->event.fde); }/* handle a socket write event*/static void messaging_send_handler(struct messaging_context *msg){ while (msg->pending) { struct messaging_rec *rec = msg->pending; NTSTATUS status; status = try_send(rec); if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { rec->retries++; if (rec->retries > 3) { /* we're getting continuous write errors - backoff this record */ DLIST_REMOVE(msg->pending, rec); DLIST_ADD_END(msg->retry_queue, rec, struct messaging_rec *); if (msg->retry_te == NULL) { msg->retry_te = event_add_timed(msg->event.ev, msg, timeval_current_ofs(1, 0), msg_retry_timer, msg); } } break; } rec->retries = 0; if (!NT_STATUS_IS_OK(status)) { DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", cluster_id_string(debug_ctx(), rec->header->from), cluster_id_string(debug_ctx(), rec->header->to), rec->header->msg_type, nt_errstr(status))); } DLIST_REMOVE(msg->pending, rec); talloc_free(rec); } if (msg->pending == NULL) { EVENT_FD_NOT_WRITEABLE(msg->event.fde); }}/* handle a new incoming packet*/static void messaging_recv_handler(struct messaging_context *msg){ struct messaging_rec *rec; NTSTATUS status; DATA_BLOB packet; size_t msize; /* see how many bytes are in the next packet */ status = socket_pending(msg->sock, &msize); if (!NT_STATUS_IS_OK(status)) { DEBUG(0,("socket_pending failed in messaging - %s\n", nt_errstr(status))); return; } packet = data_blob_talloc(msg, NULL, msize); if (packet.data == NULL) { /* assume this is temporary and retry */ return; } status = socket_recv(msg->sock, packet.data, msize, &msize); if (!NT_STATUS_IS_OK(status)) { data_blob_free(&packet); return; } if (msize < sizeof(*rec->header)) { DEBUG(0,("messaging: bad message of size %d\n", (int)msize)); data_blob_free(&packet); return; } rec = talloc(msg, struct messaging_rec); if (rec == NULL) { smb_panic("Unable to allocate messaging_rec"); } talloc_steal(rec, packet.data); rec->msg = msg; rec->path = msg->path; rec->header = (struct messaging_header *)packet.data; rec->packet = packet; rec->retries = 0; if (msize != sizeof(*rec->header) + rec->header->length) { DEBUG(0,("messaging: bad message header size %d should be %d\n", rec->header->length, (int)(msize - sizeof(*rec->header)))); talloc_free(rec); return; } messaging_dispatch(msg, rec); talloc_free(rec);}/* handle a socket event*/static void messaging_handler(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private){ struct messaging_context *msg = talloc_get_type(private, struct messaging_context); if (flags & EVENT_FD_WRITE) { messaging_send_handler(msg); } if (flags & EVENT_FD_READ) { messaging_recv_handler(msg); }}/* Register a dispatch function for a particular message type.*/NTSTATUS messaging_register(struct messaging_context *msg, void *private, uint32_t msg_type, msg_callback_t fn){ struct dispatch_fn *d; /* possibly expand dispatch array */ if (msg_type >= msg->num_types) { struct dispatch_fn **dp; int i; dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1); NT_STATUS_HAVE_NO_MEMORY(dp); msg->dispatch = dp; for (i=msg->num_types;i<=msg_type;i++) { msg->dispatch[i] = NULL; } msg->num_types = msg_type+1; } d = talloc_zero(msg->dispatch, struct dispatch_fn); NT_STATUS_HAVE_NO_MEMORY(d); d->msg_type = msg_type; d->private = private; d->fn = fn; DLIST_ADD(msg->dispatch[msg_type], d); return NT_STATUS_OK;}/* register a temporary message handler. The msg_type is allocated above MSG_TMP_BASE*/NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private, msg_callback_t fn, uint32_t *msg_type){ struct dispatch_fn *d; int id; d = talloc_zero(msg->dispatch, struct dispatch_fn); NT_STATUS_HAVE_NO_MEMORY(d); d->private = private; d->fn = fn; id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX); if (id == -1) { talloc_free(d); return NT_STATUS_TOO_MANY_CONTEXT_IDS; } d->msg_type = (uint32_t)id; (*msg_type) = d->msg_type; return NT_STATUS_OK;}/* De-register the function for a particular message type.*/void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private){ struct dispatch_fn *d, *next; if (msg_type >= msg->num_types) { d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, msg_type); if (!d) return; idr_remove(msg->dispatch_tree, msg_type); talloc_free(d); return; } for (d = msg->dispatch[msg_type]; d; d = next) { next = d->next; if (d->private == private) { DLIST_REMOVE(msg->dispatch[msg_type], d); talloc_free(d); } }}/* Send a message to a particular server*/NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, uint32_t msg_type, DATA_BLOB *data){ struct messaging_rec *rec; NTSTATUS status; size_t dlength = data?data->length:0; rec = talloc(msg, struct messaging_rec); if (rec == NULL) { return NT_STATUS_NO_MEMORY; } rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength); if (rec->packet.data == NULL) { talloc_free(rec); return NT_STATUS_NO_MEMORY; } rec->retries = 0; rec->msg = msg; rec->header = (struct messaging_header *)rec->packet.data; /* zero padding */ ZERO_STRUCTP(rec->header); rec->header->version = MESSAGING_VERSION; rec->header->msg_type = msg_type; rec->header->from = msg->server_id; rec->header->to = server; rec->header->length = dlength; if (dlength != 0) { memcpy(rec->packet.data + sizeof(*rec->header), data->data, dlength); } if (!cluster_node_equal(&msg->server_id, &server)) { /* the destination is on another node - dispatch via the cluster layer */ status = cluster_message_send(server, &rec->packet); talloc_free(rec); return status; } rec->path = messaging_path(msg, server); talloc_steal(rec, rec->path); if (msg->pending != NULL) { status = STATUS_MORE_ENTRIES; } else { status = try_send(rec); } if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { if (msg->pending == NULL) { EVENT_FD_WRITEABLE(msg->event.fde); } DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); return NT_STATUS_OK; } talloc_free(rec); return status;}/* Send a message to a particular server, with the message containing a single pointer*/NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, uint32_t msg_type, void *ptr){ DATA_BLOB blob; blob.data = (uint8_t *)&ptr; blob.length = sizeof(void *); return messaging_send(msg, server, msg_type, &blob);}/* destroy the messaging context*/static int messaging_destructor(struct messaging_context *msg){ unlink(msg->path); while (msg->names && msg->names[0]) { irpc_remove_name(msg, msg->names[0]); } return 0;}/* create the listening socket and setup the dispatcher*/struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, const char *dir, struct server_id server_id, struct smb_iconv_convenience *iconv_convenience, struct event_context *ev){ struct messaging_context *msg; NTSTATUS status; struct socket_address *path; if (ev == NULL) { return NULL; } msg = talloc_zero(mem_ctx, struct messaging_context); if (msg == NULL) { return NULL; } /* setup a handler for messages from other cluster nodes, if appropriate */ status = cluster_message_init(msg, server_id, cluster_message_handler); if (!NT_STATUS_IS_OK(status)) { talloc_free(msg); return NULL; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -