mnesia_loader.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 821 行 · 第 1/2 页
ERL
821 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
%%% Purpose : Loads tables from local disc or from remote node

-module(mnesia_loader).

%% Mnesia internal stuff
-export([disc_load_table/2,
         net_load_table/4,
         send_table/3]).

-export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
-export([spawned_receiver/8]).    %% Spawned lock taking process

-import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).

-include("mnesia.hrl").

%% Reads Var through the ?catch_val macro. An {'EXIT', Why} result is
%% handed to mnesia_lib:other_val/2; any other result is returned as is.
val(Var) ->
    case ?catch_val(Var) of
        {'EXIT', Why} -> mnesia_lib:other_val(Var, Why);
        Found -> Found
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from local disc

%% Entry point for a local load of Tab.
%% Looks up the table's storage type and set/bag type, emits the debug
%% trace and debug hook, then dispatches to do_get_disc_copy2/4.
%% Returns {loaded, ok} or {not_loaded, Why}.
disc_load_table(Tab, Reason) ->
    StorageType = val({Tab, storage_type}),
    SetOrBag = val({Tab, setorbag}),
    dbg_out("Getting table ~p (~p) from disc: ~p~n",
            [Tab, StorageType, Reason]),
    ?eval_debug_fun({?MODULE, do_get_disc_copy},
                    [{tab, Tab},
                     {reason, Reason},
                     {storage, StorageType},
                     {type, SetOrBag}]),
    do_get_disc_copy2(Tab, Reason, StorageType, SetOrBag).

%% One clause per storage type. A Reason of the form {dumper, _} means the
%% table resources are already allocated, so the create/fill step is skipped
%% and only the bookkeeping in finalize_local_load/3 is done.
do_get_disc_copy2(Tab, _Reason, unknown, _Type) ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n",
            [Tab]),
    {loaded, ok};  %% ?
do_get_disc_copy2(Tab, Reason, disc_copies = Storage, Type) ->
    %% NOW we create the actual table
    Repair = mnesia_monitor:get_env(auto_repair),
    TabOpts = [{keypos, 2}, public, named_table, Type],
    load_disc_copies(Reason, Tab, TabOpts, Repair),
    finalize_local_load(Tab, Storage, Reason);
do_get_disc_copy2(Tab, Reason, ram_copies = Storage, Type) ->
    load_ram_copies(Reason, Tab, Type),
    finalize_local_load(Tab, Storage, Reason);
do_get_disc_copy2(Tab, Reason, disc_only_copies = Storage, Type) ->
    DetsOpts = [{file, mnesia_lib:tab2dat(Tab)},
                {type, mnesia_lib:disk_type(Tab, Type)},
                {keypos, 2},
                {repair, mnesia_monitor:get_env(auto_repair)}],
    case Reason of
        {dumper, _} ->
            finalize_local_load(Tab, Storage, Reason);
        _ ->
            case mnesia_monitor:open_dets(Tab, DetsOpts) of
                {ok, _} ->
                    finalize_local_load(Tab, Storage, Reason);
                {error, Error} ->
                    {not_loaded, {"Failed to create dets table", Error}}
            end
    end.

%% Creates the ets table for a disc_copies table and fills it from its
%% dcd file, unless the dumper has already allocated the resources.
load_disc_copies({dumper, _}, _Tab, _TabOpts, _Repair) ->
    %% Resources already allocated
    ignore;
load_disc_copies(_Reason, Tab, TabOpts, Repair) ->
    mnesia_monitor:mktab(Tab, TabOpts),
    Count = mnesia_log:dcd2ets(Tab, Repair),
    %% Write the table back with ets2dcd when its size is below four times
    %% the count returned by dcd2ets.
    case ets:info(Tab, size) of
        X when X < Count * 4 ->
            ok = mnesia_log:ets2dcd(Tab);
        _ ->
            ignore
    end.

%% Creates the ets table for a ram_copies table and, when a mnesia
%% directory is in use, fills it from a dcd or dat file if one exists.
load_ram_copies({dumper, _}, _Tab, _Type) ->
    %% Resources already allocated
    ignore;
load_ram_copies(_Reason, Tab, Type) ->
    mnesia_monitor:mktab(Tab, [{keypos, 2}, public, named_table, Type]),
    DcdFile = mnesia_lib:tab2dcd(Tab),
    DatFile = mnesia_lib:tab2dat(Tab),
    Repair = mnesia_monitor:get_env(auto_repair),
    case mnesia_monitor:use_dir() of
        true ->
            load_ram_copies_from_dir(Tab, Type, DcdFile, DatFile, Repair);
        false ->
            false
    end.

