/* ctdb_daemon.c */
/* 
   ctdb daemon code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "lib/tdb/include/tdb.h"
#include "lib/events/events.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"

static void daemon_incoming_packet(void *, struct ctdb_req_header *);

/*
  handler for when a node changes its flags

  Registered for CTDB_SRVID_NODE_FLAGS_CHANGED. The payload must be
  exactly one struct ctdb_node_flag_change naming a valid vnn; the
  node's flags are then replaced by the received ones, except that the
  locally held NODE_FLAGS_DISCONNECTED bit is preserved.
 */
static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
				TDB_DATA data, void *private_data)
{
	struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;

	/* validate the size before reading any field of *c */
	if (data.dsize != sizeof(*c)) {
		DEBUG(0,(__location__ " Invalid data in ctdb_node_flag_change\n"));
		return;
	}

	/* checked separately so that a bad vnn gets its own diagnostic */
	if (!ctdb_validate_vnn(ctdb, c->vnn)) {
		DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
		return;
	}

	/* don't get the disconnected flag from the other node */
	ctdb->nodes[c->vnn]->flags = 
		(ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
		| (c->flags & ~NODE_FLAGS_DISCONNECTED);	
	DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));

	/* make sure we don't hold any IPs when we shouldn't */
	if (c->vnn == ctdb->vnn &&
	    (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
		ctdb_release_all_ips(ctdb);
	}
}

/*
  called when the "startup" event script has finished

  status is the script's result. On success this starts the transport,
  starts the recovery daemon (exiting with status 11 if that fails),
  registers flag_change_handler for node flag changes and starts
  monitoring for dead nodes. Other failures are reported via ctdb_fatal().
 */
static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
{
	if (status != 0) {
		DEBUG(0,("startup event failed!\n"));
		ctdb_fatal(ctdb, "startup event script failed");		
	}

	/* start the transport running */
	if (ctdb->methods->start(ctdb) != 0) {
		DEBUG(0,("transport failed to start!\n"));
		ctdb_fatal(ctdb, "transport failed to start");
	}

	/* start the recovery daemon process */
	if (ctdb_start_recoverd(ctdb) != 0) {
		DEBUG(0,("Failed to start recovery daemon\n"));
		exit(11);
	}

	/* a handler for when nodes are disabled/enabled */
	ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
				      flag_change_handler, NULL);

	/* start monitoring for dead nodes */
	ctdb_start_monitoring(ctdb);
}

/*
  go into main ctdb loop

  Selects and initialises the transport named by ctdb->transport ("tcp",
  or "ib" when built with USE_INFINIBAND), broadcasts a STARTUP control,
  releases any IPs left over from a previous run, schedules the "startup"
  event script (which continues in ctdb_start_transport) and then runs
  the event loop. Returns early only on setup failure; if the event loop
  itself returns, the process exits with status 1.
 */
static void ctdb_main_loop(struct ctdb_context *ctdb)
{
	int ret = -1;

	if (strcmp(ctdb->transport, "tcp") == 0) {
		int ctdb_tcp_init(struct ctdb_context *);
		ret = ctdb_tcp_init(ctdb);
	}
#ifdef USE_INFINIBAND
	if (strcmp(ctdb->transport, "ib") == 0) {
		int ctdb_ibw_init(struct ctdb_context *);
		ret = ctdb_ibw_init(ctdb);
	}
#endif
	if (ret != 0) {
		DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
		return;
	}

	/* initialise the transport  */
	if (ctdb->methods->initialise(ctdb) != 0) {
		DEBUG(0,("transport failed to initialise!\n"));
		ctdb_fatal(ctdb, "transport failed to initialise");
	}

	/* tell all other nodes we've just started up */
	ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
				 0, CTDB_CONTROL_STARTUP, 0,
				 CTDB_CTRL_FLAG_NOREPLY,
				 tdb_null, NULL, NULL);

	/* release any IPs we hold from previous runs of the daemon */
	ctdb_release_all_ips(ctdb);

	ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
					 ctdb_start_transport, NULL, "startup");
	if (ret != 0) {
		DEBUG(0,("Failed startup event script\n"));
		return;
	}

	/* go into a wait loop to allow other nodes to complete */
	event_loop_wait(ctdb->ev);

	DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
	exit(1);
}

/*
  install SIG_IGN as the disposition for signum (with signum itself
  added to the handler's mask)
 */
static void block_signal(int signum)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));

	act.sa_handler = SIG_IGN;
	sigemptyset(&act.sa_mask);
	sigaddset(&act.sa_mask, signum);
	sigaction(signum, &act, NULL);
}

/*
  send a packet to a client

  Counts the packet in statistics.client_packets_sent and returns the
  result of ctdb_queue_send() on the client's queue.
 */
static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
{
	client->ctdb->statistics.client_packets_sent++;
	return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
}

/*
  message handler for when we are in daemon mode. This redirects the message
  to the right client

  private_data is the ctdb_client the handler was registered for. The
  message is wrapped in a CTDB_REQ_MESSAGE packet, queued to that client,
  and the packet is freed again here.
 */
static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
				   TDB_DATA data, void *private_data)
{
	struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
	struct ctdb_req_message *r;
	int len;

	/* construct a message to send to the client containing the data */
	len = offsetof(struct ctdb_req_message, data) + data.dsize;
	r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
			       len, struct ctdb_req_message);
	CTDB_NO_MEMORY_VOID(ctdb, r);

	talloc_set_name_const(r, "req_message packet");

	r->srvid   = srvid;
	r->datalen = data.dsize;
	memcpy(&r->data[0], data.dptr, data.dsize);

	daemon_queue_send(client, &r->hdr);

	talloc_free(r);
}

/*
  this is called when the ctdb daemon received a ctdb request to 
  set the srvid from the client

  Returns -1 for an unknown client_id, otherwise the result of
  ctdb_register_message_handler().
 */
int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
{
	struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
	int res;

	if (client == NULL) {
		DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
		return -1;
	}
	res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
	if (res != 0) {
		DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
			 (unsigned long long)srvid));
	} else {
		DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", 
			 (unsigned long long)srvid));
	}

	/* this is a hack for Samba - we now know the pid of the Samba client.
	   Only srvids that map to a positive pid are probed: kill() gives
	   pid 0 and negative pids special meanings (process groups / all
	   processes), so probing those would "succeed" without identifying
	   any single client process. */
	if (srvid > 0 && srvid <= 0x7FFFFFFF &&
	    kill(srvid, 0) == 0) {
		client->pid = srvid;
		DEBUG(0,(__location__ " Registered PID %u for client %u\n",
			 (unsigned)client->pid, client_id));
	}
	return res;
}

/*
  this is called when the ctdb daemon received a ctdb request to 
  remove a srvid from the client

  Returns -1 for an unknown client_id, otherwise the result of
  ctdb_deregister_message_handler().
 */
int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
{
	struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
	if (client == NULL) {
		DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
		return -1;
	}
	return ctdb_deregister_message_handler(ctdb, srvid, client);
}

/*
  destroy a ctdb_client

  Runs the takeover hook, drops the client's reqid mapping and
  decrements statistics.num_clients.
*/
static int ctdb_client_destructor(struct ctdb_client *client)
{
	ctdb_takeover_client_destructor_hook(client);
	ctdb_reqid_remove(client->ctdb, client->client_id);
	client->ctdb->statistics.num_clients--;
	return 0;
}

/*
  this is called when the ctdb daemon received a ctdb request message
  from a local client over the unix domain socket

  Messages addressed to this node are delivered locally; anything else
  is forwarded to c->hdr.destnode.
 */
static void daemon_request_message_from_client(struct ctdb_client *client, 
					       struct ctdb_req_message *c)
{
	TDB_DATA data;
	int res;

	/* maybe the message is for another client on this node */
	if (ctdb_get_vnn(client->ctdb) == c->hdr.destnode) {
		ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
		return;
	}

	/* its for a remote node */
	data.dptr = &c->data[0];
	data.dsize = c->datalen;
	res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
				       c->srvid, data);
	if (res != 0) {
		DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
			 c->hdr.destnode));
	}
}

/* per-call bookkeeping for a call request received from a local client */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the request */
	uint32_t reqid;			/* client's reqid, echoed in the reply */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* for max_call_latency accounting */
};

/* 
   complete a call from a client 

   Collects the call result, sends a CTDB_REPLY_CALL packet back to the
   client, updates the latency/pending statistics and frees the
   per-call state on every path.
*/
static void daemon_call_from_client_callback(struct ctdb_call_state *state)
{
	struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
							   struct daemon_call_state);
	struct ctdb_reply_call *r;
	int res;
	uint32_t length;
	struct ctdb_client *client = dstate->client;

	/* re-parent dstate (and the call it owns) from state to the client
	   before state is handed to ctdb_daemon_call_recv() */
	talloc_steal(client, dstate);
	talloc_steal(dstate, dstate->call);

	res = ctdb_daemon_call_recv(state, dstate->call);
	if (res != 0) {
		DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
		client->ctdb->statistics.pending_calls--;
		ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
		/* dstate is now a child of the client: free it here, otherwise
		   it stays allocated until the client itself is freed */
		talloc_free(dstate);
		return;
	}

	length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
	r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
			       length, struct ctdb_reply_call);
	if (r == NULL) {
		DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
		client->ctdb->statistics.pending_calls--;
		ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
		talloc_free(dstate);
		return;
	}
	r->hdr.reqid        = dstate->reqid;
	r->datalen          = dstate->call->reply_data.dsize;
	memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);

	res = daemon_queue_send(client, &r->hdr);
	if (res != 0) {
		DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
	}
	ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
	/* r is a child of dstate, so this frees the reply packet as well */
	talloc_free(dstate);
	client->ctdb->statistics.pending_calls--;
}

/*
  this is called when the ctdb daemon received a ctdb request call
  from a local client over the unix domain socket

  Fetches and locks the record, then starts the call either locally (we
  are dmaster) or remotely. Completion is handled asynchronously by
  daemon_call_from_client_callback(). A return value of -2 from
  ctdb_ltdb_lock_fetch_requeue() means the packet was requeued and this
  function will be invoked again later via daemon_incoming_packet().
 */
static void daemon_request_call_from_client(struct ctdb_client *client, 
					    struct ctdb_req_call *c)
{
	struct ctdb_call_state *state;
	struct ctdb_db_context *ctdb_db;
	struct daemon_call_state *dstate;
	struct ctdb_call *call;
	struct ctdb_ltdb_header header;
	TDB_DATA key, data;
	int ret;
	struct ctdb_context *ctdb = client->ctdb;

	ctdb->statistics.total_calls++;
	ctdb->statistics.pending_calls++;

	ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
	if (!ctdb_db) {
		DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x\n",
			  c->db_id));
		ctdb->statistics.pending_calls--;
		return;
	}

	key.dptr = c->data;
	key.dsize = c->keylen;

	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
					   (struct ctdb_req_header *)c, &data,
					   daemon_incoming_packet, client, True);
	if (ret == -2) {
		/* will retry later */
		ctdb->statistics.pending_calls--;
		return;
	}

	if (ret != 0) {
		DEBUG(0,(__location__ " Unable to fetch record\n"));
		ctdb->statistics.pending_calls--;
		return;
	}

	dstate = talloc(client, struct daemon_call_state);
	if (dstate == NULL) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(0,(__location__ " Unable to allocate dstate\n"));
		ctdb->statistics.pending_calls--;
		return;
	}
	dstate->start_time = timeval_current();
	dstate->client = client;
	dstate->reqid  = c->hdr.reqid;
	talloc_steal(dstate, data.dptr);

	call = dstate->call = talloc_zero(dstate, struct ctdb_call);
	if (call == NULL) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(0,(__location__ " Unable to allocate call\n"));
		ctdb->statistics.pending_calls--;
		ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
		/* also releases the record data stolen onto dstate above */
		talloc_free(dstate);
		return;
	}

	call->call_id = c->callid;
	call->key = key;
	call->call_data.dptr = c->data + c->keylen;
	call->call_data.dsize = c->calldatalen;
	call->flags = c->flags;

	if (header.dmaster == ctdb->vnn) {
		state = ctdb_call_local_send(ctdb_db, call, &header, &data);
	} else {
		state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
	}

	ctdb_ltdb_unlock(ctdb_db, key);

	if (state == NULL) {
		DEBUG(0,(__location__ " Unable to setup call send\n"));
		ctdb->statistics.pending_calls--;
		ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
		talloc_free(dstate);
		return;
	}
	/* dstate lives under state until the callback re-parents it */
	talloc_steal(state, dstate);
	talloc_steal(client, state);

	state->async.fn = daemon_call_from_client_callback;
	state->async.private_data = dstate;
}

static void daemon_request_control_from_client(struct ctdb_client *client, 
					       struct ctdb_req_control *c);

/* data contains a packet from the client */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
{
	struct ctdb_client *client = 
talloc_get_type(p, struct ctdb_client); TALLOC_CTX *tmp_ctx; struct ctdb_context *ctdb = client->ctdb; /* place the packet as a child of a tmp_ctx. We then use talloc_free() below to free it. If any of the calls want to keep it, then they will steal it somewhere else, and the talloc_free() will be a no-op */ tmp_ctx = talloc_new(client); talloc_steal(tmp_ctx, hdr); if (hdr->ctdb_magic != CTDB_MAGIC) { ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n"); goto done; } if (hdr->ctdb_version != CTDB_VERSION) { ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version); goto done; } switch (hdr->operation) { case CTDB_REQ_CALL: ctdb->statistics.client.req_call++; daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr); break; case CTDB_REQ_MESSAGE: ctdb->statistics.client.req_message++; daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr); break;
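/* NOTE: this copy of ctdb_daemon.c is truncated here, inside the switch in
   daemon_incoming_packet(); the remainder of the file is missing. The text
   that followed was web code-viewer UI residue (keyboard-shortcut help for
   copy, search, fullscreen, theme toggle and font size), not source code. */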