📄 mnemosyne_catalog.erl
字号:
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%% 
%%     $Id$
%%
-module(mnemosyne_catalog).

%%%================================================================
%%%
%%% INTRODUCTION
%%%
%%% This is the catalog function for the database query language
%%% Mnemosyne in conjunction with the DBMS Mnesia.
%%%
%%% The purpose of the catalog is to maintain statistics about the
%%% database. These statistics are used by the query optimizer when
%%% reordering the query goals at query setup.
%%%
%%%
%%% ABOUT THE CATALOG SERVER
%%%
%%% The catalog server is supervised by the mnesia application.
%%%
%%% The statistics are kept in the form of one so-called image per
%%% table, which is returned to the asker on request. The image
%%% "appears" in the server in either of two ways:
%%%
%%%   1) When there is a request from the optimizer but there is no
%%%      image available for that table. In this case we will wait
%%%      at most "?max_wait" ms for the image to be produced. If
%%%      this time limit is exceeded, the "null_image" is returned but
%%%      the image will be entered in the catalog when it is available.
%%%
%%%   2) When a mnesia node has produced an image it is broadcast
%%%      to all other running mnesia nodes.
%%%
%%% When there is a request for an image, the catalog subscribes to
%%% table events for that table from mnesia. There is a counter per
%%% table which is decremented by one for each update event that is
%%% received from mnesia. When the counter reaches 0, a new image is
%%% produced unless less than "?min_upd_interval" seconds have passed
%%% since the last update (either locally or broadcast from
%%% another node). The counter is initialised to "?upd_limit" percent
%%% of the number of records in the table.
%%%
%%% The updating is performed in low process priority.
%%%
%%% If a table has no local copy (neither disk nor ram) there is no
%%% image produced, instead the updating is handed over to the node
%%% given by "mnesia:table_info(Table,where_to_read)" unless that
%%% node has a disk_only copy of the table. In that case we try to
%%% find (in order) a node with disc_copies, ram_copies and finally
%%% the disk_only copy.
%%%
%%% If the schema is changed in some serious way (for example the
%%% arity of a table), the image is removed for that table.
%%%
%%% ERRORS
%%%
%%% On errors usually the "null_image" is returned since an absent or
%%% even erroneous image does not influence the correctness nor the
%%% completeness of a query, just the execution time.
%%%
%%%================================================================

%%% external interface:
-export([start/0,
         image/1,
         clear_statistics/1, clear_statistics/0,
         set_parameter/3, get_parameter/2
        ]).

%%% Administrative functions
-export([is_running/0,
         info/0, infoS/0
        ]).

-behaviour(gen_server).

%%% gen_server callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%% doing apply on these:
-export([mk_image/1, mk_insert_image/1, mk_insert_image_send/2
        ]).

-define(SERVER_NAME, ?MODULE). %% The registered name of the catalog server

-define(CATALOG, ?SERVER_NAME).
%% (?CATALOG is the name of the named ets catalog table.)

%%%----------------------------------------------------------------
%%% DEFAULT VALUES of parameters setable by users
%%%
%%% Each entry is {ParameterName, DefaultValue, UnitString}.
-define(default_values,
        [
         %% If we shall update this table on this node (yes|no)
         {do_local_upd, yes, ""},

         %% Seconds after an update before next one is allowed:
         {min_upd_interval, 10, "s"},

         %% Percentage of each table which may be updated without any
         %% recalculation:
         {upd_limit, 10, "%"},

         %% Max time to wait for an image to be produced (milli seconds):
         {max_wait, 1000, "ms"}
        ]).

%%%----------------------------------------------------------------
%%-define(debug,1).
-include("mnemosyne_debug.hrl").

%%%================================================================
%%%             User Exports

%% Starts the catalog server, linked to the caller and registered
%% locally under ?SERVER_NAME.
start() ->
    gen_server:start_link({local, ?SERVER_NAME}, ?MODULE, [], []).

%% Returns the statistics image for Table:
%%   {Cardinality, {I1,I2...Iarity}, I1*I2*...*Iarity}
%%
%% - A cached image is returned directly; if the table is read locally
%%   the catalog also subscribes to its table events.
%% - No cached image, table read locally: an image is requested with
%%   the table's max_wait parameter as time limit.
%% - No cached image, table read on another node: that node (or a
%%   substitute when it only holds a disc_only copy) is asked via rpc.
%% - Table readable nowhere, or the rpc fails: null_image(Table).
%%
%% Exits with mnemosyne_not_running when the catalog ets table does not
%% exist. mnesia:table_info/2 is called unprotected, so its failures
%% propagate to the caller.
image(Table) ->
    WhereToRead = mnesia:table_info(Table, where_to_read),
    %% Only the lookup is protected; the clause bodies run unprotected.
    try read_image(Table) of
        none when WhereToRead == node() ->
            impatient_mk_insert_image(Table, read_parameter(Table, max_wait));
        none when WhereToRead =/= nowhere ->
            %% Not local, ask someone else
            RemoteNode =
                case lists:member(WhereToRead,
                                  mnesia:table_info(Table, disc_only_copies)) of
                    true ->
                        %% WhereToRead has a slow copy, try to find another
                        select_remote_node(Table, [WhereToRead]);
                    false ->
                        WhereToRead
                end,
            case rpc:call(RemoteNode, ?MODULE, image, [Table]) of
                {badrpc, _Reason} ->
                    %% Serious problems, but not for the catalog!
                    null_image(Table);
                RemoteImage ->
                    RemoteImage
            end;
        none when WhereToRead == nowhere ->
            %% The returned null_image is the callers least problem now...
            null_image(Table);
        Image when WhereToRead == node() ->
            subscribe(Table),
            Image;
        Image ->
            Image
    catch
        %% ets:lookup/2 raises badarg when the catalog table is missing.
        error:badarg ->
            exit(mnemosyne_not_running)
    end.

%% Sets parameter Name for Table to Value and returns the previous
%% value (the default if none was stored).
%%
%% Returns instead:
%%   {error, {no_such_parameter, Name}}          Name is not in ?default_values
%%   {error, {bad_parameter_value, Name, Value}} Value would break later use
%%   {error, {node_not_running, node()}}         the catalog table is missing
set_parameter(Table, Name, Value) ->
    case lists:keymember(Name, 1, ?default_values) of
        false ->
            {error, {no_such_parameter, Name}};
        true ->
            case is_valid_parameter_value(Name, Value) of
                false -> {error, {bad_parameter_value, Name, Value}};
                true -> set_known_parameter(Table, Name, Value)
            end
    end.

%% Checks the values whose constraints are visible in this module:
%% do_local_upd is matched against yes|no only (subscribe/1 and the
%% table-event handler), and min_upd_interval is used in arithmetic by
%% the server. Other parameters are accepted as given.
is_valid_parameter_value(do_local_upd, Value) ->
    Value =:= yes orelse Value =:= no;
is_valid_parameter_value(min_upd_interval, Value) ->
    is_number(Value);
is_valid_parameter_value(_Name, _Value) ->
    true.

%% Stores an already validated parameter, provided the catalog table
%% exists, and runs the follow-up action for that parameter.
set_known_parameter(Table, Name, Value) ->
    try ets:first(?CATALOG) of
        _ ->
            OldValue = read_parameter(Table, Name),
            insert_parameter(Table, Name, Value),
            parameter_updated(Table, Name, Value),
            OldValue
    catch
        error:badarg ->
            {error, {node_not_running, node()}}
    end.

%% Actions when a parameter has been updated.
parameter_updated(Table, upd_limit, _Value) ->
    NewUpdLimit = update_limit(Table),
    Cntr = read_counter(Table, limit),
    if
        Cntr < NewUpdLimit ->
            %% Send the server a synthetic table event for Table.
            ?SERVER_NAME ! {mnesia_table_event, {'', {Table}, ''}};
        true ->
            %% should reduce NewUpdLimit by
            %% decremented amount
            insert_counter(Table, limit, NewUpdLimit)
    end;
parameter_updated(Table, do_local_upd, no) ->
    gen_server:cast(?SERVER_NAME, {unsubscribe, Table});
parameter_updated(Table, do_local_upd, yes) ->
    gen_server:cast(?SERVER_NAME, {subscribe, Table});
parameter_updated(_Table, _Name, _Value) ->
    ok.

%% Returns the value of parameter Name for Table. Falls back to the
%% parameter's default value when the catalog table does not exist.
%% Returns {error, {no_such_parameter, Name}} for an unknown Name.
get_parameter(Table, Name) ->
    case lists:keymember(Name, 1, ?default_values) of
        false ->
            {error, {no_such_parameter, Name}};
        true ->
            try ets:first(?CATALOG) of
                _ -> read_parameter(Table, Name)
            catch
                error:badarg -> parameter_default_value(Name)
            end
    end.

%%%----------------------------------------------------------------
%%% Local

%% Asks the catalog server to subscribe to Table's events, unless local
%% updating is switched off for that table.
subscribe(Table) ->
    case read_parameter(Table, do_local_upd) of
        yes -> gen_server:cast(?SERVER_NAME, {subscribe, Table});
        no -> ok
    end.

%%%----------------------------------------------------------------
%%% T is table or a list of tables

%% Deletes every catalog entry keyed on the given table(s). The pattern
%% matches any {_, Tab} key, so images, time stamps, counters and
%% parameters stored under that key shape are all removed.
clear_statistics(Tab) when is_atom(Tab) ->
    ets:match_delete(?CATALOG, {{'_', Tab}, '_'});
clear_statistics(Tabs) when is_list(Tabs) ->
    lists:foreach(fun(Tab) -> clear_statistics(Tab) end, Tabs).

%% Deletes every entry in the catalog table.
clear_statistics() ->
    ets:match_delete(?CATALOG, '_').
%%%---------------- Adm ----------------

%% True when a process is registered under ?SERVER_NAME.
is_running() ->
    whereis(?SERVER_NAME) =/= undefined.

%% Prints the text produced by infoS/0.
info() ->
    io:format('~s\n', [infoS()]).

%% Returns a flat string describing the catalog on this node: a heading,
%% the server's message queue length when it is non-empty, and then
%% either the table information or a "not running" notice.
infoS() ->
    %% Look the server up once; process_info/2 must not be given
    %% 'undefined' when the server is not registered.
    Pid = whereis(?SERVER_NAME),
    Heading = io_lib:format("Mnemosyne catalog on ~w ", [node()]),
    QueueInfo =
        case is_pid(Pid) andalso process_info(Pid, message_queue_len) of
            {message_queue_len, N} when is_integer(N), N > 0 ->
                io_lib:format(', ~w messages in queue\n', [N]);
            _ ->
                %% Not running, already dead, or an empty queue.
                io_lib:nl()
        end,
    Details =
        case is_pid(Pid) of
            true -> tbl_info();
            false -> io_lib:format(" *** Not running!\n", [])
        end,
    lists:flatten([Heading, QueueInfo, Details]).

%%%================================================================
%%% gen_server callbacks

%% Server state; it carries no fields, all data lives in the ets table.
-record(state, {
         }).

-record(stat,      %% statistics
        {latest = 0,
         sum = 0,
         n = 0
        }).

-record(stats,
        {time,
         gc,
         recl
        }).

