mnesia_checkpoint.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,289 行 · 第 1/3 页
ERL
1,289 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
-module(mnesia_checkpoint).

%% TM callback interface
-export([
         tm_add_copy/2,
         tm_change_table_copy_type/3,
         tm_del_copy/2,
         tm_mnesia_down/1,
         tm_prepare/1,
         tm_retain/4,
         tm_retain/5,
         tm_enter_pending/1,
         tm_enter_pending/3,
         tm_exit_pending/1,
         convert_cp_record/1
        ]).

%% Public interface
-export([
         activate/1,
         checkpoints/0,
         deactivate/1,
         deactivate/2,
         iterate/6,
         most_local_node/2,
         really_retain/2,
         stop/0,
         stop_iteration/1,
         tables_and_cookie/1
        ]).

%% Internal
-export([
         call/2,
         cast/2,
         init/1,
         remote_deactivate/1,
         start/1
        ]).

%% sys callback interface
-export([
         system_code_change/4,
         system_continue/3,
         system_terminate/4
        ]).

-include("mnesia.hrl").
-import(mnesia_lib, [add/2, del/2, set/2, unset/1]).
-import(mnesia_lib, [dbg_out/2]).

%% Current checkpoint description. The field order defines the tuple
%% layout and must not be changed.
-record(checkpoint_args, {name = {now(), node()},
                          allow_remote = true,
                          ram_overrides_dump = false,
                          nodes = [],
                          node = node(),
                          now = now(),
                          cookie = ?unique_cookie,
                          min = [],
                          max = [],
                          pending_tab,
                          wait_for_old, % Initially undefined then List
                          is_activated = false,
                          ignore_new = [],
                          retainers = [],
                          iterators = [],
                          supervisor,
                          pid
                         }).

%% Old record definition
-record(checkpoint, {name,
                     allow_remote,
                     ram_overrides_dump,
                     nodes,
                     node,
                     now,
                     min,
                     max,
                     pending_tab,
                     wait_for_old,
                     is_activated,
                     ignore_new,
                     retainers,
                     iterators,
                     supervisor,
                     pid
                    }).

-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).

-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).

-record(pending, {tid, disc_nodes = [], ram_nodes = []}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% TM callback functions

%% Issues a synchronous `stop' request to every checkpoint listed by
%% checkpoints/0. Always returns ok.
stop() ->
    StopOne = fun(CpName) -> call(CpName, stop) end,
    lists:foreach(StopOne, checkpoints()),
    ok.

%% Prepares a checkpoint on this node.
%%
%% A #checkpoint_args{} record is handed to start_retainer/1 unless a
%% checkpoint with the same name is already listed locally, in which
%% case {error, {already_exists, Name, node()}} is returned.
%% A #checkpoint{} record (sent by a node that uses the old protocol)
%% is first converted with convert_cp_record/1.
tm_prepare(#checkpoint_args{name = Name} = Cp) ->
    case lists:member(Name, checkpoints()) of
        true ->
            {error, {already_exists, Name, node()}};
        false ->
            start_retainer(Cp)
    end;
tm_prepare(#checkpoint{} = OldCp) ->
    %% Node with old protocol sent an old checkpoint record
    %% and we have to convert it
    case convert_cp_record(OldCp) of
        {ok, NewCp} ->
            tm_prepare(NewCp);
        {error, _Reason} = Error ->
            Error
    end.

%% Casts {mnesia_down, Node} to every checkpoint listed by checkpoints/0.
tm_mnesia_down(Node) ->
    Notify = fun(CpName) -> cast(CpName, {mnesia_down, Node}) end,
    lists:foreach(Notify, checkpoints()).

%% Returns pending
tm_enter_pending(Tid, DiscNs, RamNs) ->
    tm_enter_pending(#pending{tid = Tid,
                              disc_nodes = DiscNs,
                              ram_nodes = RamNs}).

%% Inserts Pending into every table found under the
%% `pending_checkpoints' key and returns Pending.
tm_enter_pending(Pending) ->
    tm_enter_pending(val(pending_checkpoints), Pending).

%% A failing insert into one table is deliberately ignored (catch) so
%% that the remaining tables are still visited.
tm_enter_pending(Tabs, Pending) ->
    lists:foreach(fun(Tab) -> catch ?ets_insert(Tab, Pending) end, Tabs),
    Pending.

%% Sends {self(), {exit_pending, Tid}} to every pid found under the
%% `pending_checkpoint_pids' key and returns Tid.
tm_exit_pending(Tid) ->
    tm_exit_pending(val(pending_checkpoint_pids), Tid).

tm_exit_pending(Pids, Tid) ->
    Msg = {self(), {exit_pending, Tid}},
    lists:foreach(fun(Pid) -> Pid ! Msg end, Pids),
    Tid.

%% Inserts one #pending{} record (node lists left at their defaults)
%% into Tab for each transaction id in Tids. Returns ok.
enter_still_pending(Tids, Tab) ->
    lists:foreach(fun(Tid) -> ?ets_insert(Tab, #pending{tid = Tid}) end,
                  Tids).

%% Looks up checkpoints for functions in mnesia_tm.
%% Returns `undefined' when the commit work of Tab does not start with
%% a {checkpoints, Names} entry; otherwise delegates to tm_retain/5.
tm_retain(Tid, Tab, Key, Op) ->
    CommitWork = val({Tab, commit_work}),
    case CommitWork of
        [{checkpoints, Names} | _] ->
            tm_retain(Tid, Tab, Key, Op, Names);
        _NoCheckpoints ->
            undefined
    end.

%% Reads the records that are about to be changed and forwards them to
%% the given checkpoints. Returns the old records.
%%
%% For `clear_table' every record of Tab is fetched and forwarded in
%% per-key groups; for any other operation only the records stored
%% under Key are fetched and forwarded in a single retain message.
tm_retain(Tid, Tab, _Key, clear_table, Checkpoints) ->
    AllRecs = mnesia_lib:db_match_object(Tab, '_'),
    send_group_retain(AllRecs, Checkpoints, Tid, Tab, []),
    AllRecs;
tm_retain(Tid, Tab, Key, _Op, Checkpoints) ->
    OldRecs = mnesia_lib:db_get(Tab, Key),
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}),
    OldRecs.

%% Walks the record list and emits one retain message per run of
%% consecutive records that share the same key (element 2). Group holds
%% the current run in reverse order. Returns ok.
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, [Prev | _] = Group)
  when element(2, Rec) /= element(2, Prev) ->
    %% Key changed: flush the finished run and start a new one with Rec.
    retain_key_group(Checkpoints, Tid, Tab, Group),
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec]);
send_group_retain([Rec | Recs], Checkpoints, Tid, Tab, Group) ->
    send_group_retain(Recs, Checkpoints, Tid, Tab, [Rec | Group]);
send_group_retain([], Checkpoints, Tid, Tab, [_ | _] = Group) ->
    retain_key_group(Checkpoints, Tid, Tab, Group),
    ok;
send_group_retain([], _Checkpoints, _Tid, _Tab, []) ->
    ok.

%% Sends the (reversed back to original order) run of records to the
%% checkpoints, keyed by element 2 of the most recently collected record.
retain_key_group(Checkpoints, Tid, Tab, [Newest | _] = Group) ->
    Key = element(2, Newest),
    OldRecs = lists:reverse(Group),
    send_retain(Checkpoints, {retain, Tid, Tab, Key, OldRecs}).

%% Casts Msg to every named checkpoint. Returns ok.
send_retain(Names, Msg) ->
    lists:foreach(fun(Name) -> cast(Name, Msg) end, Names).

%% Tells every checkpoint attached to Tab that a copy is being added on
%% another node. Only defined for Node /= node().
tm_add_copy(Tab, Node) when Node /= node() ->
    call_tab_checkpoints(Tab, {add_copy, Tab, Node}).

%% Unsubscribes the table and tells every checkpoint attached to Tab
%% that the local copy is being deleted. Only defined for Node == node().
tm_del_copy(Tab, Node) when Node == node() ->
    mnesia_subscr:unsubscribe_table(Tab),
    call_tab_checkpoints(Tab, {del_copy, Tab, Node}).

%% Tells every checkpoint attached to Tab that the storage type of the
%% copy changes from From to To.
tm_change_table_copy_type(Tab, From, To) ->
    call_tab_checkpoints(Tab, {change_copy, Tab, From, To}).

%% Sends Request synchronously to each checkpoint named at the head of
%% the commit work of Tab; returns ok when there is no such entry.
call_tab_checkpoints(Tab, Request) ->
    case val({Tab, commit_work}) of
        [{checkpoints, Names} | _] ->
            map_call(fun(Name) -> call(Name, Request) end, Names, ok);
        _NoCheckpoints ->
            ok
    end.

%% Applies Fun to every name in order and returns Res unless some call
%% failed, in which case the last {error, Reason} is returned.
%% An {error, {no_exists, Name}} answer for the called name is ignored.
map_call(Fun, Names, Res) ->
    Step = fun(Name, Acc) ->
                   case Fun(Name) of
                       ok ->
                           Acc;
                       {error, {no_exists, Name}} ->
                           Acc;
                       {error, _Reason} = Error ->
                           %% BUGBUG: We may end up with some checkpoint
                           %% retainers too much in the add_copy case.
                           %% How do we remove them?
                           Error
                   end
           end,
    lists:foldl(Step, Res, Names).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Public functions

%% Deactivates the named checkpoint on all nodes recorded in it.
%% Returns ok, or {error, Reason} if the checkpoint cannot be fetched.
deactivate(Name) ->
    case call(Name, get_checkpoint) of
        {error, _Reason} = Error ->
            Error;
        Cp ->
            Nodes = Cp#checkpoint_args.nodes,
            deactivate(Nodes, Name)
    end.

%% Invokes remote_deactivate/1 on every node in Nodes. The result of
%% the multicall is ignored; always returns ok.
deactivate(Nodes, Name) ->
    rpc:multicall(Nodes, ?MODULE, remote_deactivate, [Name]),
    ok.

%% Sends a synchronous `deactivate' request to the local checkpoint.
remote_deactivate(Name) ->
    call(Name, deactivate).

%% Returns whatever is stored under the `checkpoints' key.
checkpoints() ->
    val(checkpoints).

%% Returns {ok, MinTabs ++ MaxTabs, Cookie} for the named checkpoint,
%% or {error, Reason} if the checkpoint cannot be fetched.
tables_and_cookie(Name) ->
    case call(Name, get_checkpoint) of
        {error, _Reason} = Error ->
            Error;
        Cp ->
            {ok,
             Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
             Cp#checkpoint_args.cookie}
    end.

%% Picks a node that holds a retainer writer for Tab in checkpoint Name:
%% the local node if it is a writer, otherwise the first writer.
%% Returns {ok, Node} or an error tuple when no retainer/writer exists.
most_local_node(Name, Tab) ->
    NoRetainer = {error, {"No retainer attached to table", [Tab, Name]}},
    case ?catch_val({Tab, {retainer, Name}}) of
        {'EXIT', _} ->
            NoRetainer;
        R ->
            Writers = R#retainer.writers,
            case lists:member(node(), Writers) of
                true ->
                    {ok, node()};
                false when Writers /= [] ->
                    {ok, hd(Writers)};
                false ->
                    NoRetainer
            end
    end.

%% Returns the really_retain flag of the retainer for Tab in checkpoint
%% Name.
really_retain(Name, Tab) ->
    (val({Tab, {retainer, Name}}))#retainer.really_retain.

%% Activate a checkpoint.
%%
%% A checkpoint is a transaction consistent state that may be used to
%% perform a distributed backup or to rollback the involved tables to
%% their old state. Backups may also be used to restore tables to
%% their old state. Args is a list of the following tuples:
%%
%% {name, Name}
%%    Name of checkpoint. Each checkpoint must have a name which
%%    is unique on the reachable nodes. The name may be reused when
%%    the checkpoint has been deactivated.
%%    By default a probably unique name is generated.
%%    Multiple checkpoints may be set on the same table.
%%
%% {allow_remote, Bool}
%%    false means that all retainers must be local. If the
%%    table does not reside locally, the checkpoint fails.
%%    true allows retainers on other nodes.
%%
%% {min, MinTabs}
%%    Minimize redundancy and only keep checkpoint info together with
%%    one replica, preferably at the local node. If any node involved
%%    in the checkpoint goes down, the checkpoint is deactivated.
%%
%% {max, MaxTabs}
%%    Maximize redundancy and keep checkpoint info together with all
%%    replicas. The checkpoint becomes more fault tolerant if the
%%    tables have several replicas. When new replicas are added, they
%%    will also get a retainer attached to them.
%%
%% {ram_overrides_dump, Bool}
%% {ram_overrides_dump, Tabs}
%%    Only applicable for ram_copies. Bool controls which versions of
%%    the records that should be included in the checkpoint state.
%%    true means that the latest committed records in ram (i.e. the
%%    records that the application accesses) should be included
%%    in the checkpoint. false means that the records dumped to
%%    dat-files (the records that will be loaded at startup) should
%%    be included in the checkpoint.
%%    Tabs is a list of tables.
%%    Default is false.
%%
%% {ignore_new, TidList}
%%    Normally we wait for all pending transactions to complete
%%    before we allow iteration over the checkpoint. But in order
%%    to cope with checkpoint activation inside a transaction that
%%    currently prepares commit (mnesia_init:get_net_work_copy) we
%%    need to have the ability to ignore the enclosing transaction.
%%    We do not wait for the transactions in TidList to end. The
%%    transactions in TidList are regarded as newer than the checkpoint.
activate(Args) ->
    case args2cp(Args) of
        {ok, Cp} ->
            do_activate(Cp);
        {error, _Reason} = Error ->
            Error
    end.

%% Folds the argument list into a #checkpoint_args{} record, validates
%% the table selection and hands the result to arrange_retainers/3.
%% Anything but a list yields {error, {badarg, Args}}.
args2cp(Args) when is_list(Args) ->
    %% check_arg/2 signals invalid arguments with exit/1; the old-style
    %% catch turns that into {'EXIT', Reason}.
    Folded = (catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args)),
    case Folded of
        {'EXIT', Reason} ->
            {error, Reason};
        Cp ->
            arrange_checked(Cp)
    end;
args2cp(Args) ->
    {error, {badarg, Args}}.

%% Runs check_tables/1 and, on success, arrange_retainers/3.
arrange_checked(Cp) ->
    case check_tables(Cp) of
        {error, _Reason} = Error ->
            Error;
        {ok, Overriders, AllTabs} ->
            arrange_retainers(Cp, Overriders, AllTabs)
    end.

%% Applies one {Key, Value} argument to the checkpoint record.
%% Exits with {already_exists, Name} for a name that is already listed
%% by checkpoints/0 and with {badarg, Arg} for anything unrecognised.
check_arg({name, Name}, Cp) ->
    case lists:member(Name, checkpoints()) of
        true ->
            exit({already_exists, Name});
        false ->
            set_checked_name(Name, Cp)
    end;
check_arg({allow_remote, Flag}, Cp) when Flag =:= true; Flag =:= false ->
    Cp#checkpoint_args{allow_remote = Flag};
check_arg({ram_overrides_dump, Val}, Cp)
  when Val =:= true; Val =:= false; is_list(Val) ->
    Cp#checkpoint_args{ram_overrides_dump = Val};
check_arg({min, Tabs}, Cp) when is_list(Tabs) ->
    Cp#checkpoint_args{min = Tabs};
check_arg({max, Tabs}, Cp) when is_list(Tabs) ->
    Cp#checkpoint_args{max = Tabs};
check_arg({ignore_new, Tids}, Cp) when is_list(Tids) ->
    Cp#checkpoint_args{ignore_new = Tids};
check_arg(Arg, _) ->
    exit({badarg, Arg}).

%% Accepts Name only if tab2retainer/1 (defined elsewhere in this
%% module) yields a list for it; otherwise exits with {badarg, Name}.
set_checked_name(Name, Cp) ->
    case catch tab2retainer({foo, Name}) of
        Retainer when is_list(Retainer) ->
            Cp#checkpoint_args{name = Name};
        _ ->
            exit({badarg, Name})
    end.

%% Validates the min/max table selection of the checkpoint: a table may
%% not occur in both lists and at least one of the lists must be
%% non-empty.
check_tables(Cp) ->
    Min = Cp#checkpoint_args.min,
    Max = Cp#checkpoint_args.max,
    AllTabs = Min ++ Max,
    DoubleTabs = [T || T <- Min, lists:member(T, Max)],
    Overriders = Cp#checkpoint_args.ram_overrides_dump,
    if
        DoubleTabs /= [] ->
            {error, {combine_error, Cp#checkpoint_args.name,
                     [{min, DoubleTabs}, {max, DoubleTabs}]}};
        Min == [], Max == [] ->
            {error, {combine_error, Cp#checkpoint_args.name,
                     [{min, Min}, {max, Max}]}};
        Overriders == false ->
            {ok, [], AllTabs};
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?