mnesia_subscr.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 493 行 · 第 1/2 页
ERL
493 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
-module(mnesia_subscr).

-behaviour(gen_server).

-export([start/0,
         set_debug_level/1,
         subscribe/2,
         unsubscribe/2,
         unsubscribe_table/1,
         subscribers/0,
         report_table_event/4,
         report_table_event/5,
         report_table_event/6
        ]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3
        ]).

-include("mnesia.hrl").

-import(mnesia_lib, [error/2]).

%% Server state: the pid handed to start/0's caller and the ets table
%% that holds the subscriptions.
-record(state, {supervisor, pid_tab}).

%% Starts the subscription server, registered locally under the module
%% name and linked to the caller. The caller's pid is passed to init/1.
start() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [self()],
                          [{timeout, infinity}]).

%% Changes the mnesia debug level.
%% Returns {error, Reason} if mnesia_monitor:patch_env/2 rejects Level,
%% otherwise the result of set_debug_level/2 (the previous level).
set_debug_level(Level) ->
    OldEnv = application:get_env(mnesia, debug),
    case mnesia_monitor:patch_env(debug, Level) of
        {error, Reason} ->
            {error, Reason};
        NewLevel ->
            set_debug_level(NewLevel, OldEnv)
    end.

%% Applies Level and returns the previous level.
%% When mnesia is not running only the old application env value (or
%% 'none' if it was unset) is returned. Otherwise the table subscriptions
%% of the mnesia_event process are adjusted to match Level before the new
%% level is stored with mnesia_lib:set/2.
set_debug_level(Level, OldEnv) ->
    case mnesia:system_info(is_running) of
        no when OldEnv == undefined ->
            none;
        no ->
            {ok, E} = OldEnv,
            E;
        _ ->
            Old = mnesia_lib:val(debug),
            Local = mnesia:system_info(local_tables),
            E = whereis(mnesia_event),
            Sub = fun(Tab) -> subscribe(E, {table, Tab}) end,
            UnSub = fun(Tab) -> unsubscribe(E, {table, Tab}) end,

            case Level of
                none ->
                    lists:foreach(UnSub, Local);
                verbose ->
                    lists:foreach(UnSub, Local);
                debug ->
                    %% Only schema events are of interest at this level.
                    lists:foreach(UnSub, Local -- [schema]),
                    Sub(schema);
                trace ->
                    lists:foreach(Sub, Local)
            end,
            mnesia_lib:set(debug, Level),
            Old
    end.

%% Activates a subscription for ClientPid.
%% What is 'system', {table, Tab}, {table, Tab, simple} or
%% {table, Tab, detailed}; {table, Tab} is treated as the simple form.
%% Any other What yields {error, {badarg, What}}.
subscribe(ClientPid, system) ->
    change_subscr(activate, ClientPid, system);
subscribe(ClientPid, {table, Tab}) ->
    change_subscr(activate, ClientPid, {table, Tab, simple});
subscribe(ClientPid, {table, Tab, simple}) ->
    change_subscr(activate, ClientPid, {table, Tab, simple});
subscribe(ClientPid, {table, Tab, detailed}) ->
    change_subscr(activate, ClientPid, {table, Tab, detailed});
subscribe(_ClientPid, What) ->
    {error, {badarg, What}}.

%% Deactivates a subscription for ClientPid. Accepts the same What
%% values as subscribe/2.
unsubscribe(ClientPid, system) ->
    change_subscr(deactivate, ClientPid, system);
unsubscribe(ClientPid, {table, Tab}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, simple});
unsubscribe(ClientPid, {table, Tab, simple}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, simple});
unsubscribe(ClientPid, {table, Tab, detailed}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, detailed});
unsubscribe(_ClientPid, What) ->
    {error, {badarg, What}}.

%% Asks the server to deactivate the subscriptions on Tab.
unsubscribe_table(Tab) ->
    call({change, {deactivate_table, Tab}}).

%% Forwards an activate/deactivate request to the server.
change_subscr(Kind, ClientPid, What) ->
    call({change, {Kind, ClientPid, What}}).

%% Returns the mnesia_event process followed by the pids stored under
%% the 'subscribers' key.
subscribers() ->
    [whereis(mnesia_event) | mnesia_lib:val(subscribers)].

%% Reports a table event to the subscribers found in the commit_work
%% entry of Tab. Does nothing when that entry is missing or holds no
%% 'subscribers' tuple.
report_table_event(Tab, Tid, Obj, Op) ->
    case ?catch_val({Tab, commit_work}) of
        {'EXIT', _} ->
            ok;
        Commit ->
            case lists:keysearch(subscribers, 1, Commit) of
                false ->
                    ok;
                {value, Subs} ->
                    report_table_event(Subs, Tab, Tid, Obj, Op, undefined)
            end
    end.

%% Backwards compatible for the moment when mnesia_tm gets updated!
report_table_event(Subscr, Tab, Tid, Obj, Op) ->
    report_table_event(Subscr, Tab, Tid, Obj, Op, undefined).

%% Delivers the event for operation Op on Obj in Tab.
%% The pids in S1 get the standard form {Op, Record, Tid}; the pids in S2
%% get the extended form built by what/5 (presumably S1 holds the simple
%% and S2 the detailed subscribers - confirm against do_change).
%% clear_table is reported as a delete followed by a write of the
%% schema record of Tab.
report_table_event({subscribers, S1, S2}, Tab, Tid, _Obj, clear_table, _Old) ->
    What = {delete, {schema, Tab}, Tid},
    deliver(S1, {mnesia_table_event, What}),
    TabDef = mnesia_schema:cs2list(?catch_val({Tab, cstruct})),
    What2 = {write, {schema, Tab, TabDef}, Tid},
    deliver(S1, {mnesia_table_event, What2}),
    What3 = {delete, schema, {schema, Tab}, [{schema, Tab, TabDef}], Tid},
    deliver(S2, {mnesia_table_event, What3}),
    What4 = {write, schema, {schema, Tab, TabDef}, [], Tid},
    deliver(S2, {mnesia_table_event, What4});
report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, _Old) ->
    %% Nobody wants the extended form, so skip building it.
    What = {Op, patch_record(Tab, Obj), Tid},
    deliver(Subscr, {mnesia_table_event, What});
report_table_event({subscribers, S1, S2}, Tab, Tid, Obj, Op, Old) ->
    Standard = {Op, patch_record(Tab, Obj), Tid},
    deliver(S1, {mnesia_table_event, Standard}),
    Extended = what(Tab, Tid, Obj, Op, Old),
    deliver(S2, Extended);
%% Backwards compatible for the moment when mnesia_tm gets updated!
report_table_event({subscribers, Subscr}, Tab, Tid, Obj, Op, Old) ->
    report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, Old).

%% Returns Obj with its first element replaced by Tab, unless it
%% already equals Tab.
patch_record(Tab, Obj) ->
    case Tab == element(1, Obj) of
        true ->
            Obj;
        false ->
            setelement(1, Obj, Tab)
    end.

%% Builds the extended event {mnesia_table_event, {Op, Tab, Obj, Old, Tid}}.
%% When Old is 'undefined' the current records are read with
%% mnesia_lib:db_get/2. Returns 'ignore' when they cannot be read.
what(Tab, Tid, {RecName, Key}, delete, undefined) ->
    case catch mnesia_lib:db_get(Tab, Key) of
        Old when is_list(Old) -> %% Op only allowed for set table.
            {mnesia_table_event, {delete, Tab, {RecName, Key}, Old, Tid}};
        _ ->
            %% Record just deleted by a dirty_op or
            %% the whole table has been deleted
            ignore
    end;
what(Tab, Tid, Obj, delete, Old) ->
    {mnesia_table_event, {delete, Tab, Obj, Old, Tid}};
what(Tab, Tid, Obj, delete_object, _Old) ->
    {mnesia_table_event, {delete, Tab, Obj, [Obj], Tid}};
what(Tab, Tid, Obj, write, undefined) ->
    case catch mnesia_lib:db_get(Tab, element(2, Obj)) of
        Old when is_list(Old) ->
            {mnesia_table_event, {write, Tab, Obj, Old, Tid}};
        {'EXIT', _} ->
            ignore
    end;
what(Tab, Tid, Obj, write, Old) ->
    %% The caller already supplied the old records; mirrors the
    %% corresponding delete clause instead of failing with function_clause.
    {mnesia_table_event, {write, Tab, Obj, Old, Tid}}.

%% Sends Msg to every pid in the list. The message 'ignore' is never sent.
deliver(_, ignore) ->
    ok;
deliver([Pid | Pids], Msg) ->
    Pid ! Msg,
    deliver(Pids, Msg);
deliver([], _Msg) ->
    ok.

%% Performs a synchronous request to the server.
%% Returns {error, {node_not_running, node()}} when the server is not
%% registered, or when an exit message from the server is already waiting
%% in the caller's mailbox after the call; otherwise the server's reply.
call(Msg) ->
    Pid = whereis(?MODULE),
    case Pid of
        undefined ->
            {error, {node_not_running, node()}};
        Pid ->
            Res = gen_server:call(Pid, Msg, infinity),
            %% We get an exit signal if server dies. Only an exit from
            %% the server itself is consumed, and the error is returned
            %% instead of being computed and thrown away.
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Res
            end
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    ClientPid = whereis(mnesia_event),
    link(ClientPid),
    mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
    Tab = ?ets_new_table(mnesia_subscr, [duplicate_bag, private]),
    %% The mnesia_event process is a system subscriber from the start.
    ?ets_insert(Tab, {ClientPid, system}),
    {ok, #state{supervisor = Parent, pid_tab = Tab}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({change, How}, _From, State) ->
    Reply = do_change(How, State#state.pid_tab),
    {reply, Reply, State};

handle_call(Msg, _From, State) ->
    %% NOTE: no reply is sent for an unexpected request, so the caller
    %% stays blocked until its own call timeout (if any) expires.
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?