mnesia_checkpoint.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,289 行 · 第 1/3 页

ERL
1,289
字号
%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %%     $Id$%%-module(mnesia_checkpoint).%% TM callback interface-export([	 tm_add_copy/2,	 tm_change_table_copy_type/3,	 tm_del_copy/2,	 tm_mnesia_down/1,	 tm_prepare/1,	 tm_retain/4,	 tm_retain/5,	 tm_enter_pending/1,	 tm_enter_pending/3,	 tm_exit_pending/1,	 convert_cp_record/1	]).%% Public interface-export([	 activate/1,	 checkpoints/0,	 deactivate/1,	 deactivate/2,	 iterate/6,	 most_local_node/2,	 really_retain/2,	 stop/0,	 stop_iteration/1,	 tables_and_cookie/1	]).%% Internal-export([	 call/2,	 cast/2,	 init/1,	 remote_deactivate/1,	 start/1	]).%% sys callback interface-export([	 system_code_change/4,	 system_continue/3,	 system_terminate/4	]).-include("mnesia.hrl").-import(mnesia_lib, [add/2, del/2, set/2, unset/1]).-import(mnesia_lib, [dbg_out/2]).-record(checkpoint_args, {name = {now(), node()},			  allow_remote = true,			  ram_overrides_dump = false,			  nodes = [],			  node = node(),			  now = now(),			  cookie = ?unique_cookie,			  min = [],			  max = [],			  pending_tab,			  wait_for_old, % Initially undefined then List			  is_activated = false,			  ignore_new = [],			  retainers = [],			  iterators = [],			  supervisor,			  pid			 }).%% Old record definition-record(checkpoint, {name,		     allow_remote,		     ram_overrides_dump,		     nodes,		     node,		     now,		     min,		     max,		     pending_tab,		     wait_for_old,		     is_activated,		     ignore_new,		     retainers,		     iterators,		     supervisor,		     pid		    }).-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).-record(pending, {tid, disc_nodes = [], ram_nodes = []}).%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% TM callback functionsstop() ->    lists:foreach(fun(Name) -> call(Name, stop) end,		  checkpoints()),    ok.tm_prepare(Cp) when record(Cp, checkpoint_args) ->    Name = Cp#checkpoint_args.name,    case lists:member(Name, checkpoints()) of	false ->	    start_retainer(Cp);	true ->	    {error, {already_exists, Name, node()}}    end;tm_prepare(Cp) when record(Cp, checkpoint) ->    %% Node with old protocol sent an old checkpoint record    %% and we have to convert it    case convert_cp_record(Cp) of	{ok, NewCp} ->	    tm_prepare(NewCp);	{error, Reason} ->	    {error, Reason}    end.tm_mnesia_down(Node) ->    lists:foreach(fun(Name) -> cast(Name, {mnesia_down, Node}) end,		  checkpoints()).%% Returns pendingtm_enter_pending(Tid, DiscNs, RamNs) ->    Pending = #pending{tid = Tid, disc_nodes = DiscNs, ram_nodes = RamNs},    tm_enter_pending(Pending).tm_enter_pending(Pending) ->    PendingTabs = val(pending_checkpoints),    tm_enter_pending(PendingTabs, Pending).tm_enter_pending([], Pending) ->    Pending;tm_enter_pending([Tab | Tabs], Pending) ->    catch ?ets_insert(Tab, Pending),    tm_enter_pending(Tabs, Pending).tm_exit_pending(Tid) ->    Pids = val(pending_checkpoint_pids),    tm_exit_pending(Pids, Tid).    tm_exit_pending([], Tid) ->    Tid;tm_exit_pending([Pid | Pids], Tid) ->    Pid ! {self(), {exit_pending, Tid}},    tm_exit_pending(Pids, Tid).enter_still_pending([Tid | Tids], Tab) ->    ?ets_insert(Tab, #pending{tid = Tid}),    enter_still_pending(Tids, Tab);enter_still_pending([], _Tab) ->    ok.%% Looks up checkpoints for functions in mnesia_tm.tm_retain(Tid, Tab, Key, Op) ->    case val({Tab, commit_work}) of	[{checkpoints, Checkpoints} | _ ] ->	    tm_retain(Tid, Tab, Key, Op, Checkpoints);	_ -> 	    undefined    end.    tm_retain(Tid, Tab, Key, Op, Checkpoints) ->    case Op of	clear_table ->	    OldRecs = mnesia_lib:db_match_object(Tab, '_'),	    send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),	    OldRecs;	_ ->	    OldRecs = mnesia_lib:db_get(Tab, Key),	    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),	    OldRecs    end.send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, [PrevRec | PrevRecs])  when element(2, Rec) /= element(2, PrevRec) ->    Key = element(2, PrevRec),    OldRecs = lists:reverse([PrevRec | PrevRecs]),    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec]);send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, Acc) ->    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec | Acc]);send_group_retain([], Checkpoints, Tid, Tab, [PrevRec | PrevRecs]) ->    Key = element(2, PrevRec),    OldRecs = lists:reverse([PrevRec | PrevRecs]),    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),    ok;send_group_retain([], _Checkpoints, _Tid, _Tab, []) ->    ok.send_retain([Name | Names], Msg) ->    cast(Name, Msg),    send_retain(Names, Msg);send_retain([], _Msg) ->    ok.    tm_add_copy(Tab, Node) when Node /= node() ->    case val({Tab, commit_work}) of	[{checkpoints, Checkpoints} | _ ] ->	    Fun = fun(Name) -> call(Name, {add_copy, Tab, Node}) end,	    map_call(Fun, Checkpoints, ok);	_  -> 	    ok    end.tm_del_copy(Tab, Node) when Node == node() ->    mnesia_subscr:unsubscribe_table(Tab),    case val({Tab, commit_work}) of	[{checkpoints, Checkpoints} | _ ] ->	    	    Fun = fun(Name) -> call(Name, {del_copy, Tab, Node}) end,	    map_call(Fun, Checkpoints, ok);	_ ->	    ok    end.tm_change_table_copy_type(Tab, From, To) ->    case val({Tab, commit_work}) of	[{checkpoints, Checkpoints} | _ ] ->	    Fun = fun(Name) -> call(Name, {change_copy, Tab, From, To}) end,	    map_call(Fun, Checkpoints, ok);	_ -> 	    ok    end.map_call(Fun, [Name | Names], Res) ->    case Fun(Name) of	 ok ->	    map_call(Fun, Names, Res);	{error, {no_exists, Name}} ->	    map_call(Fun, Names, Res);	{error, Reason} ->	    %% BUGBUG: We may end up with some checkpoint retainers	    %% too much in the add_copy case. How do we remove them?	    map_call(Fun, Names, {error, Reason})    end;map_call(_Fun, [], Res) ->    Res.%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Public functionsdeactivate(Name) ->    case call(Name, get_checkpoint) of	{error, Reason} ->	    {error, Reason};	Cp ->	    deactivate(Cp#checkpoint_args.nodes, Name)    end.deactivate(Nodes, Name) ->    rpc:multicall(Nodes, ?MODULE, remote_deactivate, [Name]),    ok.remote_deactivate(Name) ->    call(Name, deactivate).checkpoints() -> val(checkpoints).tables_and_cookie(Name) ->    case call(Name, get_checkpoint) of	{error, Reason} ->	    {error, Reason};	Cp ->	    Tabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,	    Cookie = Cp#checkpoint_args.cookie,	    {ok, Tabs, Cookie}    end.most_local_node(Name, Tab) ->    case ?catch_val({Tab, {retainer, Name}}) of	{'EXIT', _} -> 	    {error, {"No retainer attached to table", [Tab, Name]}};	R -> 	    	    Writers = R#retainer.writers,	    LocalWriter = lists:member(node(), Writers),	    if		LocalWriter == true ->		    {ok, node()};		Writers /= [] ->		    {ok, hd(Writers)};		true  ->		    {error, {"No retainer attached to table", [Tab, Name]}}	    end    end.really_retain(Name, Tab) ->    R = val({Tab, {retainer, Name}}),    R#retainer.really_retain. %% Activate a checkpoint.%%%% A checkpoint is a transaction consistent state that may be used to%% perform a distributed backup or to rollback the involved tables to%% their old state. Backups may also be used to restore tables to%% their old state. Args is a list of the following tuples:%%%% {name, Name}%%    Name of checkpoint. Each checkpoint must have a name which%%    is unique on the reachable nodes. The name may be reused when%%    the checkpoint has been deactivated.%%    By default a probably unique name is generated.%%    Multiple checkpoints may be set on the same table.%%%% {allow_remote, Bool}%%   false means that all retainers must be local. If the%%   table does not reside locally, the checkpoint fails.%%   true allows retainers on other nodes.%%%% {min, MinTabs}%%   Minimize redundancy and only keep checkpoint info together with%%   one replica, preferrably at the local node. If any node involved%%   the checkpoint goes down, the checkpoint is deactivated.%%%% {max, MaxTabs}%%    Maximize redundancy and keep checkpoint info together with all%%    replicas. The checkpoint becomes more fault tolerant if the%%    tables has several replicas. When new replicas are added, they%%    will also get a retainer attached to them.%%%% {ram_overrides_dump, Bool}%% {ram_overrides_dump, Tabs}%%   Only applicable for ram_copies. Bool controls which versions of%%   the records that should be included in the checkpoint state.%%   true means that the latest comitted records in ram (i.e. the%%   records that the application accesses) should be included%%   in the checkpoint. false means that the records dumped to%%   dat-files (the records that will be loaded at startup) should%%   be included in the checkpoint. Tabs is a list of tables.%%   Default is false.%%%% {ignore_new, TidList}%%   Normally we wait for all pending transactions to complete%%   before we allow iteration over the checkpoint. But in order%%   to cope with checkpoint activation inside a transaction that%%   currently prepares commit (mnesia_init:get_net_work_copy) we%%   need to have the ability to ignore the enclosing transaction.%%   We do not wait for the transactions in TidList to end. The%%   transactions in TidList are regarded as newer than the checkpoint.activate(Args) ->    case args2cp(Args) of	{ok, Cp} ->	    do_activate(Cp);	{error, Reason} ->	    {error, Reason}    end.args2cp(Args) when list(Args)->    case catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of	{'EXIT', Reason} ->	    {error, Reason};	Cp ->	    case check_tables(Cp) of		{error, Reason} ->		    {error, Reason};		{ok, Overriders, AllTabs} ->		    arrange_retainers(Cp, Overriders, AllTabs)	    end    end;args2cp(Args) ->    {error, {badarg, Args}}.check_arg({name, Name}, Cp) ->    case lists:member(Name, checkpoints()) of	true ->	    exit({already_exists, Name});	false ->	    case catch tab2retainer({foo, Name}) of		List when list(List) ->		    Cp#checkpoint_args{name = Name};		_ ->		    exit({badarg, Name})	    end    end;check_arg({allow_remote, true}, Cp) ->    Cp#checkpoint_args{allow_remote = true};check_arg({allow_remote, false}, Cp) ->    Cp#checkpoint_args{allow_remote = false};check_arg({ram_overrides_dump, true}, Cp) ->    Cp#checkpoint_args{ram_overrides_dump = true};check_arg({ram_overrides_dump, false}, Cp) ->    Cp#checkpoint_args{ram_overrides_dump = false};check_arg({ram_overrides_dump, Tabs}, Cp) when list(Tabs) ->    Cp#checkpoint_args{ram_overrides_dump = Tabs};check_arg({min, Tabs}, Cp) when list(Tabs) ->    Cp#checkpoint_args{min = Tabs};check_arg({max, Tabs}, Cp) when list(Tabs) ->    Cp#checkpoint_args{max = Tabs};check_arg({ignore_new, Tids}, Cp) when list(Tids) ->    Cp#checkpoint_args{ignore_new = Tids};check_arg(Arg, _) ->    exit({badarg, Arg}).check_tables(Cp) ->    Min = Cp#checkpoint_args.min,    Max = Cp#checkpoint_args.max,    AllTabs = Min ++ Max,    DoubleTabs = [T || T <- Min, lists:member(T, Max)],    Overriders = Cp#checkpoint_args.ram_overrides_dump,    if	DoubleTabs /= [] ->	    {error, {combine_error, Cp#checkpoint_args.name,		     [{min, DoubleTabs}, {max, DoubleTabs}]}};	Min == [], Max == [] ->	    {error, {combine_error, Cp#checkpoint_args.name,		     [{min, Min}, {max, Max}]}};	Overriders == false ->	    {ok, [], AllTabs};

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?