📄 messaging.c
字号:
/* create the messaging directory if needed */ mkdir(dir, 0700); msg->base_path = talloc_reference(msg, dir); msg->path = messaging_path(msg, server_id); msg->server_id = server_id; msg->iconv_convenience = iconv_convenience; msg->idr = idr_init(msg); msg->dispatch_tree = idr_init(msg); msg->start_time = timeval_current(); status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); if (!NT_STATUS_IS_OK(status)) { talloc_free(msg); return NULL; } /* by stealing here we ensure that the socket is cleaned up (and even deleted) on exit */ talloc_steal(msg, msg->sock); path = socket_address_from_strings(msg, msg->sock->backend_name, msg->path, 0); if (!path) { talloc_free(msg); return NULL; } status = socket_listen(msg->sock, path, 50, 0); if (!NT_STATUS_IS_OK(status)) { DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status))); talloc_free(msg); return NULL; } /* it needs to be non blocking for sends */ set_blocking(socket_get_fd(msg->sock), false); msg->event.ev = talloc_reference(msg, ev); msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), EVENT_FD_READ, messaging_handler, msg); talloc_set_destructor(msg, messaging_destructor); messaging_register(msg, NULL, MSG_PING, ping_message); messaging_register(msg, NULL, MSG_IRPC, irpc_handler); IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg); return msg;}/* A hack, for the short term until we get 'client only' messaging in place */struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, const char *dir, struct smb_iconv_convenience *iconv_convenience, struct event_context *ev){ struct server_id id; ZERO_STRUCT(id); id.id = random() % 0x10000000; return messaging_init(mem_ctx, dir, id, iconv_convenience, ev);}/* a list of registered irpc server functions*/struct irpc_list { struct irpc_list *next, *prev; struct GUID uuid; const struct ndr_interface_table *table; int callnum; irpc_function_t fn; void *private;};/* register a irpc server function*/NTSTATUS irpc_register(struct messaging_context *msg_ctx, const struct ndr_interface_table *table, int callnum, irpc_function_t fn, void *private){ struct irpc_list *irpc; /* override an existing handler, if any */ for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) { if (irpc->table == table && irpc->callnum == callnum) { break; } } if (irpc == NULL) { irpc = talloc(msg_ctx, struct irpc_list); NT_STATUS_HAVE_NO_MEMORY(irpc); DLIST_ADD(msg_ctx->irpc, irpc); } irpc->table = table; irpc->callnum = callnum; irpc->fn = fn; irpc->private = private; irpc->uuid = irpc->table->syntax_id.uuid; return NT_STATUS_OK;}/* handle an incoming irpc reply message*/static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m){ struct irpc_request *irpc; enum ndr_err_code ndr_err; irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid); if (irpc == NULL) return; /* parse the reply data */ ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r); if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { irpc->status = m->header.status; talloc_steal(irpc->mem_ctx, m); } else { irpc->status = ndr_map_error2ntstatus(ndr_err); talloc_steal(irpc, m); } irpc->done = true; if (irpc->async.fn) { irpc->async.fn(irpc); }}/* send a irpc reply*/NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status){ struct ndr_push *push; DATA_BLOB packet; enum ndr_err_code ndr_err; m->header.status = status; /* setup the reply */ push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience); if (push == NULL) { status = NT_STATUS_NO_MEMORY; goto failed; } m->header.flags |= IRPC_FLAG_REPLY; /* construct the packet */ ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { status = ndr_map_error2ntstatus(ndr_err); goto failed; } ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { status = ndr_map_error2ntstatus(ndr_err); goto failed; } /* send the reply message */ packet = ndr_push_blob(push); status = messaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet); if (!NT_STATUS_IS_OK(status)) goto failed;failed: talloc_free(m); return status;}/* handle an incoming irpc request message*/static void irpc_handler_request(struct messaging_context *msg_ctx, struct irpc_message *m){ struct irpc_list *i; void *r; enum ndr_err_code ndr_err; for (i=msg_ctx->irpc; i; i=i->next) { if (GUID_equal(&i->uuid, &m->header.uuid) && i->table->syntax_id.if_version == m->header.if_version && i->callnum == m->header.callnum) { break; } } if (i == NULL) { /* no registered handler for this message */ talloc_free(m); return; } /* allocate space for the structure */ r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size); if (r == NULL) goto failed; /* parse the request data */ ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; /* make the call */ m->private = i->private; m->defer_reply = false; m->msg_ctx = msg_ctx; m->irpc = i; m->data = r; m->ev = msg_ctx->event.ev; m->header.status = i->fn(m, r); if (m->defer_reply) { /* the server function has asked to defer the reply to later */ talloc_steal(msg_ctx, m); return; } irpc_send_reply(m, m->header.status); return;failed: talloc_free(m);}/* handle an incoming irpc message*/static void irpc_handler(struct messaging_context *msg_ctx, void *private, uint32_t msg_type, struct server_id src, DATA_BLOB *packet){ struct irpc_message *m; enum ndr_err_code ndr_err; m = talloc(msg_ctx, struct irpc_message); if (m == NULL) goto failed; m->from = src; m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience); if (m->ndr == NULL) goto failed; m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC; ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; if (m->header.flags & IRPC_FLAG_REPLY) { irpc_handler_reply(msg_ctx, m); } else { irpc_handler_request(msg_ctx, m); } return;failed: talloc_free(m);}/* destroy a irpc request*/static int irpc_destructor(struct irpc_request *irpc){ if (irpc->callid != -1) { idr_remove(irpc->msg_ctx->idr, irpc->callid); irpc->callid = -1; } if (irpc->reject_free) { return -1; } return 0;}/* timeout a irpc request*/static void irpc_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private){ struct irpc_request *irpc = talloc_get_type(private, struct irpc_request); irpc->status = NT_STATUS_IO_TIMEOUT; irpc->done = true; if (irpc->async.fn) { irpc->async.fn(irpc); }}/* make a irpc call - async send*/struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, struct server_id server_id, const struct ndr_interface_table *table, int callnum, void *r, TALLOC_CTX *ctx){ struct irpc_header header; struct ndr_push *ndr; NTSTATUS status; DATA_BLOB packet; struct irpc_request *irpc; enum ndr_err_code ndr_err; irpc = talloc(msg_ctx, struct irpc_request); if (irpc == NULL) goto failed; irpc->msg_ctx = msg_ctx; irpc->table = table; irpc->callnum = callnum; irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX); if (irpc->callid == -1) goto failed; irpc->r = r; irpc->done = false; irpc->async.fn = NULL; irpc->mem_ctx = ctx; irpc->reject_free = false; talloc_set_destructor(irpc, irpc_destructor); /* setup the header */ header.uuid = table->syntax_id.uuid; header.if_version = table->syntax_id.if_version; header.callid = irpc->callid; header.callnum = callnum; header.flags = 0; header.status = NT_STATUS_OK; /* construct the irpc packet */ ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience); if (ndr == NULL) goto failed; ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r); if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; /* and send it */ packet = ndr_push_blob(ndr); status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet); if (!NT_STATUS_IS_OK(status)) goto failed; event_add_timed(msg_ctx->event.ev, irpc, timeval_current_ofs(IRPC_CALL_TIMEOUT, 0), irpc_timeout, irpc); talloc_free(ndr); return irpc;failed: talloc_free(irpc); return NULL;}/* wait for a irpc reply*/NTSTATUS irpc_call_recv(struct irpc_request *irpc){ NTSTATUS status; NT_STATUS_HAVE_NO_MEMORY(irpc); irpc->reject_free = true; while (!irpc->done) { if (event_loop_once(irpc->msg_ctx->event.ev) != 0) { return NT_STATUS_CONNECTION_DISCONNECTED; } } irpc->reject_free = false; status = irpc->status; talloc_free(irpc); return status;}/* perform a synchronous irpc request*/NTSTATUS irpc_call(struct messaging_context *msg_ctx, struct server_id server_id, const struct ndr_interface_table *table, int callnum, void *r, TALLOC_CTX *mem_ctx){ struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id, table, callnum, r, mem_ctx); return irpc_call_recv(irpc);}/* open the naming database*/static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx){ struct tdb_wrap *t; char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path); if (path == NULL) { return NULL; } t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660); talloc_free(path); return t;} /* add a string name that this irpc server can be called on*/NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name){ struct tdb_wrap *t; TDB_DATA rec; int count; NTSTATUS status = NT_STATUS_OK; t = irpc_namedb_open(msg_ctx); NT_STATUS_HAVE_NO_MEMORY(t); if (tdb_lock_bystring(t->tdb, name) != 0) { talloc_free(t); return NT_STATUS_LOCK_NOT_GRANTED; } rec = tdb_fetch_bystring(t->tdb, name); count = rec.dsize / sizeof(struct server_id); rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1); rec.dsize += sizeof(struct server_id); if (rec.dptr == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NT_STATUS_NO_MEMORY; } ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id; if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) { status = NT_STATUS_INTERNAL_ERROR; } free(rec.dptr); tdb_unlock_bystring(t->tdb, name); talloc_free(t); msg_ctx->names = str_list_add(msg_ctx->names, name); talloc_steal(msg_ctx, msg_ctx->names); return status;}/* return a list of server ids for a server name*/struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, TALLOC_CTX *mem_ctx, const char *name){ struct tdb_wrap *t; TDB_DATA rec; int count, i; struct server_id *ret; t = irpc_namedb_open(msg_ctx); if (t == NULL) { return NULL; } if (tdb_lock_bystring(t->tdb, name) != 0) { talloc_free(t); return NULL; } rec = tdb_fetch_bystring(t->tdb, name); if (rec.dptr == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NULL; } count = rec.dsize / sizeof(struct server_id); ret = talloc_array(mem_ctx, struct server_id, count+1); if (ret == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return NULL; } for (i=0;i<count;i++) { ret[i] = ((struct server_id *)rec.dptr)[i]; } ret[i] = cluster_id(0, 0); free(rec.dptr); tdb_unlock_bystring(t->tdb, name); talloc_free(t); return ret;}/* remove a name from a messaging context*/void irpc_remove_name(struct messaging_context *msg_ctx, const char *name){ struct tdb_wrap *t; TDB_DATA rec; int count, i; struct server_id *ids; str_list_remove(msg_ctx->names, name); t = irpc_namedb_open(msg_ctx); if (t == NULL) { return; } if (tdb_lock_bystring(t->tdb, name) != 0) { talloc_free(t); return; } rec = tdb_fetch_bystring(t->tdb, name); if (rec.dptr == NULL) { tdb_unlock_bystring(t->tdb, name); talloc_free(t); return; } count = rec.dsize / sizeof(struct server_id); if (count == 0) { free(rec.dptr); tdb_unlock_bystring(t->tdb, name); talloc_free(t); return; } ids = (struct server_id *)rec.dptr; for (i=0;i<count;i++) { if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) { if (i < count-1) { memmove(ids+i, ids+i+1, sizeof(struct server_id) * (count-(i+1))); } rec.dsize -= sizeof(struct server_id); break; } } tdb_store_bystring(t->tdb, name, rec, 0); free(rec.dptr); tdb_unlock_bystring(t->tdb, name); talloc_free(t);}struct server_id messaging_get_server_id(struct messaging_context *msg_ctx){ return msg_ctx->server_id;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -