mnesia_frag.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,262 行 · 第 1/3 页

ERL
1,262
字号
%%% ``The contents of this file are subject to the Erlang Public License,%%% Version 1.1, (the "License"); you may not use this file except in%%% compliance with the License. You should have received a copy of the%%% Erlang Public License along with this software. If not, it can be%%% retrieved via the world wide web at http://www.erlang.org/.%%% %%% Software distributed under the License is distributed on an "AS IS"%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%%% the License for the specific language governing rights and limitations%%% under the License.%%% %%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%%% AB. All Rights Reserved.''%%% %%%     $Id$%%%%%%----------------------------------------------------------------------%%% Purpose : Support tables so large that they need%%%           to be divided into several fragments.%%%----------------------------------------------------------------------%header_doc_include-module(mnesia_frag).%% Callback functions when accessed within an activity-export([	 lock/4,	 write/5, delete/5, delete_object/5,	 read/5, match_object/5, all_keys/4,	 select/5,select/6,select_cont/3,	 index_match_object/6, index_read/6,	 foldl/6, foldr/6,	 table_info/4       ]).%header_doc_include%% -behaviour(mnesia_access).-export([	 change_table_frag/2,	 remove_node/2,	 expand_cstruct/1,	 lookup_frag_hash/1,	 lookup_foreigners/1,	 frag_names/1,	 set_frag_hash/2,	 local_select/4,	 remote_select/4	]).-include("mnesia.hrl").-define(OLD_HASH_MOD, mnesia_frag_old_hash).-define(DEFAULT_HASH_MOD, mnesia_frag_hash).%%-define(DEFAULT_HASH_MOD, ?OLD_HASH_MOD). %%  BUGBUG: New should be default-record(frag_state,	{foreign_key,	 n_fragments,	 hash_module,	 hash_state}).%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Access functions%impl_doc_include%% Callback functions which provides transparent%% access of fragmented tables from any activity%% access context.lock(ActivityId, Opaque, {table , Tab}, LockKind) ->    case frag_names(Tab) of	[Tab] ->	    mnesia:lock(ActivityId, Opaque, {table, Tab}, LockKind);	    	Frags ->	    DeepNs = [mnesia:lock(ActivityId, Opaque, {table, F}, LockKind) ||			 F <- Frags],	    mnesia_lib:uniq(lists:append(DeepNs))    end;lock(ActivityId, Opaque, LockItem, LockKind) ->    mnesia:lock(ActivityId, Opaque, LockItem, LockKind).write(ActivityId, Opaque, Tab, Rec, LockKind) ->    Frag = record_to_frag_name(Tab, Rec),    mnesia:write(ActivityId, Opaque, Frag, Rec, LockKind).delete(ActivityId, Opaque, Tab, Key, LockKind) ->    Frag = key_to_frag_name(Tab, Key),    mnesia:delete(ActivityId, Opaque, Frag, Key, LockKind).delete_object(ActivityId, Opaque, Tab, Rec, LockKind) ->    Frag = record_to_frag_name(Tab, Rec),    mnesia:delete_object(ActivityId, Opaque, Frag, Rec, LockKind).read(ActivityId, Opaque, Tab, Key, LockKind) ->    Frag = key_to_frag_name(Tab, Key),    mnesia:read(ActivityId, Opaque, Frag, Key, LockKind).match_object(ActivityId, Opaque, Tab, HeadPat, LockKind) ->    MatchSpec = [{HeadPat, [], ['$_']}],    select(ActivityId, Opaque, Tab, MatchSpec, LockKind).select(ActivityId, Opaque, Tab, MatchSpec, LockKind) ->    do_select(ActivityId, Opaque, Tab, MatchSpec, LockKind).select(ActivityId, Opaque, Tab, MatchSpec, Limit, LockKind) ->    init_select(ActivityId, Opaque, Tab, MatchSpec, Limit, LockKind).all_keys(ActivityId, Opaque, Tab, LockKind) ->    Match = [mnesia:all_keys(ActivityId, Opaque, Frag, LockKind)	     || Frag <- frag_names(Tab)],    lists:append(Match).index_match_object(ActivityId, Opaque, Tab, Pat, Attr, LockKind) ->    Match =	[mnesia:index_match_object(ActivityId, Opaque, Frag, Pat, Attr, LockKind)	 || Frag <- frag_names(Tab)],    lists:append(Match).index_read(ActivityId, Opaque, Tab, Key, Attr, LockKind) ->    Match =	[mnesia:index_read(ActivityId, Opaque, Frag, Key, Attr, LockKind)	     || Frag <- frag_names(Tab)],    lists:append(Match).foldl(ActivityId, Opaque, Fun, Acc, Tab, LockKind) ->    Fun2 = fun(Frag, A) ->		   mnesia:foldl(ActivityId, Opaque, Fun, A, Frag, LockKind)	   end,    lists:foldl(Fun2, Acc, frag_names(Tab)).foldr(ActivityId, Opaque, Fun, Acc, Tab, LockKind) ->    Fun2 = fun(Frag, A) ->		   mnesia:foldr(ActivityId, Opaque, Fun, A, Frag, LockKind)	   end,    lists:foldr(Fun2, Acc, frag_names(Tab)).table_info(ActivityId, Opaque, {Tab, Key}, Item) ->    Frag = key_to_frag_name(Tab, Key),    table_info2(ActivityId, Opaque, Tab, Frag, Item);table_info(ActivityId, Opaque, Tab, Item) ->    table_info2(ActivityId, Opaque, Tab, Tab, Item).table_info2(ActivityId, Opaque, Tab, Frag, Item) ->    case Item of	size ->	    SumFun = fun({_, Size}, Acc) -> Acc + Size end,	    lists:foldl(SumFun, 0, frag_size(ActivityId, Opaque, Tab));	memory ->	    SumFun = fun({_, Size}, Acc) -> Acc + Size end,	    lists:foldl(SumFun, 0, frag_memory(ActivityId, Opaque, Tab));	base_table ->	    lookup_prop(Tab, base_table);	node_pool ->	    lookup_prop(Tab, node_pool);	n_fragments ->	    FH = lookup_frag_hash(Tab),	    FH#frag_state.n_fragments;	foreign_key ->	    FH = lookup_frag_hash(Tab),	    FH#frag_state.foreign_key;	foreigners ->	    lookup_foreigners(Tab);	n_ram_copies ->	    length(val({Tab, ram_copies}));	n_disc_copies ->	    length(val({Tab, disc_copies}));	n_disc_only_copies ->	    length(val({Tab, disc_only_copies}));	frag_names ->	    frag_names(Tab);	frag_dist ->	    frag_dist(Tab);	frag_size ->	    frag_size(ActivityId, Opaque, Tab);	frag_memory ->	    frag_memory(ActivityId, Opaque, Tab);	_ ->	    mnesia:table_info(ActivityId, Opaque, Frag, Item)    end.%impl_doc_includefrag_size(ActivityId, Opaque, Tab) ->    [{F, remote_table_info(ActivityId, Opaque, F, size)} || F <- frag_names(Tab)].frag_memory(ActivityId, Opaque, Tab) ->    [{F, remote_table_info(ActivityId, Opaque, F, memory)} || F <- frag_names(Tab)].      remote_table_info(ActivityId, Opaque, Tab, Item) ->    N = val({Tab, where_to_read}),    case rpc:call(N, mnesia, table_info, [ActivityId, Opaque, Tab, Item]) of	{badrpc, _} ->	    mnesia:abort({no_exists, Tab, Item});	Info ->	    Info    end.init_select(Tid,Opaque,Tab,Pat,Limit,LockKind) ->    case ?catch_val({Tab, frag_hash}) of	{'EXIT', _} ->	    mnesia:select(Tid, Opaque, Tab, Pat, Limit,LockKind);	FH ->	    FragNumbers = verify_numbers(FH,Pat), 	    Fun = fun(Num) ->			  Name = n_to_frag_name(Tab, Num),			  Node = val({Name, where_to_read}),			  Storage = mnesia_lib:storage_type_at_node(Node, Name),			  mnesia:lock(Tid, Opaque, {table, Name}, LockKind),			  {Name, Node, Storage}		  end,	    [{FTab,Node,Type}|NameNodes] = lists:map(Fun, FragNumbers),	    InitFun = fun(FixedSpec) -> mnesia:dirty_sel_init(Node,FTab,FixedSpec,Limit,Type) end,	    Res = mnesia:fun_select(Tid,Opaque,FTab,Pat,LockKind,FTab,InitFun,Limit,Node,Type),	    frag_sel_cont(Res, NameNodes, {Pat,LockKind,Limit})    end.select_cont(_Tid,_,{frag_cont, '$end_of_table', [],_}) -> '$end_of_table';select_cont(Tid,Ts,{frag_cont, '$end_of_table', [{Tab,Node,Type}|Rest],Args}) ->     {Spec,LockKind,Limit} = Args,    InitFun = fun(FixedSpec) -> mnesia:dirty_sel_init(Node,Tab,FixedSpec,Limit,Type) end,    Res = mnesia:fun_select(Tid,Ts,Tab,Spec,LockKind,Tab,InitFun,Limit,Node,Type),    frag_sel_cont(Res, Rest, Args);select_cont(Tid,Ts,{frag_cont, Cont, TabL, Args}) ->     frag_sel_cont(mnesia:select_cont(Tid,Ts,Cont),TabL,Args);select_cont(Tid,Ts,Else) ->  %% Not a fragmented table    mnesia:select_cont(Tid,Ts,Else).frag_sel_cont('$end_of_table', [],_) ->    '$end_of_table';frag_sel_cont('$end_of_table', TabL,Args) ->     {[], {frag_cont, '$end_of_table', TabL,Args}};frag_sel_cont({Recs,Cont}, TabL,Args) ->    {Recs, {frag_cont, Cont, TabL,Args}}.do_select(ActivityId, Opaque, Tab, MatchSpec, LockKind) ->    case ?catch_val({Tab, frag_hash}) of	{'EXIT', _} ->	    mnesia:select(ActivityId, Opaque, Tab, MatchSpec, LockKind);	FH ->	    FragNumbers = verify_numbers(FH,MatchSpec), 	    Fun = fun(Num) ->			  Name = n_to_frag_name(Tab, Num),			  Node = val({Name, where_to_read}),			  mnesia:lock(ActivityId, Opaque, {table, Name}, LockKind),			  {Name, Node}		  end,	    NameNodes = lists:map(Fun, FragNumbers),	    SelectAllFun =		fun(PatchedMatchSpec) ->			Match = [mnesia:dirty_select(Name, PatchedMatchSpec)				 || {Name, _Node} <- NameNodes],			lists:append(Match)		end,	    case [{Name, Node} || {Name, Node} <- NameNodes, Node /= node()] of		[] ->		    %% All fragments are local		    mnesia:fun_select(ActivityId, Opaque, Tab, MatchSpec, none, '_', SelectAllFun);		RemoteNameNodes ->		    Type = val({Tab,setorbag}),		    SelectFun =			fun(PatchedMatchSpec) ->				Ref = make_ref(),				Args = [self(), Ref, RemoteNameNodes, PatchedMatchSpec],				Pid = spawn_link(?MODULE, local_select, Args),				LocalMatch0 = [mnesia:dirty_select(Name, PatchedMatchSpec)					       || {Name, Node} <- NameNodes, Node == node()],				LocalMatch = case Type of						 ordered_set -> lists:merge(LocalMatch0);						 _ -> lists:append(LocalMatch0)					     end,				OldSelectFun = fun() -> SelectAllFun(PatchedMatchSpec) end,				local_collect(Ref, Pid, Type, LocalMatch, OldSelectFun)			end,		    mnesia:fun_select(ActivityId, Opaque, Tab, MatchSpec, none, '_', SelectFun)	    end    end.verify_numbers(FH,MatchSpec) ->    HashState = FH#frag_state.hash_state,    FragNumbers = 	case FH#frag_state.hash_module of	    HashMod when HashMod == ?DEFAULT_HASH_MOD ->		?DEFAULT_HASH_MOD:match_spec_to_frag_numbers(HashState, MatchSpec);	    HashMod ->		HashMod:match_spec_to_frag_numbers(HashState, MatchSpec)	end,    N = FH#frag_state.n_fragments,    VerifyFun = fun(F) when integer(F), F >= 1, F =< N -> false;		   (_F) -> true		end,    case catch lists:filter(VerifyFun, FragNumbers) of	[] ->	    FragNumbers;	BadFrags ->	    mnesia:abort({"match_spec_to_frag_numbers: Fragment numbers out of range",			  BadFrags, {range, 1, N}})    end.local_select(ReplyTo, Ref, RemoteNameNodes, MatchSpec) ->    RemoteNodes = mnesia_lib:uniq([Node || {_Name, Node} <- RemoteNameNodes]),    Args = [ReplyTo, Ref, RemoteNameNodes, MatchSpec],    {Replies, BadNodes} = rpc:multicall(RemoteNodes, ?MODULE, remote_select, Args),    case mnesia_lib:uniq(Replies) -- [ok] of	[] when BadNodes == [] ->	    ReplyTo ! {local_select, Ref, ok};	_ when BadNodes /= [] ->	    ReplyTo ! {local_select, Ref, {error, {node_not_running, hd(BadNodes)}}};	[{badrpc, {'EXIT', Reason}} | _] ->	    ReplyTo ! {local_select, Ref, {error, Reason}};	[Reason | _] ->	    ReplyTo ! {local_select, Ref, {error, Reason}}    end,    unlink(ReplyTo),    exit(normal).    remote_select(ReplyTo, Ref, NameNodes, MatchSpec) ->    do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec).do_remote_select(ReplyTo, Ref, [{Name, Node} | NameNodes], MatchSpec) ->    if	Node == node() ->	    Res = (catch {ok, mnesia:dirty_select(Name, MatchSpec)}),	    ReplyTo ! {remote_select, Ref, Node, Res},	    do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec);	true ->	    do_remote_select(ReplyTo, Ref, NameNodes, MatchSpec)    end;do_remote_select(_ReplyTo, _Ref, [], _MatchSpec) ->    ok.local_collect(Ref, Pid, Type, LocalMatch, OldSelectFun) ->    receive	{local_select, Ref, LocalRes} ->	    remote_collect(Ref, Type, LocalRes, LocalMatch, OldSelectFun);	{'EXIT', Pid, Reason} ->	    remote_collect(Ref, Type, {error, Reason}, [], OldSelectFun)    end.    remote_collect(Ref, Type, LocalRes = ok, Acc, OldSelectFun) ->    receive	{remote_select, Ref, Node, RemoteRes} ->	    case RemoteRes of		{ok, RemoteMatch} ->		    Matches = case Type of				  ordered_set -> lists:merge(RemoteMatch, Acc);				  _ -> RemoteMatch ++ Acc			      end,		    remote_collect(Ref, Type, LocalRes, Matches, OldSelectFun);		_ ->		    remote_collect(Ref, Type, {error, {node_not_running, Node}}, [], OldSelectFun)	    end    after 0 ->	    Acc    end;remote_collect(Ref, Type, LocalRes = {error, Reason}, _Acc, OldSelectFun) ->    receive	{remote_select, Ref, _Node, _RemoteRes} ->	    remote_collect(Ref, Type, LocalRes, [], OldSelectFun)    after 0 ->	    mnesia:abort(Reason)    end.%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Returns a list of cstructsexpand_cstruct(Cs) ->    expand_cstruct(Cs, create).    expand_cstruct(Cs, Mode) ->    Tab = Cs#cstruct.name,    Props = Cs#cstruct.frag_properties,    mnesia_schema:verify({alt, [nil, list]}, mnesia_lib:etype(Props),			 {badarg, Tab, Props}),     %% Verify keys    ValidKeys = [foreign_key, n_fragments, node_pool,		 n_ram_copies, n_disc_copies, n_disc_only_copies,		 hash_module, hash_state],    Keys = mnesia_schema:check_keys(Tab, Props, ValidKeys),    mnesia_schema:check_duplicates(Tab, Keys),    %% Pick fragmentation props    ForeignKey = mnesia_schema:pick(Tab, foreign_key, Props, undefined),    {ForeignKey2, N, Pool, DefaultNR, DefaultND, DefaultNDO} =	pick_props(Tab, Cs, ForeignKey),    %% Verify node_pool    BadPool = {bad_type, Tab, {node_pool, Pool}},    mnesia_schema:verify(list, mnesia_lib:etype(Pool), BadPool),    NotAtom = fun(A) when atom(A) -> false;		 (_A) -> true	      end,    mnesia_schema:verify([], [P || P <- Pool, NotAtom(P)], BadPool),    NR  = mnesia_schema:pick(Tab, n_ram_copies, Props, 0),    ND  = mnesia_schema:pick(Tab, n_disc_copies, Props, 0),    NDO = mnesia_schema:pick(Tab, n_disc_only_copies, Props, 0),        PosInt = fun(I) when integer(I), I >= 0 -> true;		(_I) -> false	     end,    mnesia_schema:verify(true, PosInt(NR),			 {bad_type, Tab, {n_ram_copies, NR}}),    mnesia_schema:verify(true, PosInt(ND),			 {bad_type, Tab, {n_disc_copies, ND}}),    mnesia_schema:verify(true, PosInt(NDO),			 {bad_type, Tab, {n_disc_only_copies, NDO}}),        %% Verify n_fragments    Cs2 = verify_n_fragments(N, Cs, Mode),        %% Verify hash callback

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?