%% Prefers the dcd file; falls back to the dat file; returns false when
%% neither exists.
load_ram_copies_from_dir(Tab, Type, DcdFile, DatFile, Repair) ->
    case mnesia_lib:exists(DcdFile) of
        true ->
            mnesia_log:dcd2ets(Tab, Repair);
        false ->
            case mnesia_lib:exists(DatFile) of
                true ->
                    mnesia_lib:dets_to_ets(Tab, Tab, DatFile, Type,
                                           Repair, no);
                false ->
                    false
            end
    end.

%% Bookkeeping shared by every successful local load: initialise the
%% indexes, set up the snmp table (if any), and record that the table was
%% loaded from this node and why.
finalize_local_load(Tab, Storage, Reason) ->
    mnesia_index:init_index(Tab, Storage),
    snmpify(Tab, Storage),
    set({Tab, load_node}, node()),
    set({Tab, load_reason}, Reason),
    {loaded, ok}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from a remote node
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% NOTE(review): the two-column layout below was reconstructed; the
%% step order is as found, the receiver/sender attribution of each
%% step should be confirmed against the upstream file.
%%
%% Receiver                      Sender
%% --------                      ------
%% Grab schema lock on table
%%                               Determine table size
%% Create empty pre-grown table
%%                               Grab read lock on table
%%                               Let receiver subscribe on updates
%%                               done on sender node
%%                               Disable rehashing of table
%%                               Release read lock on table
%%                               Send table to receiver in chunks
%%
%%                               Grab read lock on table
%% Block dirty updates
%% Update whereabouts
%%
%%                               Cancel the update subscription
%% Process the subscription events
%% Optionally dump to disc
%% Unblock dirty updates
%%                               Release read lock on table
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-define(MAX_TRANSFER_SIZE, 7500).
-define(MAX_RAM_FILE_SIZE, 1000000).
-define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
-define(MAX_NOPACKETS, 20).

%% Entry point for loading Tab from one of the nodes in Ns.
%% For Reason =:= {dumper, add_table_copy} the supplied Cs is used;
%% otherwise the cstruct is looked up with val({Tab, cstruct}).
net_load_table(Tab, {dumper, add_table_copy} = Reason, Ns, Cs) ->
    try_net_load_table(Tab, Reason, Ns, Cs);
net_load_table(Tab, Reason, Ns, _Cs) ->
    try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).

%% Gives up with {not_loaded, none_active} once the candidate list is
%% empty; otherwise derives the local storage type from Cs and tries the
%% first candidate.
try_net_load_table(Tab, _Reason, [], _Cs) ->
    verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
    {not_loaded, none_active};
try_net_load_table(Tab, Reason, Ns, Cs) ->
    do_get_network_copy(Tab, Reason, Ns,
                        mnesia_lib:cs_to_storage_type(node(), Cs), Cs).

%% Tries to copy Tab from the head of Ns.
%% - head not among the current db_nodes, or receiver reports down:
%%   continue with the remaining nodes;
%% - receiver reports restart: retry with the head moved last;
%% - {error, _} while Reason is a dumper reason: {not_loaded, Error};
%% - ok: record load node/reason, tell mnesia_controller, {loaded, ok}.
do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
    verbose("Local table copy of ~p has recently been deleted, ignored.~n",
            [Tab]),
    {not_loaded, storage_unknown};
do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
    [Node | Tail] = Ns,
    case lists:member(Node, val({current, db_nodes})) of
        false ->
            try_net_load_table(Tab, Reason, Tail, Cs);
        true ->
            dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
                    [Tab, Storage, Node, Reason]),
            ?eval_debug_fun({?MODULE, do_get_network_copy},
                            [{tab, Tab},
                             {reason, Reason},
                             {nodes, Ns},
                             {storage, Storage}]),
            case init_receiver(Node, Tab, Storage, Cs, Reason) of
                ok ->
                    set({Tab, load_node}, Node),
                    set({Tab, load_reason}, Reason),
                    mnesia_controller:i_have_tab(Tab),
                    dbg_out("Table ~p copied from ~p to ~p~n",
                            [Tab, Node, node()]),
                    {loaded, ok};
                Err = {error, _} when element(1, Reason) == dumper ->
                    {not_loaded, Err};
                restart ->
                    try_net_load_table(Tab, Reason, Tail ++ [Node], Cs);
                down ->
                    try_net_load_table(Tab, Reason, Tail, Cs)
            end
    end.

%% Sets up the snmp table for Tab when the table has an snmp definition.
snmpify(Tab, Storage) ->
    do_snmpify(Tab, val({Tab, snmp}), Storage).

