mnesia_frag.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,262 行 · 第 1/3 页
ERL
1,262 行
%%% ``The contents of this file are subject to the Erlang Public License,
%%% Version 1.1, (the "License"); you may not use this file except in
%%% compliance with the License. You should have received a copy of the
%%% Erlang Public License along with this software. If not, it can be
%%% retrieved via the world wide web at http://www.erlang.org/.
%%%
%%% Software distributed under the License is distributed on an "AS IS"
%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%%% the License for the specific language governing rights and limitations
%%% under the License.
%%%
%%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%%% AB. All Rights Reserved.''
%%%
%%%     $Id$
%%%
%%%----------------------------------------------------------------------
%%% Purpose : Support tables so large that they need
%%%           to be divided into several fragments.
%%%----------------------------------------------------------------------

%header_doc_include

-module(mnesia_frag).

%% Callback functions when accessed within an activity
-export([
         lock/4,
         write/5, delete/5, delete_object/5,
         read/5, match_object/5, all_keys/4,
         select/5,select/6,select_cont/3,
         index_match_object/6, index_read/6,
         foldl/6, foldr/6, table_info/4
        ]).

%header_doc_include

%% -behaviour(mnesia_access).

%% Remaining exports. local_select/4 and remote_select/4 have to be
%% exported because this module starts them by name (spawn_link/3 and
%% rpc:multicall/4 with ?MODULE).
-export([
         change_table_frag/2,
         remove_node/2,
         expand_cstruct/1,
         lookup_frag_hash/1,
         lookup_foreigners/1,
         frag_names/1,
         set_frag_hash/2,
         local_select/4,
         remote_select/4
        ]).

-include("mnesia.hrl").

%% Names of hash callback modules. ?DEFAULT_HASH_MOD is compared against
%% the hash_module field of a table's frag_state in verify_numbers/2.
-define(OLD_HASH_MOD, mnesia_frag_old_hash).
-define(DEFAULT_HASH_MOD, mnesia_frag_hash).
%%-define(DEFAULT_HASH_MOD, ?OLD_HASH_MOD).
%% BUGBUG: New should be default