%% Creates the public named catalog table, announces this node to the
%% connected nodes, subscribes to schema table events and lowers the
%% server's process priority.
init(_) ->
    ets:new(?CATALOG, [set, public, named_table]),
    tell_friends({hi, node()}, nodes()),
    mnesia:subscribe({table, schema}),
    process_flag(priority, low),
    {ok, #state{}}.

%% No synchronous requests are defined. Nothing is replied here, so a
%% gen_server:call/2,3 towards this server ends in the caller's timeout.
handle_call(_, _, State) ->
    {noreply, State}.

handle_cast({hi, RemoteNode}, State) ->
    %% A new node has joined us. Welcome him/her by sending the images
    lists:foreach(
      fun(Tab) ->
              gen_server:cast({?SERVER_NAME, RemoteNode},
                              {remote_data, Tab,
                               read_image(Tab), read_time_stamp(Tab)})
      end, checked_tables()),
    {noreply, State};

handle_cast({remote_data, Table, RemoteImage, RemoteTimeStamp}, State) ->
    %% An image from an other node
    case {RemoteTimeStamp, read_time_stamp(Table)} of
        {{_, _, _, Remote}, {_, _, _, Local}} when Remote < Local ->
            %% We received a older image than our own
            do_nothing;
        {_, _LocalTimeStamp} ->
            %% Assuming that the received TimeStamp is =/= undefined.
            update_counter(Table, n_upd_remote, 1, 1),
            insert_image(Table, RemoteImage),
            insert_time_stamp(Table, RemoteTimeStamp),
            insert_counter(Table, limit, update_limit(Table, RemoteImage))
    end,
    {noreply, State};

handle_cast({subscribe, Table}, State) ->
    mnesia:subscribe({table, Table}),
    {noreply, State};

handle_cast({unsubscribe, Table}, State) ->
    mnesia:unsubscribe({table, Table}),
    {noreply, State};

handle_cast(_, State) ->
    {noreply, State}.


handle_info({mnesia_table_event, {delete, R, _}}, State)
  when element(1, R) == schema, is_atom(element(2, R)) ->
    %% A schema record was deleted: drop everything stored for the
    %% table named in its second element.
    clear_statistics(element(2, R)),
    {noreply, State};

handle_info({mnesia_table_event, {_, R, _}}, State)
  when element(1, R) == schema ->
    %% Remove data for any of our checked tables that are changed
    %% in any important way
    lists:foreach(
      fun(Tab) ->
              OldArity = size(element(2, read_image(Tab))),
              NewArity = mnesia:table_info(Tab, arity) - 1,
              if
                  OldArity =/= NewArity -> clear_statistics(Tab);
                  true -> ok
              end
      end, checked_tables()),
    {noreply, State};

handle_info({mnesia_table_event, {_, R, _}}, State)
  when is_atom(element(1, R)) ->
    Table = element(1, R),
    %% Updating a Mnesia table. Shall we re-calculate?
    case read_parameter(Table, do_local_upd) of
        yes ->
            case update_counter(Table, limit, -1) of
                N when is_integer(N), N =< 0 ->
                    %% Passed the update limit
                    NowSecs = now_in_secs(),
                    MinInterv = read_parameter(Table, min_upd_interval),
                    case read_time_stamp(Table) of
                        {_, _, _, Tupd} when (NowSecs - Tupd) < MinInterv ->
                            %% Too early, but do not forget this
                            SecsLeft = MinInterv - (NowSecs - Tupd),
                            timer:send_after(SecsLeft * 1000,
                                             {delayed_upd, Table});
                        _ ->
                            %% enough time has passed since last update
                            mk_insert_image(Table)
                    end;
                _ ->
                    %% Limit not reached yet, or no limit counter stored.
                    no
            end;
        no ->
            mnesia:unsubscribe({table, Table})
    end,
    {noreply, State};

handle_info({delayed_upd, Table}, State) ->
    NowSecs = now_in_secs(),
    MinInterv = read_parameter(Table, min_upd_interval),
    case read_time_stamp(Table) of
        {_, _, _, Tupd} when (NowSecs - Tupd) < MinInterv ->
            %% Too early. Someone else must have updated this. Just skip it.
            skip;
        _ ->
            mk_insert_image(Table)
    end,
    {noreply, State};

handle_info(_, State) ->
    {noreply, State}.


terminate(_, _State) ->
    ok.

code_change(_, State, _) ->
    {ok, State}.

%%%----------------------------------------------------------------
%% Returns the ordered set of tables that currently have an image
%% stored in the catalog.
checked_tables() ->
    lists:foldl(
      fun({{image, Table}, _}, Acc) -> ordsets:add_element(Table, Acc);
         (_, Acc) -> Acc
      end, ordsets:new(), ets:match_object(?CATALOG, {{image, '_'}, '_'})).

%%%----------------------------------------------------------------
%% Catalog entries are {{What,Table}, Value} tuples.
%% ?read yields the stored value, or DefaultValue when there is none.
-define(read(What, Table, DefaultValue),
        case ets:lookup(?CATALOG, {What,Table}) of
            [{_,Value}] -> Value;
            [] -> DefaultValue
        end
       ).

%% ?insert stores Value and then yields it. It expands to two
%% comma-separated expressions, so it is only usable as the tail of a
%% function body.
-define(insert(What, Table, Value),
        ets:insert(?CATALOG, {{What,Table},Value}),Value
       ).

%% Stored image for Table, or 'none'.
read_image(Table) -> ?read(image, Table, none).
insert_image(Table, Image) -> ?insert(image, Table, Image).

%% Stored time stamp for Table, or 'undefined'.
read_time_stamp(Table) -> ?read(time_stamp, Table, undefined).

%% Stores TimeStamp for Table. When a previous time stamp exists, the
%% difference between the fourth elements of the new and the old one is
%% first recorded through insert_stat/3 under upd_stat.
insert_time_stamp(Table, TimeStamp) ->
    case read_time_stamp(Table) of
        {_, _, _, Tupd} ->
            Tpassed = element(4, TimeStamp) - Tupd,
            insert_stat(Table, upd_stat, Tpassed);
        _ ->
            ok
    end,
    ?insert(time_stamp, Table, TimeStamp).

%% Stored counter Name for Table, or 'undefined'.
read_counter(Table, Name) -> ?read({counter,Name}, Table, undefined).
insert_counter(Table, Name, Value) -> ?insert({counter,Name}, Table, Value).

%% Adds Amount to counter Name of Table and returns the new value, or
%% 'undefined' when ets:update_counter/3 fails with badarg (for example
%% because the counter has not been stored yet).
update_counter(Table, Name, Amount) ->
    %% try/catch instead of matching a fixed {'EXIT', ...} shape: the
    %% exit reason produced by 'catch' carries a stacktrace whose form
    %% is not stable across OTP releases.
    try
        ets:update_counter(?CATALOG, {{counter,Name},Table}, Amount)
    catch
        error:badarg -> undefined
    end.

%% As update_counter/3, but a counter that cannot be updated is stored
%% with the value Val_if_undefined, which is then returned.
update_counter(Table, Name, Amount, Val_if_undefined) ->
    case update_counter(Table, Name, Amount) of
        undefined -> insert_counter(Table, Name, Val_if_undefined);
        N -> N
    end.

%% Stored parameter Name for Table, or that parameter's default value.
read_parameter(Table, Name) ->
    ?read({parameter,Name}, Table, parameter_default_value(Name)).
insert_parameter(Table, Name, Value) ->
    ?insert({parameter,Name}, Table, Value).

%% Stored execution statistics for Table, or an empty #stats{} record.
read_exec_stat(Table) -> ?read(exec_stat, Table, #stats{}).

%% NOTE(review): this definition continues beyond the end of the
%% reviewed source; the visible part is reproduced unchanged.
insert_exec_stat(Table, Time, Ngc, Recl) ->
    S0 = read_exec_stat(Table),
    S = S0#stats{time = upd_stat(S0#stats.time, Time),
                 gc = upd_stat(S0#stats.gc, Ngc),
                 recl = upd_stat(S0#stats.recl, Recl)
                },
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -