mnesia_controller.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页
ERL
2,103 行
%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %% $Id$%%%% The mnesia_init process loads tables from local disc or from%% another nodes. It also coordinates updates of the info about%% where we can read and write tables.%%%% Tables may need to be loaded initially at startup of the local%% node or when other nodes announces that they already have loaded%% tables that we also want.%%%% Initially we set the load request queue to those tables that we%% safely can load locally, i.e. tables where we have the last%% consistent replica and we have received mnesia_down from all%% other nodes holding the table. Then we let the mnesia_init%% process enter its normal working state.%%%% When we need to load a table we append a request to the load%% request queue. All other requests are regarded as high priority%% and are processed immediately (e.g. update table whereabouts).%% We processes the load request queue as a "background" job..-module(mnesia_controller).-behaviour(gen_server).%% Mnesia internal stuff-export([ start/0, i_have_tab/1, info/0, get_info/1, get_workers/1, force_load_table/1, async_dump_log/1, sync_dump_log/1, connect_nodes/1, wait_for_schema_commit_lock/0, release_schema_commit_lock/0, create_table/1, get_disc_copy/1, get_cstructs/0, sync_and_block_table_whereabouts/4, sync_del_table_copy_whereabouts/2, block_table/1, unblock_table/1, block_controller/0, unblock_controller/0, unannounce_add_table_copy/2, master_nodes_updated/2, mnesia_down/1, add_active_replica/2, add_active_replica/3, add_active_replica/4, update/1, change_table_access_mode/1, del_active_replica/2, wait_for_tables/2, get_network_copy/2, merge_schema/0, start_remote_sender/4, schedule_late_disc_load/2 ]).%% gen_server callbacks-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).%% Module internal stuff-export([call/1, cast/1, dump_and_reply/2, load_and_reply/2, send_and_reply/2, wait_for_tables_init/2, connect_nodes2/2 ]).-import(mnesia_lib, [set/2, add/2]).-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).-include("mnesia.hrl").-define(SERVER_NAME, ?MODULE). -record(state, {supervisor, schema_is_merged = false, early_msgs = [], loader_pid = [], %% Was Pid is now [{Pid,Work}|..] loader_queue, %% Was list is now gb_tree sender_pid = [], %% Was a pid or undef is now [{Pid,Work}|..] sender_queue = [], late_loader_queue, %% Was list is now gb_tree dumper_pid, %% Dumper or schema commit pid dumper_queue = [], %% Dumper or schema commit queue others = [], %% Processes that needs the copier_done msg dump_log_timer_ref, is_stopping = false }).%% Backwards Comp. Sender_pid is now a list of senders..get_senders(#state{sender_pid = Pids}) when list(Pids) -> Pids. %% Backwards Comp. loader_pid is now a list of loaders..get_loaders(#state{loader_pid = Pids}) when list(Pids) -> Pids. max_loaders() -> case ?catch_val(no_table_loaders) of {'EXIT', _} -> mnesia_lib:set(no_table_loaders,1), 1; Val -> Val end.-record(schema_commit_lock, {owner}).-record(block_controller, {owner}).-record(dump_log, {initiated_by, opt_reply_to }).-record(net_load, {table, reason, opt_reply_to, cstruct = unknown }).-record(send_table, {table, receiver_pid, remote_storage }).-record(disc_load, {table, reason, opt_reply_to }).-record(late_load, {table, reason, opt_reply_to, loaders }).-record(loader_done, {worker_pid, is_loaded, table_name, needs_announce, needs_sync, needs_reply, reply_to, reply}).-record(sender_done, {worker_pid, worker_res, table_name }).-record(dumper_done, {worker_pid, worker_res }).val(Var) -> case ?catch_val(Var) of {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason); Value -> Value end.start() -> gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()], [{timeout, infinity} %% ,{debug, [trace]} ]).sync_dump_log(InitBy) -> call({sync_dump_log, InitBy}).async_dump_log(InitBy) -> ?SERVER_NAME ! {async_dump_log, InitBy}. %% Wait for tables to be active%% If needed, we will wait for Mnesia to start%% If Mnesia stops, we will wait for Mnesia to restart%% We will wait even if the list of tables is empty%%wait_for_tables(Tabs, Timeout) when list(Tabs), Timeout == infinity -> do_wait_for_tables(Tabs, Timeout);wait_for_tables(Tabs, Timeout) when list(Tabs), integer(Timeout), Timeout >= 0 -> do_wait_for_tables(Tabs, Timeout);wait_for_tables(Tabs, Timeout) -> {error, {badarg, Tabs, Timeout}}.do_wait_for_tables(Tabs, 0) -> reply_wait(Tabs);do_wait_for_tables(Tabs, Timeout) -> Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]), receive {?SERVER_NAME, Pid, Res} -> Res; {'EXIT', Pid, _} -> reply_wait(Tabs) after Timeout -> unlink(Pid), exit(Pid, timeout), reply_wait(Tabs) end. reply_wait(Tabs) -> case catch mnesia_lib:active_tables() of {'EXIT', _} -> {error, {node_not_running, node()}}; Active when list(Active) -> case Tabs -- Active of [] -> ok; BadTabs -> {timeout, BadTabs} end end.wait_for_tables_init(From, Tabs) -> process_flag(trap_exit, true), Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)), From ! {?SERVER_NAME, self(), Res}, unlink(From), exit(normal).wait_for_init(From, Tabs, Init) -> case catch link(Init) of {'EXIT', _} -> %% Mnesia is not started {error, {node_not_running, node()}}; true when pid(Init) -> cast({sync_tabs, Tabs, self()}), rec_tabs(Tabs, Tabs, From, Init) end.sync_reply(Waiter, Tab) -> Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.rec_tabs([Tab | Tabs], AllTabs, From, Init) -> receive {?SERVER_NAME, {tab_synced, Tab}} -> rec_tabs(Tabs, AllTabs, From, Init); {'EXIT', From, _} -> %% This will trigger an exit signal %% to mnesia_init exit(wait_for_tables_timeout); {'EXIT', Init, _} -> %% Oops, mnesia_init stopped, exit(mnesia_stopped) end;rec_tabs([], _, _, Init) -> unlink(Init), ok.get_cstructs() -> call(get_cstructs).update(Fun) -> call({update,Fun}).mnesia_down(Node) -> case cast({mnesia_down, Node}) of {error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node); _Pid -> ok end.wait_for_schema_commit_lock() -> link(whereis(?SERVER_NAME)), unsafe_call(wait_for_schema_commit_lock).block_controller() -> call(block_controller).unblock_controller() -> cast(unblock_controller).release_schema_commit_lock() -> cast({release_schema_commit_lock, self()}), unlink(whereis(?SERVER_NAME)).%% Special for preparation of add table copyget_network_copy(Tab, Cs) ->% We can't let the controller queue this one% because that may cause a deadlock between schema_operations% and initial tableloadings which both takes schema locks.% But we have to get copier_done msgs when the other side % goes down. call({add_other, self()}), Reason = {dumper,add_table_copy}, Work = #net_load{table = Tab,reason = Reason,cstruct = Cs}, %% I'll need this cause it's linked trough the subscriber %% might be solved by using monitor in subscr instead. process_flag(trap_exit, true), Load = load_table_fun(Work), Res = (catch Load()), process_flag(trap_exit, false), call({del_other, self()}), case Res of #loader_done{is_loaded = true} -> Tab = Res#loader_done.table_name, case Res#loader_done.needs_announce of true -> i_have_tab(Tab); false -> ignore end, Res#loader_done.reply; #loader_done{} -> Res#loader_done.reply; Else -> {not_loaded, Else} end.%% This functions is invoked from the dumper%% %% There are two cases here:%% startup ->%% no need for sync, since mnesia_controller not started yet%% schema_trans ->%% already synced with mnesia_controller since the dumper%% is syncronously started from mnesia_controllercreate_table(Tab) -> {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).get_disc_copy(Tab) -> disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).%% Returns ok instead of yesforce_load_table(Tab) when atom(Tab), Tab /= schema -> case ?catch_val({Tab, storage_type}) of ram_copies -> do_force_load_table(Tab); disc_copies -> do_force_load_table(Tab); disc_only_copies -> do_force_load_table(Tab); unknown -> set({Tab, load_by_force}, true), cast({force_load_updated, Tab}), wait_for_tables([Tab], infinity); {'EXIT', _} -> {error, {no_exists, Tab}} end;force_load_table(Tab) -> {error, {bad_type, Tab}}. do_force_load_table(Tab) -> Loaded = ?catch_val({Tab, load_reason}), case Loaded of unknown -> set({Tab, load_by_force}, true), mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), wait_for_tables([Tab], infinity); {'EXIT', _} -> set({Tab, load_by_force}, true), mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user), wait_for_tables([Tab], infinity); _ -> ok end. master_nodes_updated(schema, _Masters) -> ignore;master_nodes_updated(Tab, Masters) -> cast({master_nodes_updated, Tab, Masters}).schedule_late_disc_load(Tabs, Reason) -> MsgTag = late_disc_load, try_schedule_late_disc_load(Tabs, Reason, MsgTag).try_schedule_late_disc_load(Tabs, _Reason, MsgTag) when Tabs == [], MsgTag /= schema_is_merged -> ignore;try_schedule_late_disc_load(Tabs, Reason, MsgTag) -> GetIntents = fun() -> Item = mnesia_late_disc_load, Nodes = val({current, db_nodes}), mnesia:lock({global, Item, Nodes}, write), case multicall(Nodes -- [node()], disc_load_intents) of {Replies, []} -> call({MsgTag, Tabs, Reason, Replies}), done; {_, BadNodes} -> %% Some nodes did not respond, lets try again {retry, BadNodes} end end, case mnesia:transaction(GetIntents) of {atomic, done} -> done; {atomic, {retry, BadNodes}} -> verbose("Retry late_load_tables because bad nodes: ~p~n", [BadNodes]), try_schedule_late_disc_load(Tabs, Reason, MsgTag); {aborted, AbortReason} -> fatal("Cannot late_load_tables~p: ~p~n", [[Tabs, Reason, MsgTag], AbortReason]) end.connect_nodes(Ns) -> case mnesia:system_info(is_running) of
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?