⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ctdb_client.c

📁 samba最新软件
💻 C
📖 第 1 页 / 共 4 页
字号:
/*    ctdb daemon code   Copyright (C) Andrew Tridgell  2007   Copyright (C) Ronnie Sahlberg  2007   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 3 of the License, or   (at your option) any later version.      This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.      You should have received a copy of the GNU General Public License   along with this program; if not, see <http://www.gnu.org/licenses/>.*/#include "includes.h"#include "tdb_wrap.h"#include "lib/tdb/include/tdb.h"#include "lib/util/dlinklist.h"#include "lib/events/events.h"#include "system/network.h"#include "system/filesys.h"#include "../include/ctdb_private.h"#include "lib/util/dlinklist.h"/*  allocate a packet for use in client<->daemon communication */struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,					    TALLOC_CTX *mem_ctx, 					    enum ctdb_operation operation, 					    size_t length, size_t slength,					    const char *type){	int size;	struct ctdb_req_header *hdr;	length = MAX(length, slength);	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);	hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);	if (hdr == NULL) {		DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",			 operation, (unsigned)length));		return NULL;	}	talloc_set_name_const(hdr, type);	memset(hdr, 0, slength);	hdr->length       = length;	hdr->operation    = operation;	hdr->ctdb_magic   = CTDB_MAGIC;	hdr->ctdb_version = CTDB_VERSION;	hdr->srcnode      = ctdb->vnn;	if (ctdb->vnn_map) {		hdr->generation = ctdb->vnn_map->generation;	}	return hdr;}/*  local version of ctdb_call*/int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,		    struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,		    TDB_DATA *data, uint32_t caller){	struct ctdb_call_info *c;	struct ctdb_registered_call *fn;	struct ctdb_context *ctdb = ctdb_db->ctdb;		c = talloc(ctdb, struct ctdb_call_info);	CTDB_NO_MEMORY(ctdb, c);	c->key = call->key;	c->call_data = &call->call_data;	c->record_data.dptr = (unsigned char *)talloc_memdup(c, data->dptr, data->dsize);	c->record_data.dsize = data->dsize;	CTDB_NO_MEMORY(ctdb, c->record_data.dptr);	c->new_data = NULL;	c->reply_data = NULL;	c->status = 0;	for (fn=ctdb_db->calls;fn;fn=fn->next) {		if (fn->id == call->call_id) break;	}	if (fn == NULL) {		ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);		talloc_free(c);		return -1;	}	if (fn->fn(c) != 0) {		ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);		talloc_free(c);		return -1;	}	if (header->laccessor != caller) {		header->lacount = 0;	}	header->laccessor = caller;	header->lacount++;	/* we need to force the record to be written out if this was a remote access,	   so that the lacount is updated */	if (c->new_data == NULL && header->laccessor != ctdb->vnn) {		c->new_data = &c->record_data;	}	if (c->new_data) {		/* XXX check that we always have the lock here? */		if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {			ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");			talloc_free(c);			return -1;		}	}	if (c->reply_data) {		call->reply_data = *c->reply_data;		talloc_steal(ctdb, call->reply_data.dptr);		talloc_set_name_const(call->reply_data.dptr, __location__);	} else {		call->reply_data.dptr = NULL;		call->reply_data.dsize = 0;	}	call->status = c->status;	talloc_free(c);	return 0;}/*  queue a packet for sending from client to daemon*/static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr){	return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);}/*  state of a in-progress ctdb call in client*/struct ctdb_client_call_state {	enum call_state state;	uint32_t reqid;	struct ctdb_db_context *ctdb_db;	struct ctdb_call call;};/*  called when a CTDB_REPLY_CALL packet comes in in the client  This packet comes in response to a CTDB_REQ_CALL request packet. It  contains any reply data from the call*/static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr){	struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;	struct ctdb_client_call_state *state;	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);	if (state == NULL) {		DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));		return;	}	if (hdr->reqid != state->reqid) {		/* we found a record  but it was the wrong one */		DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));		return;	}	state->call.reply_data.dptr = c->data;	state->call.reply_data.dsize = c->datalen;	state->call.status = c->status;	talloc_steal(state, c);	state->state = CTDB_CALL_DONE;}static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);/*  this is called in the client, when data comes in from the daemon */static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args){	struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;	TALLOC_CTX *tmp_ctx;	/* place the packet as a child of a tmp_ctx. We then use	   talloc_free() below to free it. If any of the calls want	   to keep it, then they will steal it somewhere else, and the	   talloc_free() will be a no-op */	tmp_ctx = talloc_new(ctdb);	talloc_steal(tmp_ctx, hdr);	if (cnt == 0) {		DEBUG(2,("Daemon has exited - shutting down client\n"));		exit(0);	}	if (cnt < sizeof(*hdr)) {		DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));		goto done;	}	if (cnt != hdr->length) {		ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 			       (unsigned)hdr->length, (unsigned)cnt);		goto done;	}	if (hdr->ctdb_magic != CTDB_MAGIC) {		ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");		goto done;	}	if (hdr->ctdb_version != CTDB_VERSION) {		ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);		goto done;	}	switch (hdr->operation) {	case CTDB_REPLY_CALL:		ctdb_client_reply_call(ctdb, hdr);		break;	case CTDB_REQ_MESSAGE:		ctdb_request_message(ctdb, hdr);		break;	case CTDB_REPLY_CONTROL:		ctdb_client_reply_control(ctdb, hdr);		break;	default:		DEBUG(0,("bogus operation code:%u\n",hdr->operation));	}done:	talloc_free(tmp_ctx);}/*  connect to a unix domain socket*/int ctdb_socket_connect(struct ctdb_context *ctdb){	struct sockaddr_un addr;	memset(&addr, 0, sizeof(addr));	addr.sun_family = AF_UNIX;	strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));	ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);	if (ctdb->daemon.sd == -1) {		return -1;	}	set_nonblocking(ctdb->daemon.sd);	set_close_on_exec(ctdb->daemon.sd);		if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {		close(ctdb->daemon.sd);		ctdb->daemon.sd = -1;		return -1;	}	ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 					      CTDB_DS_ALIGNMENT, 					      ctdb_client_read_cb, ctdb);	return 0;}struct ctdb_record_handle {	struct ctdb_db_context *ctdb_db;	TDB_DATA key;	TDB_DATA *data;	struct ctdb_ltdb_header header;};/*  make a recv call to the local ctdb daemon - called from client context  This is called when the program wants to wait for a ctdb_call to complete and get the   results. This call will block unless the call has already completed.*/int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call){	while (state->state < CTDB_CALL_DONE) {		event_loop_once(state->ctdb_db->ctdb->ev);	}	if (state->state != CTDB_CALL_DONE) {		DEBUG(0,(__location__ " ctdb_call_recv failed\n"));		talloc_free(state);		return -1;	}	if (state->call.reply_data.dsize) {		call->reply_data.dptr = (unsigned char *)talloc_memdup(					state->ctdb_db,					state->call.reply_data.dptr,					state->call.reply_data.dsize);		call->reply_data.dsize = state->call.reply_data.dsize;	} else {		call->reply_data.dptr = NULL;		call->reply_data.dsize = 0;	}	call->status = state->call.status;	talloc_free(state);	return 0;}/*  destroy a ctdb_call in client*/static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)	{	ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);	return 0;}/*  construct an event driven local ctdb_call  this is used so that locally processed ctdb_call requests are processed  in an event driven manner*/static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 								  struct ctdb_call *call,								  struct ctdb_ltdb_header *header,								  TDB_DATA *data){	struct ctdb_client_call_state *state;	struct ctdb_context *ctdb = ctdb_db->ctdb;	int ret;	state = talloc_zero(ctdb_db, struct ctdb_client_call_state);	CTDB_NO_MEMORY_NULL(ctdb, state);	talloc_steal(state, data->dptr);	state->state = CTDB_CALL_DONE;	state->call = *call;	state->ctdb_db = ctdb_db;	ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->vnn);	return state;}/*  make a ctdb call to the local daemon - async send. Called from client context.  This constructs a ctdb_call request and queues it for processing.   This call never blocks.*/struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 				       struct ctdb_call *call){	struct ctdb_client_call_state *state;	struct ctdb_context *ctdb = ctdb_db->ctdb;	struct ctdb_ltdb_header header;	TDB_DATA data;	int ret;	size_t len;	struct ctdb_req_call *c;	/* if the domain socket is not yet open, open it */	if (ctdb->daemon.sd==-1) {		ctdb_socket_connect(ctdb);	}	ret = ctdb_ltdb_lock(ctdb_db, call->key);	if (ret != 0) {		DEBUG(0,(__location__ " Failed to get chainlock\n"));		return NULL;	}	ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);	if (ret == 0 && header.dmaster == ctdb->vnn) {		state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);		talloc_free(data.dptr);		ctdb_ltdb_unlock(ctdb_db, call->key);		return state;	}	ctdb_ltdb_unlock(ctdb_db, call->key);	talloc_free(data.dptr);	state = talloc_zero(ctdb_db, struct ctdb_client_call_state);	if (state == NULL) {		DEBUG(0, (__location__ " failed to allocate state\n"));		return NULL;	}	len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;	c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);	if (c == NULL) {		DEBUG(0, (__location__ " failed to allocate packet\n"));		return NULL;	}	state->reqid     = ctdb_reqid_new(ctdb, state);	state->ctdb_db = ctdb_db;	talloc_set_destructor(state, ctdb_client_call_destructor);	c->hdr.reqid     = state->reqid;	c->flags         = call->flags;	c->db_id         = ctdb_db->db_id;	c->callid        = call->call_id;	c->hopcount      = 0;	c->keylen        = call->key.dsize;	c->calldatalen   = call->call_data.dsize;	memcpy(&c->data[0], call->key.dptr, call->key.dsize);	memcpy(&c->data[call->key.dsize], 	       call->call_data.dptr, call->call_data.dsize);	state->call                = *call;	state->call.call_data.dptr = &c->data[call->key.dsize];	state->call.key.dptr       = &c->data[0];	state->state  = CTDB_CALL_WAIT;	ctdb_client_queue_pkt(ctdb, &c->hdr);	return state;}/*  full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()*/int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call){	struct ctdb_client_call_state *state;	state = ctdb_call_send(ctdb_db, call);	return ctdb_call_recv(state, call);}/*  tell the daemon what messaging srvid we will use, and register the message  handler function in the client*/int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 			     ctdb_message_fn_t handler,			     void *private_data)				    {	int res;	int32_t status;		res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 			   tdb_null, NULL, NULL, &status, NULL, NULL);	if (res != 0 || status != 0) {		DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));		return -1;	}	/* also need to register the handler with our own ctdb structure */	return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);}/*  tell the daemon we no longer want a srvid*/static int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data){	int res;	int32_t status;		res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 			   tdb_null, NULL, NULL, &status, NULL, NULL);	if (res != 0 || status != 0) {		DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));		return -1;	}	/* also need to register the handler with our own ctdb structure */	ctdb_deregister_message_handler(ctdb, srvid, private_data);	return 0;}/*  send a message - from client context */int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,		      uint64_t srvid, TDB_DATA data){	struct ctdb_req_message *r;	int len, res;	len = offsetof(struct ctdb_req_message, data) + data.dsize;	r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 			       len, struct ctdb_req_message);	CTDB_NO_MEMORY(ctdb, r);	r->hdr.destnode  = vnn;	r->srvid         = srvid;	r->datalen       = data.dsize;	memcpy(&r->data[0], data.dptr, data.dsize);		res = ctdb_client_queue_pkt(ctdb, &r->hdr);	if (res != 0) {		return res;	}	talloc_free(r);	return 0;}/*  cancel a ctdb_fetch_lock operation, releasing the lock */static int fetch_lock_destructor(struct ctdb_record_handle *h){	ctdb_ltdb_unlock(h->ctdb_db, h->key);	return 0;}/*  force the migration of a record to this node */static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key){	struct ctdb_call call;	ZERO_STRUCT(call);	call.call_id = CTDB_NULL_FUNC;	call.key = key;	call.flags = CTDB_IMMEDIATE_MIGRATION;	return ctdb_call(ctdb_db, &call);}/*

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -