%% mnesia_controller.erl
%%
%% (Scraper banner, translated from Chinese viewer chrome:)
%% From "OTP is short for Open Telecom Platform" -- Erlang code,
%% 2,103 lines total, page 1 of 5.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
%% The mnesia_init process loads tables from local disc or from
%% other nodes. It also coordinates updates of the info about
%% where we can read and write tables.
%%
%% Tables may need to be loaded initially at startup of the local
%% node or when other nodes announce that they already have loaded
%% tables that we also want.
%%
%% Initially we set the load request queue to those tables that we
%% safely can load locally, i.e. tables where we have the last
%% consistent replica and we have received mnesia_down from all
%% other nodes holding the table. Then we let the mnesia_init
%% process enter its normal working state.
%%
%% When we need to load a table we append a request to the load
%% request queue. All other requests are regarded as high priority
%% and are processed immediately (e.g.
%% update table whereabouts). We process the load request queue as a
%% "background" job.

-module(mnesia_controller).

-behaviour(gen_server).

%% Mnesia internal stuff
-export([
	 start/0,
	 i_have_tab/1,
	 info/0,
	 get_info/1,
	 get_workers/1,
	 force_load_table/1,
	 async_dump_log/1,
	 sync_dump_log/1,
	 connect_nodes/1,
	 wait_for_schema_commit_lock/0,
	 release_schema_commit_lock/0,
	 create_table/1,
	 get_disc_copy/1,
	 get_cstructs/0,
	 sync_and_block_table_whereabouts/4,
	 sync_del_table_copy_whereabouts/2,
	 block_table/1,
	 unblock_table/1,
	 block_controller/0,
	 unblock_controller/0,
	 unannounce_add_table_copy/2,
	 master_nodes_updated/2,
	 mnesia_down/1,
	 add_active_replica/2,
	 add_active_replica/3,
	 add_active_replica/4,
	 update/1,
	 change_table_access_mode/1,
	 del_active_replica/2,
	 wait_for_tables/2,
	 get_network_copy/2,
	 merge_schema/0,
	 start_remote_sender/4,
	 schedule_late_disc_load/2
	]).

%% gen_server callbacks
-export([init/1,
	 handle_call/3,
	 handle_cast/2,
	 handle_info/2,
	 terminate/2,
	 code_change/3]).

%% Module internal stuff (exported so helper processes can be spawned
%% with explicit M:F(A) and so the code server can upgrade them)
-export([call/1,
	 cast/1,
	 dump_and_reply/2,
	 load_and_reply/2,
	 send_and_reply/2,
	 wait_for_tables_init/2,
	 connect_nodes2/2
	]).