%% Fragmentation state of a table: its foreign key, the number of
%% fragments, and the hash callback module together with that
%% module's state.
-record(frag_state,
        {foreign_key,
         n_fragments,
         hash_module,
         hash_state}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Access functions

%impl_doc_include

%% Callback functions which provide transparent
%% access to fragmented tables from any activity
%% access context.

%% A {table, Tab} lock item is turned into one table lock per fragment
%% of Tab; the per-fragment results are appended and passed through
%% mnesia_lib:uniq/1. Every other lock item goes to mnesia:lock/4 as is.
lock(ActivityId, Opaque, {table , Tab}, LockKind) ->
    case frag_names(Tab) of
        [Tab] ->
            %% The table itself is its only fragment
            mnesia:lock(ActivityId, Opaque, {table, Tab}, LockKind);
        Fragments ->
            LockFrag =
                fun(FragName) ->
                        mnesia:lock(ActivityId, Opaque, {table, FragName}, LockKind)
                end,
            PerFragResults = lists:map(LockFrag, Fragments),
            mnesia_lib:uniq(lists:append(PerFragResults))
    end;
lock(ActivityId, Opaque, LockItem, LockKind) ->
    mnesia:lock(ActivityId, Opaque, LockItem, LockKind).

%% Writes Rec into the fragment chosen by record_to_frag_name/2.
write(ActivityId, Opaque, Tab, Rec, LockKind) ->
    mnesia:write(ActivityId, Opaque, record_to_frag_name(Tab, Rec), Rec, LockKind).

%% Deletes Key from the fragment chosen by key_to_frag_name/2.
delete(ActivityId, Opaque, Tab, Key, LockKind) ->
    mnesia:delete(ActivityId, Opaque, key_to_frag_name(Tab, Key), Key, LockKind).

%% Deletes Rec from the fragment chosen by record_to_frag_name/2.
delete_object(ActivityId, Opaque, Tab, Rec, LockKind) ->
    mnesia:delete_object(ActivityId, Opaque, record_to_frag_name(Tab, Rec), Rec, LockKind).

%% Reads Key from the fragment chosen by key_to_frag_name/2.
read(ActivityId, Opaque, Tab, Key, LockKind) ->
    mnesia:read(ActivityId, Opaque, key_to_frag_name(Tab, Key), Key, LockKind).

%% Expressed as a select with a match specification that returns the
%% whole object ('$_') for every record matching HeadPat.
match_object(ActivityId, Opaque, Tab, HeadPat, LockKind) ->
    select(ActivityId, Opaque, Tab, [{HeadPat, [], ['$_']}], LockKind).

%% Select without a limit; see do_select/5.
select(ActivityId, Opaque, Tab, MatchSpec, LockKind) ->
    do_select(ActivityId, Opaque, Tab, MatchSpec, LockKind).

%% Select with a limit; see init_select/6. Continue with select_cont/3.
select(ActivityId, Opaque, Tab, MatchSpec, Limit, LockKind) ->
    init_select(ActivityId, Opaque, Tab, MatchSpec, Limit, LockKind).

%% Concatenates the key lists of all fragments, in frag_names/1 order.
all_keys(ActivityId, Opaque, Tab, LockKind) ->
    KeysOf = fun(FragName) ->
                     mnesia:all_keys(ActivityId, Opaque, FragName, LockKind)
             end,
    lists:append(lists:map(KeysOf, frag_names(Tab))).

%% Runs the index match in every fragment and concatenates the results.
index_match_object(ActivityId, Opaque, Tab, Pat, Attr, LockKind) ->
    MatchIn = fun(FragName) ->
                      mnesia:index_match_object(ActivityId, Opaque, FragName,
                                                Pat, Attr, LockKind)
              end,
    lists:append(lists:map(MatchIn, frag_names(Tab))).

%% Runs the index read in every fragment and concatenates the results.
index_read(ActivityId, Opaque, Tab, Key, Attr, LockKind) ->
    ReadIn = fun(FragName) ->
                     mnesia:index_read(ActivityId, Opaque, FragName,
                                       Key, Attr, LockKind)
             end,
    lists:append(lists:map(ReadIn, frag_names(Tab))).

%% Folds Fun over every fragment, first fragment first, threading the
%% accumulator from one fragment into the next.
foldl(ActivityId, Opaque, Fun, Acc, Tab, LockKind) ->
    FoldFrag = fun(FragName, AccIn) ->
                       mnesia:foldl(ActivityId, Opaque, Fun, AccIn, FragName, LockKind)
               end,
    lists:foldl(FoldFrag, Acc, frag_names(Tab)).

%% Like foldl/6, but uses lists:foldr/3 over the fragment names and
%% mnesia:foldr/6 within each fragment.
foldr(ActivityId, Opaque, Fun, Acc, Tab, LockKind) ->
    FoldFrag = fun(FragName, AccIn) ->
                       mnesia:foldr(ActivityId, Opaque, Fun, AccIn, FragName, LockKind)
               end,
    lists:foldr(FoldFrag, Acc, frag_names(Tab)).

%% With {Tab, Key}, items that are not fragmentation specific are
%% answered by the fragment holding Key; with a plain Tab they are
%% answered by Tab itself.
table_info(ActivityId, Opaque, {Tab, Key}, Item) ->
    table_info2(ActivityId, Opaque, Tab, key_to_frag_name(Tab, Key), Item);
table_info(ActivityId, Opaque, Tab, Item) ->
    table_info2(ActivityId, Opaque, Tab, Tab, Item).

%% One clause per fragmentation specific item; the last clause hands
%% every other item to mnesia:table_info/4 for Frag.
table_info2(ActivityId, Opaque, Tab, _Frag, size) ->
    %% Total over all fragments
    lists:sum([Size || {_, Size} <- frag_size(ActivityId, Opaque, Tab)]);
table_info2(ActivityId, Opaque, Tab, _Frag, memory) ->
    %% Total over all fragments
    lists:sum([Size || {_, Size} <- frag_memory(ActivityId, Opaque, Tab)]);
table_info2(_ActivityId, _Opaque, Tab, _Frag, base_table) ->
    lookup_prop(Tab, base_table);
table_info2(_ActivityId, _Opaque, Tab, _Frag, node_pool) ->
    lookup_prop(Tab, node_pool);
table_info2(_ActivityId, _Opaque, Tab, _Frag, n_fragments) ->
    (lookup_frag_hash(Tab))#frag_state.n_fragments;
table_info2(_ActivityId, _Opaque, Tab, _Frag, foreign_key) ->
    (lookup_frag_hash(Tab))#frag_state.foreign_key;
table_info2(_ActivityId, _Opaque, Tab, _Frag, foreigners) ->
    lookup_foreigners(Tab);
table_info2(_ActivityId, _Opaque, Tab, _Frag, n_ram_copies) ->
    length(val({Tab, ram_copies}));
table_info2(_ActivityId, _Opaque, Tab, _Frag, n_disc_copies) ->
    length(val({Tab, disc_copies}));
table_info2(_ActivityId, _Opaque, Tab, _Frag, n_disc_only_copies) ->
    length(val({Tab, disc_only_copies}));
table_info2(_ActivityId, _Opaque, Tab, _Frag, frag_names) ->
    frag_names(Tab);
table_info2(_ActivityId, _Opaque, Tab, _Frag, frag_dist) ->
    frag_dist(Tab);
table_info2(ActivityId, Opaque, Tab, _Frag, frag_size) ->
    frag_size(ActivityId, Opaque, Tab);
table_info2(ActivityId, Opaque, Tab, _Frag, frag_memory) ->
    frag_memory(ActivityId, Opaque, Tab);
table_info2(ActivityId, Opaque, _Tab, Frag, Item) ->
    mnesia:table_info(ActivityId, Opaque, Frag, Item).

%impl_doc_include

%% Returns [{FragName, Size}] with one entry per fragment of Tab.
frag_size(ActivityId, Opaque, Tab) ->
    SizeOf = fun(FragName) ->
                     {FragName, remote_table_info(ActivityId, Opaque, FragName, size)}
             end,
    lists:map(SizeOf, frag_names(Tab)).

%% Returns [{FragName, Memory}] with one entry per fragment of Tab.
frag_memory(ActivityId, Opaque, Tab) ->
    MemoryOf = fun(FragName) ->
                       {FragName, remote_table_info(ActivityId, Opaque, FragName, memory)}
               end,
    lists:map(MemoryOf, frag_names(Tab)).
%% Asks the node stored under {Tab, where_to_read} for table_info Item
%% of Tab. Aborts with {no_exists, Tab, Item} when the rpc fails.
remote_table_info(ActivityId, Opaque, Tab, Item) ->
    N = val({Tab, where_to_read}),
    case rpc:call(N, mnesia, table_info, [ActivityId, Opaque, Tab, Item]) of
        {badrpc, _} ->
            mnesia:abort({no_exists, Tab, Item});
        Info ->
            Info
    end.

%% Select with a limit. When Tab has no frag_hash entry the call is
%% passed straight to mnesia:select/6. Otherwise every candidate
%% fragment is locked, the select is started in the first one and the
%% remaining fragments travel along in a frag_cont continuation that
%% select_cont/3 understands.
init_select(Tid,Opaque,Tab,Pat,Limit,LockKind) ->
    case ?catch_val({Tab, frag_hash}) of
        {'EXIT', _} ->
            mnesia:select(Tid, Opaque, Tab, Pat, Limit,LockKind);
        FH ->
            FragNumbers = verify_numbers(FH,Pat),
            Fun = fun(Num) ->
                          Name = n_to_frag_name(Tab, Num),
                          Node = val({Name, where_to_read}),
                          Storage = mnesia_lib:storage_type_at_node(Node, Name),
                          mnesia:lock(Tid, Opaque, {table, Name}, LockKind),
                          {Name, Node, Storage}
                  end,
            %% NOTE(review): this match fails with badmatch if the hash
            %% module returns no fragment numbers at all - confirm
            %% whether that can happen.
            [{FTab,Node,Type}|NameNodes] = lists:map(Fun, FragNumbers),
            InitFun = fun(FixedSpec) ->
                              mnesia:dirty_sel_init(Node,FTab,FixedSpec,Limit,Type)
                      end,
            Res = mnesia:fun_select(Tid,Opaque,FTab,Pat,LockKind,FTab,InitFun,Limit,Node,Type),
            frag_sel_cont(Res, NameNodes, {Pat,LockKind,Limit})
    end.

%% Continues a select started by init_select/6.
%%  - frag_cont, current fragment exhausted, none left: end of table.
%%  - frag_cont, current fragment exhausted, more left: start selecting
%%    in the next fragment.
%%  - frag_cont otherwise: continue in the current fragment.
%%  - anything else is handed to mnesia:select_cont/3.
select_cont(_Tid,_,{frag_cont, '$end_of_table', [],_}) ->
    '$end_of_table';
select_cont(Tid,Ts,{frag_cont, '$end_of_table', [{Tab,Node,Type}|Rest],Args}) ->
    {Spec,LockKind,Limit} = Args,
    InitFun = fun(FixedSpec) ->
                      mnesia:dirty_sel_init(Node,Tab,FixedSpec,Limit,Type)
              end,
    Res = mnesia:fun_select(Tid,Ts,Tab,Spec,LockKind,Tab,InitFun,Limit,Node,Type),
    frag_sel_cont(Res, Rest, Args);
select_cont(Tid,Ts,{frag_cont, Cont, TabL, Args}) ->
    frag_sel_cont(mnesia:select_cont(Tid,Ts,Cont),TabL,Args);
select_cont(Tid,Ts,Else) -> %% Not a fragmented table
    mnesia:select_cont(Tid,Ts,Else).

%% Wraps the result of a single-fragment select so that the caller can
%% continue with the remaining fragments TabL. An exhausted fragment
%% with fragments left yields an empty batch plus a continuation.
frag_sel_cont('$end_of_table', [],_) ->
    '$end_of_table';
frag_sel_cont('$end_of_table', TabL,Args) ->
    {[], {frag_cont, '$end_of_table', TabL,Args}};
frag_sel_cont({Recs,Cont}, TabL,Args) ->
    {Recs, {frag_cont, Cont, TabL,Args}}.

%% Select without a limit. When Tab has no frag_hash entry the call is
%% passed to mnesia:select/5. Otherwise every candidate fragment is
%% locked and the select is run through mnesia:fun_select/7:
%%  - all fragments readable on this node: dirty_select each of them
%%    and append the results;
%%  - otherwise: a linked helper process (local_select/4) triggers the
%%    remote selects while this process selects the local fragments,
%%    and local_collect/5 gathers everything.
do_select(ActivityId, Opaque, Tab, MatchSpec, LockKind) ->
    case ?catch_val({Tab, frag_hash}) of
        {'EXIT', _} ->
            mnesia:select(ActivityId, Opaque, Tab, MatchSpec, LockKind);
        FH ->
            FragNumbers = verify_numbers(FH,MatchSpec),
            Fun = fun(Num) ->
                          Name = n_to_frag_name(Tab, Num),
                          Node = val({Name, where_to_read}),
                          mnesia:lock(ActivityId, Opaque, {table, Name}, LockKind),
                          {Name, Node}
                  end,
            NameNodes = lists:map(Fun, FragNumbers),
            SelectAllFun =
                fun(PatchedMatchSpec) ->
                        Match = [mnesia:dirty_select(Name, PatchedMatchSpec)
                                 || {Name, _Node} <- NameNodes],
                        lists:append(Match)
                end,
            case [{Name, Node} || {Name, Node} <- NameNodes, Node /= node()] of
                [] ->
                    %% All fragments are local
                    mnesia:fun_select(ActivityId, Opaque, Tab, MatchSpec, none, '_', SelectAllFun);
                RemoteNameNodes ->
                    Type = val({Tab,setorbag}),
                    SelectFun =
                        fun(PatchedMatchSpec) ->
                                Ref = make_ref(),
                                Args = [self(), Ref, RemoteNameNodes, PatchedMatchSpec],
                                Pid = spawn_link(?MODULE, local_select, Args),
                                LocalMatch0 = [mnesia:dirty_select(Name, PatchedMatchSpec)
                                               || {Name, Node} <- NameNodes, Node == node()],
                                LocalMatch = case Type of
                                                 ordered_set -> lists:merge(LocalMatch0);
                                                 _ -> lists:append(LocalMatch0)
                                             end,
                                OldSelectFun = fun() -> SelectAllFun(PatchedMatchSpec) end,
                                local_collect(Ref, Pid, Type, LocalMatch, OldSelectFun)
                        end,
                    mnesia:fun_select(ActivityId, Opaque, Tab, MatchSpec, none, '_', SelectFun)
            end
    end.

%% Asks the table's hash module which fragments MatchSpec may touch and
%% returns that list if every element is an integer in 1..n_fragments.
%% Otherwise the activity is aborted.
verify_numbers(FH,MatchSpec) ->
    HashState = FH#frag_state.hash_state,
    FragNumbers =
        case FH#frag_state.hash_module of
            HashMod when HashMod == ?DEFAULT_HASH_MOD ->
                %% Same call as the clause below, but with the module
                %% name spelled out at compile time.
                ?DEFAULT_HASH_MOD:match_spec_to_frag_numbers(HashState, MatchSpec);
            HashMod ->
                HashMod:match_spec_to_frag_numbers(HashState, MatchSpec)
        end,
    N = FH#frag_state.n_fragments,
    %% The filter keeps the *invalid* entries. is_integer/1 replaces the
    %% obsolete guard test integer/1.
    VerifyFun = fun(F) when is_integer(F), F >= 1, F =< N -> false;
                   (_F) -> true
                end,
    %% The catch is deliberate: if FragNumbers is not a proper list the
    %% resulting {'EXIT', _} term lands in the BadFrags clause and is
    %% reported through mnesia:abort/1 like any other bad value.
    case catch lists:filter(VerifyFun, FragNumbers) of
        [] ->
            FragNumbers;
        BadFrags ->
            mnesia:abort({"match_spec_to_frag_numbers: Fragment numbers out of range",
                          BadFrags, {range, 1, N}})
    end.

%% Body of the helper process spawned by do_select/5. Calls
%% remote_select/4 on every distinct remote node and reports the
%% overall outcome to ReplyTo as {local_select, Ref, ok | {error, _}}.
%% The selected records themselves are sent to ReplyTo by the remote
%% nodes (see do_remote_select/4).
local_select(ReplyTo, Ref, RemoteNameNodes, MatchSpec) ->
    RemoteNodes = mnesia_lib:uniq([Node || {_Name, Node} <- RemoteNameNodes]),
    Args = [ReplyTo, Ref, RemoteNameNodes, MatchSpec],
    {Replies, BadNodes} = rpc:multicall(RemoteNodes, ?MODULE, remote_select, Args),
    case mnesia_lib:uniq(Replies) -- [ok] of
        [] when BadNodes == [] ->
            ReplyTo ! {local_select, Ref, ok};
        _ when BadNodes /= [] ->
            ReplyTo ! {local_select, Ref, {error, {node_not_running, hd(BadNodes)}}};
        [{badrpc, {'EXIT', Reason}} | _] ->
            ReplyTo ! {local_select, Ref, {error, Reason}};
        [Reason | _] ->
            ReplyTo ! {local_select, Ref, {error, Reason}}
    end,
    %% Unlink before exiting so that ReplyTo gets no exit signal for
    %% this normal termination.
    unlink(ReplyTo),
    exit(normal).

%% Entry point called through rpc:multicall/4 from local_select/4.
remote_select(ReplyTo, Ref, NameNodes, MatchSpec) ->
    do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec).

%% Walks NameNodes and, for each fragment assigned to the node this
%% code runs on, sends {remote_select, Ref, Node, Res} to ReplyTo.
%% Res is {ok, Records}, or whatever the catch produced if
%% dirty_select failed. Fragments assigned to other nodes are skipped.
do_remote_select(ReplyTo, Ref, [{Name, Node} | NameNodes], MatchSpec) ->
    if
        Node == node() ->
            Res = (catch {ok, mnesia:dirty_select(Name, MatchSpec)}),
            ReplyTo ! {remote_select, Ref, Node, Res},
            do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec);
        true ->
            do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec)
    end;
do_remote_select(_ReplyTo, _Ref, [], _MatchSpec) ->
    ok.

%% Waits for the verdict of the helper process Pid, or for its exit
%% message, and then gathers the remote results. On an exit message
%% the local matches are dropped and the exit reason becomes the error.
%% NOTE(review): the 'EXIT' clause only fires if the calling process
%% traps exits - confirm where that is set up.
local_collect(Ref, Pid, Type, LocalMatch, OldSelectFun) ->
    receive
        {local_select, Ref, LocalRes} ->
            remote_collect(Ref, Type, LocalRes, LocalMatch, OldSelectFun);
        {'EXIT', Pid, Reason} ->
            remote_collect(Ref, Type, {error, Reason}, [], OldSelectFun)
    end.