%% An empty snmp definition means there is nothing to do; otherwise the
%% table is created through mnesia_snmp_hook and stored under
%% {Tab, {index, snmp}}.
do_snmpify(_Tab, [], _Storage) ->
    ignore;
do_snmpify(Tab, Us, Storage) ->
    SnmpTab = mnesia_snmp_hook:create_table(Us, Tab, Storage),
    set({Tab, {index, snmp}}, SnmpTab).

%% Start the receiver
%%
%% Clause 1 ({dumper, add_table_copy}): the receiver runs in the calling
%% process; whatever start_remote_sender/3 returns other than a
%% {SenderPid, TabSize, DetsData} triple is passed straight back.
%% Clause 2: the load runs inside a transaction that first takes a read
%% lock on {schema, Tab}; the actual receiver is a linked process.
init_receiver(Node, Tab, Storage, Cs, Reas = {dumper, add_table_copy}) ->
    case start_remote_sender(Node, Tab, Storage) of
        {SenderPid, TabSize, DetsData} ->
            start_receiver(Tab, Storage, Cs, SenderPid, TabSize, DetsData,
                           Reas);
        Else ->
            Else
    end;
init_receiver(Node, Tab, Storage, Cs, Reason) ->
    %% Grab a schema lock to avoid deadlock between table_loader and
    %% schema_commit dumping. Both may grab table locks in different order.
    Load =
        fun() ->
                {_, Tid, Ts} = get(mnesia_activity_state),
                mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}),
                %% Check that table still exists
                Active = val({Tab, active_replicas}),
                %% Check that we haven't loaded it already
                case val({Tab, where_to_read}) == node() of
                    true ->
                        ok;
                    _ ->
                        %% And that sender still got a copy
                        %% (something might have happened while
                        %% we were waiting for the lock)
                        true = lists:member(Node, Active),
                        {SenderPid, TabSize, DetsData} =
                            start_remote_sender(Node, Tab, Storage),
                        Init = table_init_fun(SenderPid),
                        SpawnArgs = [self(), Tab, Storage, Cs, SenderPid,
                                     TabSize, DetsData, Init],
                        Loader = spawn_link(?MODULE, spawned_receiver,
                                            SpawnArgs),
                        put(mnesia_real_loader, Loader),
                        wait_on_load_complete(Loader)
                end
        end,
    Res =
        case mnesia:transaction(Load, 20) of
            {atomic, {error, Result}} when element(1, Reason) == dumper ->
                {error, Result};
            {atomic, {error, Result}} ->
                fatal("Cannot create table ~p: ~p~n",
                      [[Tab, Storage], Result]);
            {atomic, Result} ->
                Result;
            {aborted, nomore} ->
                restart;
            {aborted, AbortReason} ->
                verbose("Receiver failed on ~p from ~p:~nReason: ~p~n",
                        [Tab, Node, AbortReason]),
                down  %% either this node or sender is dying
        end,
    unlink(whereis(mnesia_tm)),  %% Avoid late unlink from tm
    Res.