-import(mnesia_lib, [set/2, add/2]).
-import(mnesia_lib, [fatal/2, error/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

-define(SERVER_NAME, ?MODULE).

%% State of the mnesia_controller gen_server process.
-record(state, {supervisor,
		schema_is_merged = false,
		early_msgs = [],
		loader_pid = [],     %% Was Pid is now [{Pid,Work}|..]
		loader_queue,        %% Was list is now gb_tree
		sender_pid = [],     %% Was a pid or undef is now [{Pid,Work}|..]
		sender_queue =  [],
		late_loader_queue,   %% Was list is now gb_tree
		dumper_pid,          %% Dumper or schema commit pid
		dumper_queue = [],   %% Dumper or schema commit queue
		others = [],         %% Processes that needs the copier_done msg
		dump_log_timer_ref,
		is_stopping = false
	       }).

%% Backwards Comp. Sender_pid is now a list of senders.
%% NOTE(review): modernized the deprecated old-style guard test list/1
%% (rejected by modern Erlang compilers) to is_list/1; semantics identical.
get_senders(#state{sender_pid = Pids}) when is_list(Pids) -> Pids.
%% Backwards Comp. loader_pid is now a list of loaders.
%% NOTE(review): modernized the deprecated old-style guard test list/1
%% (rejected by modern Erlang compilers) to is_list/1; semantics identical.
get_loaders(#state{loader_pid = Pids}) when is_list(Pids) -> Pids.

%% Max number of concurrent table loaders. Defaults to 1 and caches the
%% default in the configuration store when no_table_loaders is unset.
max_loaders() ->
    case ?catch_val(no_table_loaders) of
	{'EXIT', _} ->
	    mnesia_lib:set(no_table_loaders,1),
	    1;
	Val -> Val
    end.

%% Work items queued/tracked by the controller.
-record(schema_commit_lock, {owner}).
-record(block_controller, {owner}).

-record(dump_log, {initiated_by,
		   opt_reply_to
		  }).

-record(net_load, {table,
		   reason,
		   opt_reply_to,
		   cstruct = unknown
		  }).

-record(send_table, {table,
		     receiver_pid,
		     remote_storage
		    }).

-record(disc_load, {table,
		    reason,
		    opt_reply_to
		   }).

-record(late_load, {table,
		    reason,
		    opt_reply_to,
		    loaders
		   }).

%% Completion messages sent back by worker processes.
-record(loader_done, {worker_pid,
		      is_loaded,
		      table_name,
		      needs_announce,
		      needs_sync,
		      needs_reply,
		      reply_to,
		      reply}).

-record(sender_done, {worker_pid,
		      worker_res,
		      table_name
		     }).

-record(dumper_done, {worker_pid,
		      worker_res
		     }).

%% Read a value from the mnesia configuration store; delegate missing
%% keys to mnesia_lib:other_val/2 (which decides whether to crash).
val(Var) ->
    case ?catch_val(Var) of
	{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
	Value -> Value
    end.

start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [self()],
			  [{timeout, infinity}
			   %% ,{debug, [trace]}
			  ]).

%% Synchronous dump of the transaction log.
sync_dump_log(InitBy) ->
    call({sync_dump_log, InitBy}).

%% Fire-and-forget dump request, sent as a raw message to the server.
async_dump_log(InitBy) ->
    ?SERVER_NAME ! {async_dump_log, InitBy}.
%% Wait for tables to be active
%% If needed, we will wait for Mnesia to start
%% If Mnesia stops, we will wait for Mnesia to restart
%% We will wait even if the list of tables is empty
%%
%% NOTE(review): modernized the deprecated old-style guard tests
%% (list/1, integer/1, pid/1, atom/1) to their is_*/1 forms throughout
%% this section; all other tokens are unchanged.
wait_for_tables(Tabs, Timeout) when is_list(Tabs), Timeout == infinity ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) when is_list(Tabs),
                                    is_integer(Timeout), Timeout >= 0 ->
    do_wait_for_tables(Tabs, Timeout);
wait_for_tables(Tabs, Timeout) ->
    {error, {badarg, Tabs, Timeout}}.

%% Zero timeout just reports current state; otherwise the waiting is
%% delegated to a linked helper process so this caller can time out.
do_wait_for_tables(Tabs, 0) ->
    reply_wait(Tabs);
do_wait_for_tables(Tabs, Timeout) ->
    Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]),
    receive
	{?SERVER_NAME, Pid, Res} ->
	    Res;
	{'EXIT', Pid, _} ->
	    reply_wait(Tabs)
    after Timeout ->
	    unlink(Pid),
	    exit(Pid, timeout),
	    reply_wait(Tabs)
    end.

%% ok when every requested table is active, {timeout, BadTabs} otherwise;
%% {error, {node_not_running, _}} if Mnesia is down on this node.
reply_wait(Tabs) ->
    case catch mnesia_lib:active_tables() of
	{'EXIT', _} ->
	    {error, {node_not_running, node()}};
	Active when is_list(Active) ->
	    case Tabs -- Active of
		[] ->
		    ok;
		BadTabs ->
		    {timeout, BadTabs}
	    end
    end.

%% Entry point of the helper process spawned by do_wait_for_tables/2.
wait_for_tables_init(From, Tabs) ->
    process_flag(trap_exit, true),
    Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)),
    From ! {?SERVER_NAME, self(), Res},
    unlink(From),
    exit(normal).

%% Link to the controller (fails if Mnesia is not started), then ask it
%% to announce each table as it becomes synced.
wait_for_init(From, Tabs, Init) ->
    case catch link(Init) of
	{'EXIT', _} ->
	    %% Mnesia is not started
	    {error, {node_not_running, node()}};
	true when is_pid(Init) ->
	    cast({sync_tabs, Tabs, self()}),
	    rec_tabs(Tabs, Tabs, From, Init)
    end.

sync_reply(Waiter, Tab) ->
    Waiter ! {?SERVER_NAME, {tab_synced, Tab}}.

%% Collect one tab_synced message per table; abort if the waiting caller
%% or the controller dies while we wait.
rec_tabs([Tab | Tabs], AllTabs, From, Init) ->
    receive
	{?SERVER_NAME, {tab_synced, Tab}} ->
	    rec_tabs(Tabs, AllTabs, From, Init);
	{'EXIT', From, _} ->
	    %% This will trigger an exit signal
	    %% to mnesia_init
	    exit(wait_for_tables_timeout);
	{'EXIT', Init, _} ->
	    %% Oops, mnesia_init stopped,
	    exit(mnesia_stopped)
    end;
rec_tabs([], _, _, Init) ->
    unlink(Init),
    ok.

get_cstructs() ->
    call(get_cstructs).

update(Fun) ->
    call({update,Fun}).

%% Tell the controller that Node went down; fall back to notifying the
%% monitor directly if the controller process is not running.
mnesia_down(Node) ->
    case cast({mnesia_down, Node}) of
	{error, _} -> mnesia_monitor:mnesia_down(?SERVER_NAME, Node);
	_Pid ->  ok
    end.

wait_for_schema_commit_lock() ->
    link(whereis(?SERVER_NAME)),
    unsafe_call(wait_for_schema_commit_lock).

block_controller() ->
    call(block_controller).

unblock_controller() ->
    cast(unblock_controller).

release_schema_commit_lock() ->
    cast({release_schema_commit_lock, self()}),
    unlink(whereis(?SERVER_NAME)).

%% Special for preparation of add table copy
get_network_copy(Tab, Cs) ->
%   We can't let the controller queue this one
%   because that may cause a deadlock between schema_operations
%   and initial tableloadings which both takes schema locks.
%   But we have to get copier_done msgs when the other side
%   goes down.
    call({add_other, self()}),
    Reason = {dumper,add_table_copy},
    Work = #net_load{table = Tab,reason = Reason,cstruct = Cs},
    %% I'll need this cause it's linked trough the subscriber
    %% might be solved by using monitor in subscr instead.
    process_flag(trap_exit, true),
    Load = load_table_fun(Work),
    Res = (catch Load()),
    process_flag(trap_exit, false),
    call({del_other, self()}),
    case Res of
	#loader_done{is_loaded = true} ->
	    Tab = Res#loader_done.table_name,
	    case Res#loader_done.needs_announce of
		true ->
		    i_have_tab(Tab);
		false ->
		    ignore
	    end,
	    Res#loader_done.reply;
	#loader_done{} ->
	    Res#loader_done.reply;
	Else ->
	    {not_loaded, Else}
    end.

%% This functions is invoked from the dumper
%%
%% There are two cases here:
%% startup ->
%%   no need for sync, since mnesia_controller not started yet
%% schema_trans ->
%%   already synced with mnesia_controller since the dumper
%%   is syncronously started from mnesia_controller
create_table(Tab) ->
    {loaded, ok} = mnesia_loader:disc_load_table(Tab, {dumper,create_table}).

get_disc_copy(Tab) ->
    disc_load_table(Tab, {dumper,change_table_copy_type}, undefined).

%% Returns ok instead of yes
force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
    case ?catch_val({Tab, storage_type}) of
	ram_copies ->
	    do_force_load_table(Tab);
	disc_copies ->
	    do_force_load_table(Tab);
	disc_only_copies ->
	    do_force_load_table(Tab);
	unknown ->
	    set({Tab, load_by_force}, true),
	    cast({force_load_updated, Tab}),
	    wait_for_tables([Tab], infinity);
	{'EXIT', _} ->
	    {error, {no_exists, Tab}}
    end;
force_load_table(Tab) ->
    {error, {bad_type, Tab}}.

%% Force-load only when the table has no load_reason yet (or lookup
%% fails); already-loaded tables are a no-op.
do_force_load_table(Tab) ->
    Loaded = ?catch_val({Tab, load_reason}),
    case Loaded of
	unknown ->
	    set({Tab, load_by_force}, true),
	    mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
	    wait_for_tables([Tab], infinity);
	{'EXIT', _} ->
	    set({Tab, load_by_force}, true),
	    mnesia_late_loader:async_late_disc_load(node(), [Tab], forced_by_user),
	    wait_for_tables([Tab], infinity);
	_ ->
	    ok
    end.
%% The schema table's master nodes are handled by the schema machinery
%% itself; only regular tables notify the controller.
master_nodes_updated(schema, _Masters) ->
    ignore;
master_nodes_updated(Tab, Masters) ->
    cast({master_nodes_updated, Tab, Masters}).

schedule_late_disc_load(Tabs, Reason) ->
    MsgTag = late_disc_load,
    try_schedule_late_disc_load(Tabs, Reason, MsgTag).

%% Collect the disc-load intents of all other db nodes inside a
%% transaction holding a global lock, then hand them to the controller.
%% Retries forever while some nodes fail to answer the multicall.
try_schedule_late_disc_load(Tabs, _Reason, MsgTag)
  when Tabs == [], MsgTag /= schema_is_merged ->
    ignore;
try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
    GetIntents =
	fun() ->
		Item = mnesia_late_disc_load,
		Nodes = val({current, db_nodes}),
		mnesia:lock({global, Item, Nodes}, write),
		case multicall(Nodes -- [node()], disc_load_intents) of
		    {Replies, []} ->
			call({MsgTag, Tabs, Reason, Replies}),
			done;
		    {_, BadNodes} ->
			%% Some nodes did not respond, lets try again
			{retry, BadNodes}
		end
	end,
    case mnesia:transaction(GetIntents) of
	{atomic, done} ->
	    done;
	{atomic, {retry, BadNodes}} ->
	    verbose("Retry late_load_tables because bad nodes: ~p~n",
		    [BadNodes]),
	    try_schedule_late_disc_load(Tabs, Reason, MsgTag);
	{aborted, AbortReason} ->
	    fatal("Cannot late_load_tables~p: ~p~n",
		  [[Tabs, Reason, MsgTag], AbortReason])
    end.

%% NOTE(review): SOURCE is truncated here (page 1 of 5); the body of
%% connect_nodes/1 continues on the next page and is left as-is.
connect_nodes(Ns) ->
    case mnesia:system_info(is_running) of

%% (Scraper UI residue, translated from Chinese and commented out:
%%  keyboard-shortcut legend -- Copy code Ctrl+C, Search code Ctrl+F,
%%  Fullscreen F11, Increase font Ctrl+=, Decrease font Ctrl+-,
%%  Show shortcuts ?. Not part of the original source file.)