%% Takes the {remote_select, Ref, _, _} messages that are already in
%% the mailbox (timeout 0).
%%  - State ok: merge each {ok, Records} into Acc (lists:merge/2 for
%%    ordered_set, ++ otherwise) and return Acc when no message is
%%    left. Any other remote result switches to the error state with
%%    {node_not_running, Node}.
%%  - State {error, Reason}: discard the remaining messages for Ref,
%%    then abort with Reason.
%% OldSelectFun is only passed along; it is never called here.
%% NOTE(review): 'after 0' presumably relies on all remote replies
%% having arrived before the local_select verdict - confirm.
remote_collect(Ref, Type, LocalRes = ok, Acc, OldSelectFun) ->
    receive
        {remote_select, Ref, Node, RemoteRes} ->
            case RemoteRes of
                {ok, RemoteMatch} ->
                    Matches = case Type of
                                  ordered_set -> lists:merge(RemoteMatch, Acc);
                                  _ -> RemoteMatch ++ Acc
                              end,
                    remote_collect(Ref, Type, LocalRes, Matches, OldSelectFun);
                _ ->
                    remote_collect(Ref, Type, {error, {node_not_running, Node}}, [], OldSelectFun)
            end
    after 0 ->
            Acc
    end;
remote_collect(Ref, Type, LocalRes = {error, Reason}, _Acc, OldSelectFun) ->
    receive
        {remote_select, Ref, _Node, _RemoteRes} ->
            remote_collect(Ref, Type, LocalRes, [], OldSelectFun)
    after 0 ->
            mnesia:abort(Reason)
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Returns a list of cstructs

