⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mnemosyne_catalog.erl

📁 OTP是开放电信平台的简称
💻 ERL
📖 第 1 页 / 共 2 页
字号:
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%% 
%%     $Id$
%%
-module(mnemosyne_catalog).

%%%================================================================
%%%
%%% INTRODUCTION
%%%
%%% This is the catalog function for the database query language
%%% Mnemosyne in conjunction with the DBMS Mnesia.
%%%
%%% The purpose of the catalog is to maintain statistics about the
%%% database.  These statistics are used by the query optimizer when
%%% reordering the query goals at query setup.
%%%
%%%
%%% ABOUT THE CATALOG SERVER
%%%
%%% The catalog server is supervised by the mnesia application.
%%%
%%% The statistics are kept as one so-called image per table, which
%%% is returned to the asker on request. The image "appears" in the
%%% server in either of two ways:
%%%
%%%     1) When there is a request from the optimizer but there is no
%%%        image available for that table.  In this case we will wait
%%%        at most "?max_wait" ms for the image to be produced.  If
%%%        this time limit is exceeded, the "null_image" is returned
%%%        but the image will be entered in the catalog when it is
%%%        available.
%%%
%%%     2) When a mnesia node has produced an image it is broadcast
%%%        to all other running mnesia nodes.
%%%
%%%  When there is a request for an image, the catalog subscribes to
%%%  table events for that table from mnesia.  There is a counter per
%%%  table which is decremented by one for each update event that is
%%%  received from mnesia.  When the counter reaches 0, a new image is
%%%  produced unless less than "?min_upd_interval" seconds have passed
%%%  since the last update (either locally or broadcast from another
%%%  node).  The counter is initialised to "?upd_limit" percent of the
%%%  number of records in the table.
%%%
%%%  The updating is performed in low process priority.
%%%
%%%  If a table has no local copy (neither disk nor ram) there is no
%%%  image produced, instead the updating is handed over to the node
%%%  given by "mnesia:table_info(Table,where_to_read)" unless that
%%%  node has a disk_only copy of the table. In that case we try to
%%%  find (in order) a node with disc_copies, ram_copies and finally
%%%  the disk_only copy.
%%%
%%%  If the schema is changed in some serious way (for example the
%%%  arity of a table), the image is removed for that table.
%%%
%%%  ERRORS
%%%
%%%  On errors usually the "null_image" is returned since an absent or
%%%  even erroneous image does not influence the correctness nor the
%%%  completeness of a query, just the execution time.
%%%
%%%================================================================

%%% external interface:
-export([start/0,
         image/1,
         clear_statistics/1, clear_statistics/0,
         set_parameter/3, get_parameter/2
        ]).

%%% Administrative functions
-export([is_running/0,
         info/0, infoS/0
        ]).

-behaviour(gen_server).

%%% gen_server callbacks:
-export([init/1,
         handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%% doing apply on these:
-export([mk_image/1,
         mk_insert_image/1,
         mk_insert_image_send/2
        ]).

-define(SERVER_NAME, ?MODULE).  %% The registered name of the catalog server
-define(CATALOG, ?SERVER_NAME).
%% The name of the named ets catalog table

%%%----------------------------------------------------------------
%%% DEFAULT VALUES of parameters settable by users
%%%
%%% Each entry is {Name, DefaultValue, UnitString}.
-define(default_values,
        [
         %% If we shall update this table on this node (yes|no)
         {do_local_upd, yes, ""},

         %% Seconds after an update before next one is allowed:
         {min_upd_interval, 10, "s"},

         %% Percentage of each table which may be updated without any
         %% recalculation:
         {upd_limit, 10, "%"},

         %% Max time to wait for an image to be produced (milli seconds):
         {max_wait, 1000, "ms"}
        ]).

%%%----------------------------------------------------------------
%%-define(debug,1).
-include("mnemosyne_debug.hrl").

%%%================================================================
%%%             User Exports

%% Starts the catalog server, linked to the caller and locally
%% registered as ?SERVER_NAME.
start() -> gen_server:start_link({local,?SERVER_NAME}, ?MODULE, [], []).

%% Returns the image of Table:
%%   {Cardinality, {I1,I2...Iarity}, I1*I2*...*Iarity}
%%
%% Depending on where the table is read from, the image is taken from
%% the local catalog, produced locally, or fetched from another node
%% with rpc.  Exits with mnemosyne_not_running if the local catalog
%% cannot be read.
image(Table) ->
    WhereToRead = mnesia:table_info(Table,where_to_read),
    case catch read_image(Table) of
        none when WhereToRead==node() ->
            %% No image yet for a locally readable table: produce one,
            %% waiting at most max_wait for it.
            impatient_mk_insert_image(Table, read_parameter(Table,max_wait));

        none when WhereToRead=/=nowhere ->
            %% Not local, ask someone else
            RemoteNode =
                case lists:member(WhereToRead,
                                  mnesia:table_info(Table,disc_only_copies)) of
                    true ->
                        %% WhereToRead has a slow copy, try to find another
                        select_remote_node(Table, [WhereToRead]);
                    false ->
                        WhereToRead
                end,
            case rpc:call(RemoteNode, ?MODULE, image, [Table]) of
                {badrpc, _Reason} ->
                    %% Serious problems, but not for the catalog!
                    null_image(Table);
                I ->
                    I
            end;

        none when WhereToRead==nowhere ->
            %% The returned null_image is the callers least problem now...
            null_image(Table);

        {'EXIT', _} ->
            exit(mnemosyne_not_running);

        I when WhereToRead==node() ->
            %% Make sure the local image is kept up to date from now on.
            subscribe(Table),
            I;

        I ->
            I
    end.

%% Sets parameter Name for Table to Value and returns the previous
%% value.  Returns {error, {no_such_parameter,Name}} when Name is not
%% listed in ?default_values and {error, {node_not_running,node()}}
%% when the catalog ets table cannot be accessed.
set_parameter(Table, Name, Value) ->
    case lists:keymember(Name,1,?default_values) of
        false ->
            {error, {no_such_parameter,Name}};
        _ ->
            case catch ets:first(?CATALOG) of
                {'EXIT',_} -> {error, {node_not_running,node()}};
                _ ->
                    OldValue = read_parameter(Table, Name),
                    insert_parameter(Table, Name, Value),
                    %% Actions when updated
                    case Name of
                        upd_limit ->
                            NewUpdLimit = update_limit(Table),
                            Cntr = read_counter(Table,limit),
                            if
                                Cntr<NewUpdLimit ->
                                    %% Send the server a synthetic table
                                    %% event for Table so that it goes
                                    %% through its normal update handling.
                                    ?SERVER_NAME !
                                        {mnesia_table_event,{'',{Table},''}};
                                true ->
                                    %% should reduce NewUpdLimit by
                                    %% decremented amount
                                    insert_counter(Table,limit,NewUpdLimit)
                            end;
                        do_local_upd when Value==no ->
                            gen_server:cast(?SERVER_NAME,{unsubscribe,Table});
                        do_local_upd when Value==yes ->
                            gen_server:cast(?SERVER_NAME,{subscribe,Table});
                        _ -> ok
                    end,
                    OldValue
            end
    end.

%% Returns the value of parameter Name for Table.  When the catalog
%% ets table cannot be accessed the default value is returned.
%% Returns {error, {no_such_parameter,Name}} for unknown parameters.
get_parameter(Table, Name) ->
    case lists:keymember(Name,1,?default_values) of
        false ->
            {error, {no_such_parameter,Name}};
        _ ->
            case catch ets:first(?CATALOG) of
                {'EXIT',_} -> parameter_default_value(Name);
                _ -> read_parameter(Table, Name)
            end
    end.

%%%----------------------------------------------------------------
%%%             Local

%% Asks the catalog server to subscribe to events for Table, but only
%% when local updating is enabled for that table.
subscribe(Table) ->
    case read_parameter(Table,do_local_upd) of
        yes -> gen_server:cast(?SERVER_NAME, {subscribe,Table});
        no -> ok
    end.

%%%----------------------------------------------------------------
%%% T is table or a list of tables

%% Deletes every catalog entry whose key is {_, Tab}.
clear_statistics(Tab) when is_atom(Tab) ->
    ets:match_delete(?CATALOG, {{'_',Tab},'_'});
clear_statistics(Tabs) when is_list(Tabs) ->
    lists:foreach(fun(Tab) -> clear_statistics(Tab) end, Tabs).

%% Deletes every entry in the catalog.
clear_statistics() ->
    ets:match_delete(?CATALOG, '_').
%%%---------------- Adm ----------------

%% true when a process is registered as ?SERVER_NAME on this node.
is_running() -> whereis(?SERVER_NAME) =/= undefined.

%% Prints the text produced by infoS/0 on standard output.
info() ->
    io:format('~s\n', [infoS()]).

%% Returns a flat string describing the catalog on this node.
%%
%% The server pid is looked up before the process is inspected, so a
%% stopped server yields the " *** Not running!" text instead of a
%% badarg from process_info/1 on 'undefined'.
infoS() ->
    QueueLen =
        case whereis(?SERVER_NAME) of
            undefined ->
                0;
            ServerPid ->
                case process_info(ServerPid, message_queue_len) of
                    {message_queue_len, Len} -> Len;
                    undefined -> 0  %% the server died after whereis/1
                end
        end,
    lists:flatten([
                   io_lib:format("Mnemosyne catalog on ~w ", [node()]),
                   case QueueLen of
                       N when is_integer(N), N>0 ->
                           io_lib:format(', ~w messages in queue\n', [N]);
                       _ ->
                           io_lib:nl()
                   end,
                   case mnemosyne_catalog:is_running() of
                       true ->  tbl_info();
                       false -> io_lib:format(" *** Not running!\n", [])
                   end
                  ]).

%%%================================================================
%%%             gen_server callbacks

%% The server keeps its data in the ?CATALOG ets table, so the
%% gen_server state record has no fields.
-record(state,
        {
        }).

-record(stat, %% statistics
        {latest = 0,
         sum = 0,
         n = 0
        }).

-record(stats,
        {time,
         gc,
         recl
        }).

%% Creates the public named catalog table, announces this node to the
%% other nodes, subscribes to schema events and lowers the process
%% priority.
init(_) ->
    ets:new(?CATALOG, [set,public,named_table]),
    tell_friends({hi,node()}, nodes()),
    mnesia:subscribe({table,schema}),
    process_flag(priority, low),
    {ok, #state{}}.

%% No synchronous requests are defined.
%% NOTE(review): no reply is ever sent, so a gen_server:call/2 to this
%% server only returns through its timeout - confirm this is intended.
handle_call(_,_,State) ->
    {noreply,State}.

handle_cast({hi,RemoteNode}, State) ->
    %% A new node has joined us. Welcome him/her by sending the images
    lists:foreach(
      fun(Tab) ->
              gen_server:cast({?SERVER_NAME,RemoteNode},
                              {remote_data, Tab, read_image(Tab),
                               read_time_stamp(Tab)})
      end, checked_tables()),
    {noreply, State};

handle_cast({remote_data,Table,RemoteImage,RemoteTimeStamp}, State) ->
    %% An image from an other node
    case {RemoteTimeStamp,read_time_stamp(Table)} of
        {{_,_,_,Remote},{_,_,_,Local}} when Remote<Local ->
            %% We received a older image than our own
            do_nothing;
        {_,_} ->
            %% Assuming that the received TimeStamp is =/= undefined.
            update_counter(Table, n_upd_remote, 1, 1),
            insert_image(Table, RemoteImage),
            insert_time_stamp(Table, RemoteTimeStamp),
            insert_counter(Table, limit, update_limit(Table,RemoteImage))
    end,
    {noreply, State};

handle_cast({subscribe,Table}, State) ->
    mnesia:subscribe({table,Table}),
    {noreply,State};

handle_cast({unsubscribe,Table}, State) ->
    mnesia:unsubscribe({table,Table}),
    {noreply,State};

handle_cast(_,State) ->
    {noreply,State}.


handle_info({mnesia_table_event,{delete,R,_}}, State)
  when element(1,R)==schema,
       is_atom(element(2,R)) ->
    %% A delete event on the schema table: drop everything stored for
    %% the table named in the second element.
    clear_statistics(element(2,R)),
    {noreply,State};

handle_info({mnesia_table_event,{_,R,_}}, State) when element(1,R)==schema ->
    %% Remove data for any of our checked tables that are changed
    %% in any important way
    lists:foreach(
      fun(Tab) ->
              OldArity = size(element(2,read_image(Tab))),
              NewArity = mnesia:table_info(Tab,arity) - 1,
              if
                  OldArity =/= NewArity -> clear_statistics(Tab);
                  true -> ok
              end
      end, checked_tables()),
    {noreply,State};

handle_info({mnesia_table_event,{_,R,_}}, State) when is_atom(element(1,R)) ->
    Table = element(1,R),
    %% Updating a Mnesia table.  Shall we re-calculate?
    case read_parameter(Table,do_local_upd) of
        yes ->
            case update_counter(Table,limit,-1) of
                N when is_integer(N),N=<0 -> %% Passed the update limit
                    NowSecs = now_in_secs(),
                    MinInterv = read_parameter(Table,min_upd_interval),
                    case read_time_stamp(Table) of
                        {_,_,_,Tupd} when (NowSecs-Tupd)<MinInterv ->
                            %% Too early, but do not forget this
                            SecsLeft = MinInterv - (NowSecs-Tupd),
                            timer:send_after(SecsLeft*1000,
                                             {delayed_upd,Table});
                        _ -> %% enough time has passed since last update
                            mk_insert_image(Table)
                    end;
                _ ->
                    no
            end;
        no ->
            mnesia:unsubscribe({table,Table})
    end,
    {noreply,State};

handle_info({delayed_upd,Table}, State) ->
    NowSecs = now_in_secs(),
    MinInterv = read_parameter(Table,min_upd_interval),
    case read_time_stamp(Table) of
        {_,_,_,Tupd} when (NowSecs-Tupd)<MinInterv ->
            %% Too early. Someone else must have updated this. Just skip it.
            skip;
        _ ->
            mk_insert_image(Table)
    end,
    {noreply,State};

handle_info(_,State) ->
    {noreply,State}.


terminate(_,_State) -> ok.

code_change(_,State,_) -> {ok,State}.

%%%----------------------------------------------------------------
%% Returns the ordered set of tables that have an image entry in the
%% catalog.
checked_tables() ->
    lists:foldl(
      fun({{image,Table},_},Acc) -> ordsets:add_element(Table,Acc);
         (_,Acc) -> Acc
      end, ordsets:new(), ets:match_object(?CATALOG,{{image,'_'}, '_'})).

%%%----------------------------------------------------------------
%% Catalog entries are stored as {{What,Table}, Value}.
%% ?read yields the stored value or DefaultValue when there is none.
-define(read(What, Table, DefaultValue),
        case ets:lookup(?CATALOG, {What,Table}) of
            [{_,Value}] -> Value;
            [] -> DefaultValue
        end
       ).

%% ?insert stores Value and evaluates to Value.
-define(insert(What, Table, Value),
        ets:insert(?CATALOG, {{What,Table},Value}),Value
       ).

%% Returns the stored image of Table, or 'none'.
read_image(Table) ->
    ?read(image, Table, none).

insert_image(Table, Image) ->
    ?insert(image, Table, Image).

%% Returns the stored time stamp of Table, or 'undefined'.
read_time_stamp(Table) ->
    ?read(time_stamp, Table, undefined).

%% Stores TimeStamp for Table.  When a previous time stamp exists, the
%% difference between the fourth elements of the new and the old time
%% stamp is first recorded under upd_stat.
insert_time_stamp(Table, TimeStamp) ->
    case read_time_stamp(Table) of
        {_,_,_,Tupd} ->
            Tpassed = element(4,TimeStamp) - Tupd,
            insert_stat(Table, upd_stat, Tpassed);
        _ ->
            ok
    end,
    ?insert(time_stamp, Table, TimeStamp).

%% Returns the value of counter Name for Table, or 'undefined'.
read_counter(Table, Name) ->
    ?read({counter,Name}, Table, undefined).

insert_counter(Table, Name, Value) ->
    ?insert({counter,Name}, Table, Value).

%% Adds Amount to counter Name of Table and returns the new value, or
%% 'undefined' when the counter cannot be updated (ets:update_counter/3
%% fails with badarg, e.g. because the counter does not exist).
%%
%% try/catch on the error class is used instead of matching the shape
%% of a caught 'EXIT' term, since that shape depends on the stack
%% trace format of the runtime.
update_counter(Table, Name, Amount) ->
    try
        ets:update_counter(?CATALOG, {{counter,Name},Table}, Amount)
    catch
        error:badarg -> undefined
    end.

%% As update_counter/3, but a counter that could not be updated is
%% set to Val_if_undefined, which is then returned.
update_counter(Table, Name, Amount, Val_if_undefined) ->
    case update_counter(Table, Name, Amount) of
        undefined -> insert_counter(Table, Name, Val_if_undefined);
        N -> N
    end.

%% Returns the value of parameter Name for Table, falling back to the
%% default value of the parameter.
read_parameter(Table, Name) ->
    ?read({parameter,Name}, Table, parameter_default_value(Name)).

insert_parameter(Table, Name, Value) ->
    ?insert({parameter,Name}, Table, Value).

%% Returns the stored #stats{} record of Table, or an empty one.
read_exec_stat(Table) ->
    ?read(exec_stat, Table, #stats{}).

insert_exec_stat(Table, Time, Ngc, Recl) ->
    S0 = read_exec_stat(Table),
    S = S0#stats{time = upd_stat(S0#stats.time, Time),
                 gc = upd_stat(S0#stats.gc, Ngc),
                 recl = upd_stat(S0#stats.recl, Recl)
                },

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -