mnesia_loader.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 821 行 · 第 1/2 页

ERL
821
字号
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
%%% Purpose : Loads tables from local disc or from remote node

-module(mnesia_loader).

%% Mnesia internal stuff
-export([disc_load_table/2,
         net_load_table/4,
         send_table/3]).

-export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
-export([spawned_receiver/8]).    %% Spawned lock taking process

-import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).
-include("mnesia.hrl").

%% Read the value stored under Var via the ?catch_val macro.
%% If the lookup yields {'EXIT', Reason}, the decision is delegated to
%% mnesia_lib:other_val/2; otherwise the value is returned as is.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
        Value -> Value
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from local disc

%% Entry point for loading Tab from local disc.
%% Looks up the storage type and the set/bag type of the table, logs the
%% attempt, evaluates the debug hook and dispatches on the storage type.
%% Returns {loaded, ok} or {not_loaded, Why}.
disc_load_table(Tab, Reason) ->
    Storage =  val({Tab, storage_type}),
    Type = val({Tab, setorbag}),
    dbg_out("Getting table ~p (~p) from disc: ~p~n",
            [Tab, Storage, Reason]),
    ?eval_debug_fun({?MODULE, do_get_disc_copy},
                    [{tab, Tab},
                     {reason, Reason},
                     {storage, Storage},
                     {type, Type}]),
    do_get_disc_copy2(Tab, Reason, Storage, Type).

%% Storage-type specific part of disc_load_table/2.
%%
%% unknown          - the local copy is gone; nothing is loaded but the
%%                    result is still {loaded, ok}.
%% disc_copies      - creates the ets table and fills it with
%%                    mnesia_log:dcd2ets/2, unless Reason is {dumper, _}.
%% ram_copies       - creates the ets table and, when a directory is in
%%                    use, fills it from the file named by tab2dcd/1 or,
%%                    failing that, the one named by tab2dat/1.
%% disc_only_copies - opens the dets file, unless Reason is {dumper, _}.
%%
%% Every successful path initialises the indexes, the snmp structure and
%% records load_node / load_reason for the table.
do_get_disc_copy2(Tab, _Reason, Storage, _Type) when Storage == unknown ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n",
            [Tab]),
    {loaded, ok};  %% ?
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_copies ->
    %% NOW we create the actual table
    Repair = mnesia_monitor:get_env(auto_repair),
    Args = [{keypos, 2}, public, named_table, Type],
    case Reason of
        {dumper, _} -> %% Resources already allocated
            ignore;
        _ ->
            mnesia_monitor:mktab(Tab, Args),
            Count = mnesia_log:dcd2ets(Tab, Repair),
            %% Count is whatever dcd2ets/2 returned; when the table holds
            %% fewer than Count * 4 objects it is written back with
            %% ets2dcd/1 (presumably to compact the log file - TODO
            %% confirm against mnesia_log).
            case ets:info(Tab, size) of
                X when X < Count * 4 ->
                    ok = mnesia_log:ets2dcd(Tab);
                _ ->
                    ignore
            end
    end,
    mnesia_index:init_index(Tab, Storage),
    snmpify(Tab, Storage),
    set({Tab, load_node}, node()),
    set({Tab, load_reason}, Reason),
    {loaded, ok};
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == ram_copies ->
    Args = [{keypos, 2}, public, named_table, Type],
    case Reason of
        {dumper, _} -> %% Resources already allocated
            ignore;
        _ ->
            mnesia_monitor:mktab(Tab, Args),
            Fname = mnesia_lib:tab2dcd(Tab),
            Datname = mnesia_lib:tab2dat(Tab),
            Repair = mnesia_monitor:get_env(auto_repair),
            case mnesia_monitor:use_dir() of
                true ->
                    %% Prefer the file named by tab2dcd/1, fall back to
                    %% the one named by tab2dat/1; with neither present
                    %% the table stays empty.
                    case mnesia_lib:exists(Fname) of
                        true -> mnesia_log:dcd2ets(Tab, Repair);
                        false ->
                            case mnesia_lib:exists(Datname) of
                                true ->
                                    mnesia_lib:dets_to_ets(Tab, Tab, Datname,
                                                           Type, Repair, no);
                                false ->
                                    false
                            end
                    end;
                false ->
                    false
            end
    end,
    mnesia_index:init_index(Tab, Storage),
    snmpify(Tab, Storage),
    set({Tab, load_node}, node()),
    set({Tab, load_reason}, Reason),
    {loaded, ok};
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies ->
    Args = [{file, mnesia_lib:tab2dat(Tab)},
            {type, mnesia_lib:disk_type(Tab, Type)},
            {keypos, 2},
            {repair, mnesia_monitor:get_env(auto_repair)}],
    case Reason of
        {dumper, _} ->
            %% The dets table is not opened here for dumper reasons.
            mnesia_index:init_index(Tab, Storage),
            snmpify(Tab, Storage),
            set({Tab, load_node}, node()),
            set({Tab, load_reason}, Reason),
            {loaded, ok};
        _ ->
            case mnesia_monitor:open_dets(Tab, Args) of
                {ok, _} ->
                    mnesia_index:init_index(Tab, Storage),
                    snmpify(Tab, Storage),
                    set({Tab, load_node}, node()),
                    set({Tab, load_reason}, Reason),
                    {loaded, ok};
                {error, Error} ->
                    {not_loaded, {"Failed to create dets table", Error}}
            end
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from a remote node
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% Receiver                             Sender
%% --------                             ------
%% Grab schema lock on table
%%                                      Determine table size
%% Create empty pre-grown table
%%                                      Grab read lock on table
%%                                      Let receiver subscribe on updates done on sender node
%%                                      Disable rehashing of table
%%                                      Release read lock on table
%%                                      Send table to receiver in chunks
%%
%%                                      Grab read lock on table
%% Block dirty updates
%%                                      Update whereabouts
%%
%%                                      Cancel the update subscription
%% Process the subscription events
%% Optionally dump to disc
%% Unblock dirty updates
%%                                      Release read lock on table
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-define(MAX_TRANSFER_SIZE, 7500).
-define(MAX_RAM_FILE_SIZE, 1000000).
-define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
-define(MAX_NOPACKETS, 20).

%% Entry point for loading Tab from one of the nodes in Ns.
%% For Reason {dumper, add_table_copy} the supplied Cs is used; for any
%% other reason the cstruct is looked up with val({Tab, cstruct}) and the
%% supplied one is ignored.
net_load_table(Tab, Reason, Ns, Cs)
        when Reason == {dumper,add_table_copy} ->
    try_net_load_table(Tab, Reason, Ns, Cs);
net_load_table(Tab, Reason, Ns, _Cs) ->
    try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).

%% Try the candidate nodes in order. An empty candidate list means no
%% node is left to copy from and yields {not_loaded, none_active}.
try_net_load_table(Tab, _Reason, [], _Cs) ->
    verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
    {not_loaded, none_active};
try_net_load_table(Tab, Reason, Ns, Cs) ->
    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
    do_get_network_copy(Tab, Reason, Ns, Storage, Cs).

%% Attempt to copy Tab from the first node of Ns.
%%   - storage type unknown: {not_loaded, storage_unknown}
%%   - first node not among val({current, db_nodes}): skip it
%%   - init_receiver/5 returns ok: record load_node / load_reason, tell
%%     mnesia_controller and return {loaded, ok}
%%   - {error, _} with a dumper reason: {not_loaded, Err}
%%   - restart: retry with the node moved to the end of the list
%%   - down: retry with the node dropped from the list
do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n", [Tab]),
    {not_loaded, storage_unknown};
do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
    [Node | Tail] = Ns,
    case lists:member(Node,val({current, db_nodes})) of
        true ->
            dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
                    [Tab, Storage, Node, Reason]),
            ?eval_debug_fun({?MODULE, do_get_network_copy},
                            [{tab, Tab}, {reason, Reason},
                             {nodes, Ns}, {storage, Storage}]),
            case init_receiver(Node, Tab, Storage, Cs, Reason) of
                ok ->
                    set({Tab, load_node}, Node),
                    set({Tab, load_reason}, Reason),
                    mnesia_controller:i_have_tab(Tab),
                    dbg_out("Table ~p copied from ~p to ~p~n", [Tab, Node, node()]),
                    {loaded, ok};
                Err = {error, _} when element(1, Reason) == dumper ->
                    {not_loaded,Err};
                restart ->
                    try_net_load_table(Tab, Reason, Tail ++ [Node], Cs);
                down ->
                    try_net_load_table(Tab, Reason, Tail, Cs)
            end;
        false ->
            try_net_load_table(Tab, Reason, Tail, Cs)
    end.

%% Set up the snmp structure for Tab, if val({Tab, snmp}) is non-empty.
snmpify(Tab, Storage) ->
    do_snmpify(Tab, val({Tab, snmp}), Storage).

%% An empty snmp specification means there is nothing to do; otherwise
%% the table is created through mnesia_snmp_hook and stored under
%% {Tab, {index, snmp}}.
do_snmpify(_Tab, [], _Storage) ->
    ignore;
do_snmpify(Tab, Us, Storage) ->
    Snmp = mnesia_snmp_hook:create_table(Us, Tab, Storage),
    set({Tab, {index, snmp}}, Snmp).

%% Start the receiver
%%
%% For {dumper, add_table_copy} the receiver runs directly in the calling
%% process. For every other reason the work is wrapped in a transaction
%% that first takes a read lock on {schema, Tab} and then runs the actual
%% receiver in a linked, spawned process.
%% Returns ok | {error, Result} | restart | down, or whatever
%% start_remote_sender/3 returned when it is not a 3-tuple.
init_receiver(Node, Tab, Storage, Cs, Reas={dumper,add_table_copy}) ->
    case start_remote_sender(Node, Tab, Storage) of
        {SenderPid, TabSize, DetsData} ->
            start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,Reas);
        Else ->
            Else
    end;
init_receiver(Node, Tab,Storage,Cs,Reason) ->
    %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
    %% Both may grab tables-locks in different order.
    Load =
        fun() ->
                {_,Tid,Ts} = get(mnesia_activity_state),
                mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}),
                %% Check that table still exists
                Active = val({Tab, active_replicas}),
                %% Check that we haven't loaded it already
                case val({Tab,where_to_read}) == node() of
                    true -> ok;
                    _ ->
                        %% And that sender still got a copy
                        %% (something might have happened while
                        %% we were waiting for the lock)
                        true = lists:member(Node, Active),
                        {SenderPid, TabSize, DetsData} =
                            start_remote_sender(Node,Tab,Storage),
                        Init = table_init_fun(SenderPid),
                        Args = [self(),Tab,Storage,Cs,SenderPid,
                                TabSize,DetsData,Init],
                        Pid = spawn_link(?MODULE, spawned_receiver, Args),
                        put(mnesia_real_loader, Pid),
                        wait_on_load_complete(Pid)
                end
        end,
    Res =
        case mnesia:transaction(Load, 20) of
            {atomic, {error,Result}} when
                  element(1,Reason) == dumper ->
                {error,Result};
            {atomic, {error,Result}} ->
                fatal("Cannot create table ~p: ~p~n",
                      [[Tab, Storage], Result]);
            {atomic,  Result} -> Result;
            {aborted, nomore} -> restart;
            {aborted, AbortReason} ->
                verbose("Receiver failed on ~p from ~p:~nReason: ~p~n",
                        [Tab,Node,AbortReason]),
                down  %% either this node or sender is dying
        end,
    unlink(whereis(mnesia_tm)),  %% Avoid late unlink from tm
    Res.

%% Ask mnesia_controller to start a sender for Tab on Node and wait for
%% its first message. Returns {SenderPid, TabSize, DetsData}, where
%% DetsData is false when the sender did not supply any, or the result
%% of down/2 when {copier_done, Node} arrives instead.
%% Note: the receive has no timeout.
start_remote_sender(Node,Tab,Storage) ->
    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
    put(mnesia_table_sender_node, {Tab, Node}),
    receive
        {SenderPid, {first, TabSize}} ->
            {SenderPid, TabSize, false};
        {SenderPid, {first, TabSize, DetsData}} ->
            {SenderPid, TabSize, DetsData};
        %% Protocol conversion hack
        {copier_done, Node} ->
            verbose("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
            down(Tab, Storage)
    end.

%% Build the fun that is used to pull data from the sender.
%% Each call with 'read' asks SenderPid for more data and returns the
%% result of get_data/2. When the sender node needs protocol conversion,
%% the process that created the fun is used as receiver and is told who
%% the actual caller is; otherwise the caller itself is the receiver.
table_init_fun(SenderPid) ->
    PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
    MeMyselfAndI = self(),
    fun(read) ->
            Receiver =
                if
                    PConv == true ->
                        MeMyselfAndI ! {actual_tabrec, self()},
                        MeMyselfAndI; %% Old mnesia
                    PConv == false -> self()
                end,
            SenderPid ! {Receiver, more},
            get_data(SenderPid, Receiver)
    end.

%% Add_table_copy gets its own locks.
%% Runs do_init_table/8 in the calling process; on {error, _} the sender
%% is told that the copier is done before the error is returned.
start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) ->
    Init = table_init_fun(SenderPid),
    case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
        Err = {error, _} ->
            SenderPid ! {copier_done, node()},
            Err;
        Else ->
            Else
    end.

%% Body of the process spawned by init_receiver/5.
%% Traps exits, runs do_init_table/8, reports the result to ReplyTo as
%% {self(), Done}, unlinks from ReplyTo and mnesia_controller and exits
%% normally.
spawned_receiver(ReplyTo,Tab,Storage,Cs, SenderPid,TabSize,DetsData, Init) ->
    process_flag(trap_exit, true),
    Done = do_init_table(Tab,Storage,Cs,
                         SenderPid,TabSize,DetsData,
                         ReplyTo, Init),
    ReplyTo ! {self(),Done},
    unlink(ReplyTo),
    unlink(whereis(mnesia_controller)),
    exit(normal).

%% Wait for the spawned receiver Pid to report its result.
%% An 'EXIT' message from Pid makes the caller exit with the same reason;
%% every other message is forwarded to Pid and the wait continues.
wait_on_load_complete(Pid) ->
    receive
        {Pid, Res} ->
            Res;
        {'EXIT', Pid, Reason} ->
            exit(Reason);
        Else ->
            Pid ! Else,
            wait_on_load_complete(Pid)
    end.

%% Create the local table, block the table in mnesia_tm, initialise it
%% from the sender and then hand over to tab_receiver/6.
%% Anything other than {Storage, Tab} from create_table/4 is returned
%% unchanged; a failed init_table/6 is logged and mapped through down/2.
do_init_table(Tab,Storage,Cs,SenderPid,
              TabSize,DetsInfo,OrigTabRec,Init) ->
    case create_table(Tab, TabSize, Storage, Cs) of
        {Storage,Tab} ->
            %% Debug info
            Node = node(SenderPid),
            put(mnesia_table_receiver, {Tab, Node, SenderPid}),
            mnesia_tm:block_tab(Tab),
            PConv = mnesia_monitor:needs_protocol_conversion(Node),
            case init_table(Tab,Storage,Init,PConv,DetsInfo,SenderPid) of
                ok ->
                    tab_receiver(Node,Tab,Storage,Cs,PConv,OrigTabRec);
                Reason ->
                    Msg = "[d]ets:init table failed",
                    verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
                    down(Tab, Storage)
            end;
        Error ->
            Error
    end.

%% Create the empty local table that will receive the copy.
%% disc_only_copies: a dets file at the path given by tab2tmp/1 (any
%% existing file is deleted first), opened while holding the table lock
%% from mnesia_lib; the size estimate is at least 256 objects.
%% ram_copies / disc_copies: an ets table created via unsafe_mktab/2.
%% Returns {Storage, Tab} on success, otherwise the value returned by the
%% failing call. Any other storage type makes the 'if' fail.
create_table(Tab, TabSize, Storage, Cs) ->
    if
        Storage == disc_only_copies ->
            mnesia_lib:lock_table(Tab),
            Tmp = mnesia_lib:tab2tmp(Tab),
            Size = lists:max([TabSize, 256]),
            Args = [{file, Tmp},
                    {keypos, 2},
%%                  {ram_file, true},
                    {estimated_no_objects, Size},
                    {repair, mnesia_monitor:get_env(auto_repair)},
                    {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
            file:delete(Tmp),
            case mnesia_lib:dets_sync_open(Tab, Args) of
                {ok, _} ->
                    mnesia_lib:unlock_table(Tab),
                    {Storage, Tab};
                Else ->
                    mnesia_lib:unlock_table(Tab),
                    Else
            end;
        (Storage == ram_copies) or (Storage == disc_copies) ->
            Args = [{keypos, 2}, public, named_table, Cs#cstruct.type],
            case mnesia_monitor:unsafe_mktab(Tab, Args) of
                Tab ->
                    {Storage, Tab};
                Else ->
                    Else
            end
    end.

%% NOTE(review): only the beginning of tab_receiver/6 is visible in this
%% part of the file; the visible clauses are reproduced unchanged.
%% pid/1 in the guards below is the obsolete spelling of is_pid/1 -
%% consider updating it once the whole function is in view.
tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
    receive
        {SenderPid, {no_more, DatBin}} when PConv == false ->
            finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);

        %% Protocol conversion hack
        {SenderPid, {no_more, DatBin}} when pid(PConv) ->
            PConv ! {SenderPid, no_more},
            receive
                {old_init_table_complete, ok} ->
                    finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
                {old_init_table_complete, Reason} ->
                    Msg = "OLD: [d]ets:init table failed",
                    verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
                    down(Tab, Storage)
            end;

        {actual_tabrec, Pid} ->
            tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);

        {SenderPid, {more, [Recs]}} when pid(PConv) ->
            PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?