%% Same as expand_cstruct(Cs, create).
expand_cstruct(Cs) ->
    expand_cstruct(Cs, create).
expand_cstruct(Cs, Mode) -> Tab = Cs#cstruct.name, Props = Cs#cstruct.frag_properties, mnesia_schema:verify({alt, [nil, list]}, mnesia_lib:etype(Props), {badarg, Tab, Props}), %% Verify keys ValidKeys = [foreign_key, n_fragments, node_pool, n_ram_copies, n_disc_copies, n_disc_only_copies, hash_module, hash_state], Keys = mnesia_schema:check_keys(Tab, Props, ValidKeys), mnesia_schema:check_duplicates(Tab, Keys), %% Pick fragmentation props ForeignKey = mnesia_schema:pick(Tab, foreign_key, Props, undefined), {ForeignKey2, N, Pool, DefaultNR, DefaultND, DefaultNDO} = pick_props(Tab, Cs, ForeignKey), %% Verify node_pool BadPool = {bad_type, Tab, {node_pool, Pool}}, mnesia_schema:verify(list, mnesia_lib:etype(Pool), BadPool), NotAtom = fun(A) when atom(A) -> false; (_A) -> true end, mnesia_schema:verify([], [P || P <- Pool, NotAtom(P)], BadPool), NR = mnesia_schema:pick(Tab, n_ram_copies, Props, 0), ND = mnesia_schema:pick(Tab, n_disc_copies, Props, 0), NDO = mnesia_schema:pick(Tab, n_disc_only_copies, Props, 0), PosInt = fun(I) when integer(I), I >= 0 -> true; (_I) -> false end, mnesia_schema:verify(true, PosInt(NR), {bad_type, Tab, {n_ram_copies, NR}}), mnesia_schema:verify(true, PosInt(ND), {bad_type, Tab, {n_disc_copies, ND}}), mnesia_schema:verify(true, PosInt(NDO), {bad_type, Tab, {n_disc_only_copies, NDO}}), %% Verify n_fragments Cs2 = verify_n_fragments(N, Cs, Mode), %% Verify hash callback
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?