%% Asks mnesia_controller to start a sender for Tab on Node, records
%% {Tab, Node} in the process dictionary and waits for the sender's first
%% message. Returns {SenderPid, TabSize, DetsData} (DetsData is false when
%% the sender did not supply any) or the result of down/2 when the sender
%% reports copier_done.
start_remote_sender(Node, Tab, Storage) ->
    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
    put(mnesia_table_sender_node, {Tab, Node}),
    receive
        {SenderPid, {first, TabSize}} ->
            {SenderPid, TabSize, false};
        {SenderPid, {first, TabSize, DetsData}} ->
            {SenderPid, TabSize, DetsData};
        %% Protocol conversion hack
        {copier_done, Node} ->
            verbose("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
            down(Tab, Storage)
    end.

%% Builds the fun(read) used to pull data from the sender. When the
%% sender's node needs protocol conversion, the process that called
%% table_init_fun/1 is named as receiver and is told which process is
%% actually running the fun; otherwise the process running the fun is
%% the receiver.
table_init_fun(SenderPid) ->
    PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
    Creator = self(),
    fun(read) ->
            Receiver =
                if
                    PConv == true ->
                        Creator ! {actual_tabrec, self()},
                        Creator;  %% Old mnesia
                    PConv == false ->
                        self()
                end,
            SenderPid ! {Receiver, more},
            get_data(SenderPid, Receiver)
    end.

%% Add_table_copy gets its own locks.
%% Runs the receiver in the calling process; on {error, _} the sender is
%% told that the copier is done before the error is returned.
start_receiver(Tab, Storage, Cs, SenderPid, TabSize, DetsData,
               {dumper, add_table_copy}) ->
    Init = table_init_fun(SenderPid),
    case do_init_table(Tab, Storage, Cs, SenderPid, TabSize, DetsData,
                       self(), Init) of
        Err = {error, _} ->
            SenderPid ! {copier_done, node()},
            Err;
        Else ->
            Else
    end.

%% Body of the linked loader process started by init_receiver/5.
%% Traps exits, performs the load, reports {self(), Result} to ReplyTo,
%% then unlinks from ReplyTo and mnesia_controller and exits normally.
spawned_receiver(ReplyTo, Tab, Storage, Cs,
                 SenderPid, TabSize, DetsData, Init) ->
    process_flag(trap_exit, true),
    Outcome = do_init_table(Tab, Storage, Cs,
                            SenderPid, TabSize, DetsData,
                            ReplyTo, Init),
    ReplyTo ! {self(), Outcome},
    unlink(ReplyTo),
    unlink(whereis(mnesia_controller)),
    exit(normal).

%% Waits for the loader's {Pid, Result} reply. An 'EXIT' from the loader
%% is re-raised with the same reason; every other message is forwarded to
%% the loader and the wait continues.
wait_on_load_complete(Pid) ->
    receive
        {Pid, Res} ->
            Res;
        {'EXIT', Pid, Reason} ->
            exit(Reason);
        Other ->
            Pid ! Other,
            wait_on_load_complete(Pid)
    end.

%% Creates the local table, blocks it in mnesia_tm, initialises it from
%% the sender and then enters tab_receiver/6. Anything create_table/4
%% returns other than {Storage, Tab} is passed back unchanged; a failed
%% init_table/6 is logged and turned into down/2.
do_init_table(Tab, Storage, Cs, SenderPid,
              TabSize, DetsInfo, OrigTabRec, Init) ->
    case create_table(Tab, TabSize, Storage, Cs) of
        {Storage, Tab} ->
            %% Debug info
            Node = node(SenderPid),
            put(mnesia_table_receiver, {Tab, Node, SenderPid}),
            mnesia_tm:block_tab(Tab),
            PConv = mnesia_monitor:needs_protocol_conversion(Node),
            case init_table(Tab, Storage, Init, PConv, DetsInfo, SenderPid) of
                ok ->
                    tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec);
                Reason ->
                    verbose("~s: ~p: ~p~n",
                            ["[d]ets:init table failed", Tab, Reason]),
                    down(Tab, Storage)
            end;
        Error ->
            Error
    end.

%% Creates the empty local table that will receive the copy.
%% disc_only_copies: a dets file at the table's tmp path, opened while the
%% table is locked via mnesia_lib; ram_copies/disc_copies: an ets table.
%% Returns {Storage, Tab} on success, otherwise whatever the open/create
%% call returned.
create_table(Tab, TabSize, Storage, Cs) ->
    if
        Storage == disc_only_copies ->
            mnesia_lib:lock_table(Tab),
            TmpFile = mnesia_lib:tab2tmp(Tab),
            Size = lists:max([TabSize, 256]),
            DetsOpts = [{file, TmpFile},
                        {keypos, 2},
                        %% {ram_file, true},
                        {estimated_no_objects, Size},
                        {repair, mnesia_monitor:get_env(auto_repair)},
                        {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
            file:delete(TmpFile),
            Opened = mnesia_lib:dets_sync_open(Tab, DetsOpts),
            %% The table is unlocked whether or not the open succeeded.
            mnesia_lib:unlock_table(Tab),
            case Opened of
                {ok, _} -> {Storage, Tab};
                OpenFailure -> OpenFailure
            end;
        Storage == ram_copies; Storage == disc_copies ->
            EtsOpts = [{keypos, 2}, public, named_table, Cs#cstruct.type],
            case mnesia_monitor:unsafe_mktab(Tab, EtsOpts) of
                Tab -> {Storage, Tab};
                MkFailure -> MkFailure
            end
    end.

%% NOTE(review): only the beginning of tab_receiver/6 is visible in this
%% part of the file; the visible tokens are kept unchanged and the
%% definition continues beyond this point.
tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
    receive
        {SenderPid, {no_more, DatBin}} when PConv == false ->
            finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);

        %% Protocol conversion hack
        {SenderPid, {no_more, DatBin}} when pid(PConv) ->
            PConv ! {SenderPid, no_more},
            receive
                {old_init_table_complete, ok} ->
                    finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
                {old_init_table_complete, Reason} ->
                    Msg = "OLD: [d]ets:init table failed",
                    verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
                    down(Tab, Storage)
            end;

        {actual_tabrec, Pid} ->
            tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);

        {SenderPid, {more, [Recs]}} when pid(PConv) ->
            PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?