📄 ctdb_client.c
字号:
/* ctdb daemon code Copyright (C) Andrew Tridgell 2007 Copyright (C) Ronnie Sahlberg 2007 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see <http://www.gnu.org/licenses/>.*/#include "includes.h"#include "tdb_wrap.h"#include "lib/tdb/include/tdb.h"#include "lib/util/dlinklist.h"#include "lib/events/events.h"#include "system/network.h"#include "system/filesys.h"#include "../include/ctdb_private.h"#include "lib/util/dlinklist.h"/* allocate a packet for use in client<->daemon communication */struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, enum ctdb_operation operation, size_t length, size_t slength, const char *type){ int size; struct ctdb_req_header *hdr; length = MAX(length, slength); size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size); if (hdr == NULL) { DEBUG(0,("Unable to allocate packet for operation %u of length %u\n", operation, (unsigned)length)); return NULL; } talloc_set_name_const(hdr, type); memset(hdr, 0, slength); hdr->length = length; hdr->operation = operation; hdr->ctdb_magic = CTDB_MAGIC; hdr->ctdb_version = CTDB_VERSION; hdr->srcnode = ctdb->vnn; if (ctdb->vnn_map) { hdr->generation = ctdb->vnn_map->generation; } return hdr;}/* local version of ctdb_call*/int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data, uint32_t caller){ struct ctdb_call_info *c; struct ctdb_registered_call *fn; struct ctdb_context *ctdb = ctdb_db->ctdb; c = talloc(ctdb, struct ctdb_call_info); CTDB_NO_MEMORY(ctdb, c); c->key = call->key; c->call_data = &call->call_data; c->record_data.dptr = (unsigned char *)talloc_memdup(c, data->dptr, data->dsize); c->record_data.dsize = data->dsize; CTDB_NO_MEMORY(ctdb, c->record_data.dptr); c->new_data = NULL; c->reply_data = NULL; c->status = 0; for (fn=ctdb_db->calls;fn;fn=fn->next) { if (fn->id == call->call_id) break; } if (fn == NULL) { ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id); talloc_free(c); return -1; } if (fn->fn(c) != 0) { ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id); talloc_free(c); return -1; } if (header->laccessor != caller) { header->lacount = 0; } header->laccessor = caller; header->lacount++; /* we need to force the record to be written out if this was a remote access, so that the lacount is updated */ if (c->new_data == NULL && header->laccessor != ctdb->vnn) { c->new_data = &c->record_data; } if (c->new_data) { /* XXX check that we always have the lock here? */ if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) { ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n"); talloc_free(c); return -1; } } if (c->reply_data) { call->reply_data = *c->reply_data; talloc_steal(ctdb, call->reply_data.dptr); talloc_set_name_const(call->reply_data.dptr, __location__); } else { call->reply_data.dptr = NULL; call->reply_data.dsize = 0; } call->status = c->status; talloc_free(c); return 0;}/* queue a packet for sending from client to daemon*/static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr){ return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);}/* state of a in-progress ctdb call in client*/struct ctdb_client_call_state { enum call_state state; uint32_t reqid; struct ctdb_db_context *ctdb_db; struct ctdb_call call;};/* called when a CTDB_REPLY_CALL packet comes in in the client This packet comes in response to a CTDB_REQ_CALL request packet. It contains any reply data from the call*/static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr){ struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; struct ctdb_client_call_state *state; state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state); if (state == NULL) { DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid)); return; } if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid)); return; } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; state->call.status = c->status; talloc_steal(state, c); state->state = CTDB_CALL_DONE;}static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);/* this is called in the client, when data comes in from the daemon */static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args){ struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; TALLOC_CTX *tmp_ctx; /* place the packet as a child of a tmp_ctx. We then use talloc_free() below to free it. If any of the calls want to keep it, then they will steal it somewhere else, and the talloc_free() will be a no-op */ tmp_ctx = talloc_new(ctdb); talloc_steal(tmp_ctx, hdr); if (cnt == 0) { DEBUG(2,("Daemon has exited - shutting down client\n")); exit(0); } if (cnt < sizeof(*hdr)) { DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt)); goto done; } if (cnt != hdr->length) { ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", (unsigned)hdr->length, (unsigned)cnt); goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n"); goto done; } if (hdr->ctdb_version != CTDB_VERSION) { ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version); goto done; } switch (hdr->operation) { case CTDB_REPLY_CALL: ctdb_client_reply_call(ctdb, hdr); break; case CTDB_REQ_MESSAGE: ctdb_request_message(ctdb, hdr); break; case CTDB_REPLY_CONTROL: ctdb_client_reply_control(ctdb, hdr); break; default: DEBUG(0,("bogus operation code:%u\n",hdr->operation)); }done: talloc_free(tmp_ctx);}/* connect to a unix domain socket*/int ctdb_socket_connect(struct ctdb_context *ctdb){ struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); if (ctdb->daemon.sd == -1) { return -1; } set_nonblocking(ctdb->daemon.sd); set_close_on_exec(ctdb->daemon.sd); if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { close(ctdb->daemon.sd); ctdb->daemon.sd = -1; return -1; } ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, CTDB_DS_ALIGNMENT, ctdb_client_read_cb, ctdb); return 0;}struct ctdb_record_handle { struct ctdb_db_context *ctdb_db; TDB_DATA key; TDB_DATA *data; struct ctdb_ltdb_header header;};/* make a recv call to the local ctdb daemon - called from client context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed.*/int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call){ while (state->state < CTDB_CALL_DONE) { event_loop_once(state->ctdb_db->ctdb->ev); } if (state->state != CTDB_CALL_DONE) { DEBUG(0,(__location__ " ctdb_call_recv failed\n")); talloc_free(state); return -1; } if (state->call.reply_data.dsize) { call->reply_data.dptr = (unsigned char *)talloc_memdup( state->ctdb_db, state->call.reply_data.dptr, state->call.reply_data.dsize); call->reply_data.dsize = state->call.reply_data.dsize; } else { call->reply_data.dptr = NULL; call->reply_data.dsize = 0; } call->status = state->call.status; talloc_free(state); return 0;}/* destroy a ctdb_call in client*/static int ctdb_client_call_destructor(struct ctdb_client_call_state *state) { ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid); return 0;}/* construct an event driven local ctdb_call this is used so that locally processed ctdb_call requests are processed in an event driven manner*/static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, struct ctdb_ltdb_header *header, TDB_DATA *data){ struct ctdb_client_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; int ret; state = talloc_zero(ctdb_db, struct ctdb_client_call_state); CTDB_NO_MEMORY_NULL(ctdb, state); talloc_steal(state, data->dptr); state->state = CTDB_CALL_DONE; state->call = *call; state->ctdb_db = ctdb_db; ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn); return state;}/* make a ctdb call to the local daemon - async send. Called from client context. This constructs a ctdb_call request and queues it for processing. This call never blocks.*/struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call){ struct ctdb_client_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; struct ctdb_ltdb_header header; TDB_DATA data; int ret; size_t len; struct ctdb_req_call *c; /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { ctdb_socket_connect(ctdb); } ret = ctdb_ltdb_lock(ctdb_db, call->key); if (ret != 0) { DEBUG(0,(__location__ " Failed to get chainlock\n")); return NULL; } ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); if (ret == 0 && header.dmaster == ctdb->vnn) { state = ctdb_client_call_local_send(ctdb_db, call, &header, &data); talloc_free(data.dptr); ctdb_ltdb_unlock(ctdb_db, call->key); return state; } ctdb_ltdb_unlock(ctdb_db, call->key); talloc_free(data.dptr); state = talloc_zero(ctdb_db, struct ctdb_client_call_state); if (state == NULL) { DEBUG(0, (__location__ " failed to allocate state\n")); return NULL; } len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call); if (c == NULL) { DEBUG(0, (__location__ " failed to allocate packet\n")); return NULL; } state->reqid = ctdb_reqid_new(ctdb, state); state->ctdb_db = ctdb_db; talloc_set_destructor(state, ctdb_client_call_destructor); c->hdr.reqid = state->reqid; c->flags = call->flags; c->db_id = ctdb_db->db_id; c->callid = call->call_id; c->hopcount = 0; c->keylen = call->key.dsize; c->calldatalen = call->call_data.dsize; memcpy(&c->data[0], call->key.dptr, call->key.dsize); memcpy(&c->data[call->key.dsize], call->call_data.dptr, call->call_data.dsize); state->call = *call; state->call.call_data.dptr = &c->data[call->key.dsize]; state->call.key.dptr = &c->data[0]; state->state = CTDB_CALL_WAIT; ctdb_client_queue_pkt(ctdb, &c->hdr); return state;}/* full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()*/int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call){ struct ctdb_client_call_state *state; state = ctdb_call_send(ctdb_db, call); return ctdb_call_recv(state, call);}/* tell the daemon what messaging srvid we will use, and register the message handler function in the client*/int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, ctdb_message_fn_t handler, void *private_data) { int res; int32_t status; res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, tdb_null, NULL, NULL, &status, NULL, NULL); if (res != 0 || status != 0) { DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid)); return -1; } /* also need to register the handler with our own ctdb structure */ return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);}/* tell the daemon we no longer want a srvid*/static int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data){ int res; int32_t status; res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, tdb_null, NULL, NULL, &status, NULL, NULL); if (res != 0 || status != 0) { DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid)); return -1; } /* also need to register the handler with our own ctdb structure */ ctdb_deregister_message_handler(ctdb, srvid, private_data); return 0;}/* send a message - from client context */int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint64_t srvid, TDB_DATA data){ struct ctdb_req_message *r; int len, res; len = offsetof(struct ctdb_req_message, data) + data.dsize; r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, len, struct ctdb_req_message); CTDB_NO_MEMORY(ctdb, r); r->hdr.destnode = vnn; r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); res = ctdb_client_queue_pkt(ctdb, &r->hdr); if (res != 0) { return res; } talloc_free(r); return 0;}/* cancel a ctdb_fetch_lock operation, releasing the lock */static int fetch_lock_destructor(struct ctdb_record_handle *h){ ctdb_ltdb_unlock(h->ctdb_db, h->key); return 0;}/* force the migration of a record to this node */static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key){ struct ctdb_call call; ZERO_STRUCT(call); call.call_id = CTDB_NULL_FUNC; call.key = key; call.flags = CTDB_IMMEDIATE_MIGRATION; return ctdb_call(ctdb_db, &call);}/*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -