📄 ctdb_daemon.c
字号:
case CTDB_REQ_CONTROL: ctdb->statistics.client.req_control++; daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr); break; default: DEBUG(0,(__location__ " daemon: unrecognized operation %u\n", hdr->operation)); }done: talloc_free(tmp_ctx);}/* called when the daemon gets a incoming packet */static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args){ struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); struct ctdb_req_header *hdr; if (cnt == 0) { talloc_free(client); return; } client->ctdb->statistics.client_packets_recv++; if (cnt < sizeof(*hdr)) { ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", (unsigned)cnt); return; } hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", (unsigned)hdr->length, (unsigned)cnt); return; } if (hdr->ctdb_magic != CTDB_MAGIC) { ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n"); return; } if (hdr->ctdb_version != CTDB_VERSION) { ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); return; } DEBUG(3,(__location__ " client request %u of type %u length %u from " "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length, hdr->srcnode, hdr->destnode)); /* it is the responsibility of the incoming packet function to free 'data' */ daemon_incoming_packet(client, hdr);}static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data){ struct sockaddr_in addr; socklen_t len; int fd; struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context); struct ctdb_client *client; memset(&addr, 0, sizeof(addr)); len = sizeof(addr); fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len); if (fd == -1) { return; } set_nonblocking(fd); set_close_on_exec(fd); client = talloc_zero(ctdb, struct ctdb_client); client->ctdb = ctdb; client->fd = fd; client->client_id = ctdb_reqid_new(ctdb, client); ctdb->statistics.num_clients++; client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, ctdb_daemon_read_cb, client); talloc_set_destructor(client, ctdb_client_destructor);}/* create a unix domain socket and bind it return a file descriptor open on the socket */static int ux_socket_bind(struct ctdb_context *ctdb){ struct sockaddr_un addr; ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); if (ctdb->daemon.sd == -1) { return -1; } set_nonblocking(ctdb->daemon.sd); set_close_on_exec(ctdb->daemon.sd);#if 0 /* AIX doesn't like this :( */ if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 || fchmod(ctdb->daemon.sd, 0700) != 0) { DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n")); goto failed; }#endif set_nonblocking(ctdb->daemon.sd); memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name)); goto failed; } if (listen(ctdb->daemon.sd, 10) != 0) { DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name)); goto failed; } return 0;failed: close(ctdb->daemon.sd); ctdb->daemon.sd = -1; return -1; }/* delete the socket on exit - called on destruction of autofree context */static int unlink_destructor(const char *name){ unlink(name); return 0;}/* start the protocol going as a daemon*/int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork){ int res; struct fd_event *fde; const char *domain_socket_name; /* get rid of any old sockets */ unlink(ctdb->daemon.name); /* create a unix domain stream socket to listen to */ res = ux_socket_bind(ctdb); if (res!=0) { DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); exit(10); } if (do_fork && fork()) { return 0; } tdb_reopen_all(False); if (do_fork) { setsid(); } block_signal(SIGPIPE); /* try to set us up as realtime */ ctdb_set_realtime(true); /* ensure the socket is deleted on exit of the daemon */ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); talloc_set_destructor(domain_socket_name, unlink_destructor); ctdb->ev = event_context_init(NULL); /* start frozen, then let the first election sort things out */ if (!ctdb_blocking_freeze(ctdb)) { DEBUG(0,("Failed to get initial freeze\n")); exit(12); } /* force initial recovery for election */ ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE; /* now start accepting clients, only can do this once frozen */ fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, ctdb_accept_client, ctdb); ctdb_main_loop(ctdb); return 0;}/* allocate a packet for use in daemon<->daemon communication */struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, enum ctdb_operation operation, size_t length, size_t slength, const char *type){ int size; struct ctdb_req_header *hdr; length = MAX(length, slength); size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size); if (hdr == NULL) { DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n", operation, (unsigned)length)); return NULL; } talloc_set_name_const(hdr, type); memset(hdr, 0, slength); hdr->length = length; hdr->operation = operation; hdr->ctdb_magic = CTDB_MAGIC; hdr->ctdb_version = CTDB_VERSION; hdr->generation = ctdb->vnn_map->generation; hdr->srcnode = ctdb->vnn; return hdr; }struct daemon_control_state { struct daemon_control_state *next, *prev; struct ctdb_client *client; struct ctdb_req_control *c; uint32_t reqid; struct ctdb_node *node;};/* callback when a control reply comes in */static void daemon_control_callback(struct ctdb_context *ctdb, int32_t status, TDB_DATA data, const char *errormsg, void *private_data){ struct daemon_control_state *state = talloc_get_type(private_data, struct daemon_control_state); struct ctdb_client *client = state->client; struct ctdb_reply_control *r; size_t len; /* construct a message to send to the client containing the data */ len = offsetof(struct ctdb_reply_control, data) + data.dsize; if (errormsg) { len += strlen(errormsg); } r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, struct ctdb_reply_control); CTDB_NO_MEMORY_VOID(ctdb, r); r->hdr.reqid = state->reqid; r->status = status; r->datalen = data.dsize; r->errorlen = 0; memcpy(&r->data[0], data.dptr, data.dsize); if (errormsg) { r->errorlen = strlen(errormsg); memcpy(&r->data[r->datalen], errormsg, r->errorlen); } daemon_queue_send(client, &r->hdr); talloc_free(state);}/* fail all pending controls to a disconnected node */void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node){ struct daemon_control_state *state; while ((state = node->pending_controls)) { DLIST_REMOVE(node->pending_controls, state); daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, "node is disconnected", state); }}/* destroy a daemon_control_state */static int daemon_control_destructor(struct daemon_control_state *state){ if (state->node) { DLIST_REMOVE(state->node->pending_controls, state); } return 0;}/* this is called when the ctdb daemon received a ctdb request control from a local client over the unix domain socket */static void daemon_request_control_from_client(struct ctdb_client *client, struct ctdb_req_control *c){ TDB_DATA data; int res; struct daemon_control_state *state; TALLOC_CTX *tmp_ctx = talloc_new(client); if (c->hdr.destnode == CTDB_CURRENT_NODE) { c->hdr.destnode = client->ctdb->vnn; } state = talloc(client, struct daemon_control_state); CTDB_NO_MEMORY_VOID(client->ctdb, state); state->client = client; state->c = talloc_steal(state, c); state->reqid = c->hdr.reqid; if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) { state->node = client->ctdb->nodes[c->hdr.destnode]; DLIST_ADD(state->node->pending_controls, state); } else { state->node = NULL; } talloc_set_destructor(state, daemon_control_destructor); if (c->flags & CTDB_CTRL_FLAG_NOREPLY) { talloc_steal(tmp_ctx, state); } data.dptr = &c->data[0]; data.dsize = c->datalen; res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode, c->srvid, c->opcode, client->client_id, c->flags, data, daemon_control_callback, state); if (res != 0) { DEBUG(0,(__location__ " Failed to send control to remote node %u\n", c->hdr.destnode)); } talloc_free(tmp_ctx);}/* register a call function*/int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id, ctdb_fn_t fn, int id){ struct ctdb_registered_call *call; struct ctdb_db_context *ctdb_db; ctdb_db = find_ctdb_db(ctdb, db_id); if (ctdb_db == NULL) { return -1; } call = talloc(ctdb_db, struct ctdb_registered_call); call->fn = fn; call->id = id; DLIST_ADD(ctdb_db->calls, call); return 0;}/* this local messaging handler is ugly, but is needed to prevent recursion in ctdb_send_message() when the destination node is the same as the source node */struct ctdb_local_message { struct ctdb_context *ctdb; uint64_t srvid; TDB_DATA data;};static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data){ struct ctdb_local_message *m = talloc_get_type(private_data, struct ctdb_local_message); int res; res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data); if (res != 0) { DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", (unsigned long long)m->srvid)); } talloc_free(m);}static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data){ struct ctdb_local_message *m; m = talloc(ctdb, struct ctdb_local_message); CTDB_NO_MEMORY(ctdb, m); m->ctdb = ctdb; m->srvid = srvid; m->data = data; m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize); if (m->data.dptr == NULL) { talloc_free(m); return -1; } /* this needs to be done as an event to prevent recursion */ event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m); return 0;}/* send a ctdb message*/int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint64_t srvid, TDB_DATA data){ struct ctdb_req_message *r; int len; /* see if this is a message to ourselves */ if (vnn == ctdb->vnn) { return ctdb_local_message(ctdb, srvid, data); } len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len, struct ctdb_req_message); CTDB_NO_MEMORY(ctdb, r); r->hdr.destnode = vnn; r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); ctdb_queue_packet(ctdb, &r->hdr); talloc_free(r); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -