bench_generate.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 668 行 · 第 1/2 页
ERL
668 行
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% File    : bench_generate.erl
%%% Author  : Hakan Mattsson <hakan@cslab.ericsson.se>
%%% Purpose : Start request generators and collect statistics
%%% Created : 21 Jun 2001 by Hakan Mattsson <hakan@cslab.ericsson.se>
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(bench_generate).
-author('hakan@cslab.ericsson.se').

-include("bench.hrl").

%% Public
-export([start/1]).

%% Internal
-export([
         monitor_init/2,
         generator_init/2,
         worker_init/1
        ]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% The traffic generator
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% -------------------------------------------------------------------
%% Start request generators
%% -------------------------------------------------------------------

%% Run one complete benchmark session.
%%
%% Spawns a linked monitor process (monitor_init/2) and blocks until that
%% process either reports {monitor_done, MonPid, Res}, in which case Res is
%% returned, or an 'EXIT' message from it arrives, in which case the caller
%% exits with the same reason. The 'EXIT' message is only delivered as a
%% message when the calling process traps exits; otherwise the link
%% propagates the exit signal directly.
start(C) when is_record(C, config) ->
    MonPid = spawn_link(?MODULE, monitor_init, [C, self()]),
    receive
        {'EXIT', MonPid, Reason} ->
            exit(Reason);
        {monitor_done, MonPid, Res} ->
            Res
    end.

%% Entry point of the monitor process.
%%
%% Traps exits, performs the warm-up reads (warmup_sticky/1), spawns the
%% generators, schedules the `warmup_done' message after the configured
%% warm-up time (milliseconds, as taken by erlang:send_after/3) and then
%% enters monitor_loop/4 with no deceased generators.
monitor_init(C, Parent) when is_record(C, config) ->
    process_flag(trap_exit, true),
    %% net_kernel:monitor_nodes(true), %% BUGBUG: Needed in order to re-start generators
    Nodes   = C#config.generator_nodes,
    PerNode = C#config.n_generators_per_node,
    Timer   = C#config.generator_warmup,
    ?d("~n", []),
    ?d("Start ~p request generators each at ~p nodes...~n",
       [PerNode, length(Nodes)]),
    ?d("~n", []),
    warmup_sticky(C),
    ?d(" ~p seconds warmup...~n", [Timer div 1000]),
    Alive = spawn_generators(C, Nodes, PerNode),
    erlang:send_after(Timer, self(), warmup_done),
    monitor_loop(C, Parent, Alive, []).

%% Spawn PerNode linked generator processes on each node in Nodes.
%%
%% Every generator runs generator_init/2 with the calling process as its
%% monitor. Returns the list of spawned pids.
spawn_generators(C, Nodes, PerNode) ->
    [spawn_link(Node, ?MODULE, generator_init, [self(), C]) ||
        Node <- Nodes,
        _    <- lists:seq(1, PerNode)].
%% Touch every fragment once before the benchmark starts.
%%
%% For each fragment suffix 0..n_fragments-1 a transaction is run (through
%% rpc:call/4, on the first node returned by nearest_node/3) that reads key S
%% from each of the four benchmark tables using the lock kind that
%% nearest_node/3 returned.
%% NOTE(review): judging by the name, the lock kind is presumably a sticky
%% write lock - confirm against nearest_node/3.
warmup_sticky(C) ->
    %% Select one node per fragment as master node
    Tabs = [subscriber, session, server, suffix],
    Fun = fun(S) ->
                  {[Node | _], _, Wlock} = nearest_node(S, transaction, C),
                  Stick = fun() ->
                                  [mnesia:read({T, S}, S, Wlock) || T <- Tabs]
                          end,
                  Args = [transaction, Stick, [], mnesia_frag],
                  rpc:call(Node, mnesia, activity, Args)
          end,
    Suffixes = lists:seq(0, C#config.n_fragments - 1), % Assume even distrib.
    lists:foreach(Fun, Suffixes).

%% Main loop for benchmark monitor
%%
%% Alive is the list of generator pids, Deceased the list of nodes whose
%% generator has died. The loop is driven by three timer messages:
%%   warmup_done           - reset the generators' counters, start measuring
%%   measurement_done      - collect statistics, start the cool-down
%%   {cooldown_done,Stats} - stop the generators, display the statistics,
%%                           notify Parent and terminate
%% nodeup/nodedown messages only arrive if node monitoring is enabled; the
%% net_kernel:monitor_nodes/1 call in monitor_init/2 is commented out.
monitor_loop(C, Parent, Alive, Deceased) ->
    receive
        warmup_done ->
            multicall(Alive, reset_statistics),
            Timer = C#config.generator_duration,
            ?d(" ~p seconds actual benchmarking...~n", [Timer div 1000]),
            erlang:send_after(Timer, self(), measurement_done),
            monitor_loop(C, Parent, Alive, Deceased);
        measurement_done ->
            Stats = multicall(Alive, get_statistics),
            Timer = C#config.generator_cooldown,
            ?d(" ~p seconds cooldown...~n", [Timer div 1000]),
            erlang:send_after(Timer, self(), {cooldown_done, Stats}),
            monitor_loop(C, Parent, Alive, Deceased);
        {cooldown_done, Stats} ->
            multicall(Alive, stop),
            display_statistics(Stats, C),
            Parent ! {monitor_done, self(), ok},
            unlink(Parent),
            exit(monitor_done);
        {nodedown, _Node} ->
            monitor_loop(C, Parent, Alive, Deceased);
        {nodeup, Node} ->
            %% Re-spawn one generator per recorded death on that node
            NeedsBirth = [N || N <- Deceased, N == Node],
            Born = spawn_generators(C, NeedsBirth, 1),
            monitor_loop(C, Parent, Born ++ Alive, Deceased -- NeedsBirth);
        {'EXIT', Pid, Reason} when Pid == Parent ->
            exit(Reason);
        {'EXIT', Pid, Reason} ->
            case lists:member(Pid, Alive) of
                true ->
                    ?d("Generator on node ~p died: ~p~n", [node(Pid), Reason]),
                    monitor_loop(C, Parent, Alive -- [Pid],
                                 [node(Pid) | Deceased]);
                false ->
                    monitor_loop(C, Parent, Alive, Deceased)
            end
    end.

%% Send message to a set of processes and wait for their replies
%%
%% Each process is monitored and sent {self(), Ref, Message}. Returns a list
%% of {Pid, Reply} in the order of Pids; a process that terminates before
%% replying yields {Pid, {'EXIT', Reason}}.
multicall(Pids, Message) ->
    Send = fun(Pid) ->
                   Ref = erlang:monitor(process, Pid),
                   Pid ! {self(), Ref, Message},
                   {Pid, Ref}
           end,
    PidRefs = lists:map(Send, Pids),
    Collect = fun({Pid, Ref}) ->
                      receive
                          {'DOWN', Ref, process, Pid, Reason} ->
                              {Pid, {'EXIT', Reason}};
                          {Pid, Ref, Reply} ->
                              %% flush: drop a 'DOWN' that was delivered
                              %% between the reply and the demonitor, so it
                              %% does not linger in the caller's mailbox.
                              erlang:demonitor(Ref, [flush]),
                              {Pid, Reply}
                      end
              end,
    lists:map(Collect, PidRefs).

%% Initialize a traffic generator
%%
%% Traps exits, waits for all Mnesia tables, seeds the `random' generator,
%% creates the counters and the session table and enters generator_loop/4.
%% NOTE(review): erlang:now/0 and the `random' module are deprecated; they
%% are kept here because elapsed/2 expects now-style tuples and the
%% transaction generator presumably draws from `random' - confirm before
%% migrating to erlang:monotonic_time/0 and `rand'.
generator_init(Monitor, C) ->
    process_flag(trap_exit, true),
    Tables = mnesia:system_info(tables),
    ok = mnesia:wait_for_tables(Tables, infinity),
    {_Mega, Sec, Micro} = erlang:now(),
    Uniq = lists:sum(binary_to_list(term_to_binary(make_ref()))),
    random:seed(Uniq, Sec, Micro),
    Counters = reset_counters(C, C#config.statistics_detail),
    SessionTab = ets:new(bench_sessions, [public, {keypos, 1}]),
    generator_loop(Monitor, C, SessionTab, Counters).

%% Main loop for traffic generator
%%
%% Pending control messages are served first; because of `after 0', one
%% transaction is generated and executed whenever the mailbox holds no
%% matching message. post_eval/8 updates the counters and re-enters this
%% loop.
generator_loop(Monitor, C, SessionTab, Counters) ->
    receive
        {ReplyTo, Ref, get_statistics} ->
            Stats = get_counters(C, Counters),
            ReplyTo ! {self(), Ref, Stats},
            generator_loop(Monitor, C, SessionTab, Counters);
        {ReplyTo, Ref, reset_statistics} ->
            Stats = get_counters(C, Counters),
            Counters2 = reset_counters(C, Counters),
            ReplyTo ! {self(), Ref, Stats},
            generator_loop(Monitor, C, SessionTab, Counters2);
        {_ReplyTo, _Ref, stop} ->
            %% No reply is sent; the caller's monitor reports the exit
            exit(shutdown);
        {'EXIT', Pid, Reason} when Pid == Monitor ->
            exit(Reason);
        {'EXIT', Pid, Reason} ->
            Node = node(Pid),
            ?d("Worker on node ~p(~p) died: ~p~n", [Node, node(), Reason]),
            Key = {worker, Node},
            %% Forget the cached worker only if it is the one that died
            case get(Key) of
                Pid -> erase(Key);
                _   -> ignore
            end,
            generator_loop(Monitor, C, SessionTab, Counters)
    after 0 ->
            {Name, {Nodes, Activity, Wlock}, Fun, CommitSessions} =
                gen_trans(C, SessionTab),
            Before = erlang:now(),
            Res = call_worker(Nodes, Activity, Fun, Wlock, mnesia_frag),
            After = erlang:now(),
            Elapsed = elapsed(Before, After),
            post_eval(Monitor, C, Elapsed, Res, Name, CommitSessions,
                      SessionTab, Counters)
    end.

%% Perform a transaction on a node near the data
%%
%% Returns {Node, Result}. If the preferred node is the local node the
%% activity is run in the calling process. Otherwise it is delegated to a
%% linked worker process on that node; workers are spawned on demand and
%% cached in the process dictionary under {worker, Node}. A dead worker or a
%% `not_local' abort makes retry_worker/6 try the remaining nodes.
call_worker([Node | _], Activity, Fun, Wlock, Mod) when Node == node() ->
    {Node, catch mnesia:activity(Activity, Fun, [Wlock], Mod)};
call_worker([Node | _] = Nodes, Activity, Fun, Wlock, Mod) ->
    Key = {worker, Node},
    case get(Key) of
        Pid when is_pid(Pid) ->
            Args = [Activity, Fun, [Wlock], Mod],
            Pid ! {activity, self(), Args},
            receive
                {'EXIT', Pid, Reason} ->
                    ?d("Worker on node ~p(~p) died: ~p~n",
                       [Node, node(), Reason]),
                    erase(Key),
                    retry_worker(Nodes, Activity, Fun, Wlock, Mod,
                                 {'EXIT', Reason});
                {activity_result, Pid, Result} ->
                    case Result of
                        {'EXIT', {aborted, {not_local, _}}} ->
                            retry_worker(Nodes, Activity, Fun, Wlock, Mod,
                                         Result);
                        _ ->
                            {Node, Result}
                    end
            end;
        undefined ->
            GenPid = self(),
            Pid = spawn_link(Node, ?MODULE, worker_init, [GenPid]),
            put(Key, Pid),
            call_worker(Nodes, Activity, Fun, Wlock, Mod)
    end.

%% Retry a failed activity on the nodes that follow the failed one.
%%
%% Returns {BadNode, Reason} when no other node is left. When exactly one
%% node remains the activity is retried with the lock kind `write' instead
%% of Wlock.
retry_worker([], _Activity, _Fun, _Wlock, _Mod, Reason) ->
    {node(), Reason};
retry_worker([BadNode | SpareNodes], Activity, Fun, Wlock, Mod, Reason) ->
    Nodes = SpareNodes -- [BadNode],
    case Nodes of
        []  -> {BadNode, Reason};
        [_] -> call_worker(Nodes, Activity, Fun, write, Mod);
        _   -> call_worker(Nodes, Activity, Fun, Wlock, Mod)
    end.

%% Entry point of a remote worker: wait for all Mnesia tables, then serve
%% activity requests from Parent.
worker_init(Parent) ->
    Tables = mnesia:system_info(tables),
    ok = mnesia:wait_for_tables(Tables, infinity),
    worker_loop(Parent).

%% Main loop for remote workers
%%
%% Runs each requested activity under `catch' and sends the outcome back to
%% Parent as {activity_result, self(), Result}. Only requests sent by Parent
%% are matched.
worker_loop(Parent) ->
    receive
        {activity, Parent, [Activity, Fun, Extra, Mod]} ->
            Result = (catch mnesia:activity(Activity, Fun, Extra, Mod)),
            Parent ! {activity_result, self(), Result},
            worker_loop(Parent)
    end.

%% Number of microseconds between two {MegaSecs, Secs, MicroSecs} timestamps.
elapsed({Before1, Before2, Before3}, {After1, After2, After3}) ->
    After  = After1  * 1000000000000 + After2  * 1000000 + After3,
    Before = Before1 * 1000000000000 + Before2 * 1000000 + Before3,
    After - Before.

%% Lookup counters
%%
%% Returns a list of {{Trans, CounterName, Node}, Value}. The table variant
%% returns every object in the table; the tuple variant reports its four
%% totals under the placeholder transaction `any' and node `somewhere'.
get_counters(_C, {table, Tab}) ->
    ets:match_object(Tab, '_');
get_counters(_C, {NM, NC, NA, NB}) ->
    Trans = any,
    Node = somewhere,
    [{{Trans, n_micros, Node}, NM},
     {{Trans, n_commits, Node}, NC},
     {{Trans, n_aborts, Node}, NA},
     {{Trans, n_branches_executed, Node}, NB}].

%% Clear all counters
%%
%% The second argument is either a statistics detail level (normal, debug,
%% debug2) or an existing counter state. Detail level `normal' yields the
%% tuple {NMicros, NCommits, NAborts, NBranchesExecuted}; the debug levels
%% create an ets table holding one zeroed counter per transaction type,
%% counter name and node. An existing table is zeroed in place.
reset_counters(_C, normal) ->
    {0, 0, 0, 0};
reset_counters(C, {_, _, _, _}) ->
    reset_counters(C, normal);
reset_counters(C, debug) ->
    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),
    reset_counters(C, {table, CounterTab});
reset_counters(C, debug2) ->
    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),
    reset_counters(C, {table, CounterTab});
reset_counters(C, {table, Tab} = Counters) ->
    Names = [n_micros, n_commits, n_aborts, n_branches_executed],
    Nodes = C#config.generator_nodes ++ C#config.table_nodes,
    TransTypes = [t1, t2, t3, t4, t5, ping],
    [ets:insert(Tab, {{Trans, Name, Node}, 0}) ||
        Name  <- Names,
        Node  <- Nodes,
        Trans <- TransTypes],
    Counters.

%% Determine the outcome of a transaction and increment the counters
%%
%% {do_commit, BranchExecuted, _} counts as a commit, everything else as an
%% abort; the elapsed time is always added. The session commit fun is only
%% run for a committed transaction whose branch was executed. Always
%% continues with generator_loop/4.
post_eval(Monitor, C, Elapsed, {Node, Res}, Name, CommitSessions, SessionTab,
          {table, Tab} = Counters) ->
    case Res of
        {do_commit, BranchExecuted, _} ->
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_commits, Node}, 1),
            case BranchExecuted of
                true ->
                    incr(Tab, {Name, n_branches_executed, Node}, 1),
                    commit_session(CommitSessions),
                    generator_loop(Monitor, C, SessionTab, Counters);
                false ->
                    generator_loop(Monitor, C, SessionTab, Counters)
            end;
        {'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_aborts, Node}, 1),
            case BranchExecuted of
                true ->
                    incr(Tab, {Name, n_branches_executed, Node}, 1),
                    generator_loop(Monitor, C, SessionTab, Counters);
                false ->
                    generator_loop(Monitor, C, SessionTab, Counters)
            end;
        _ ->
            ?d("Failed(~p): ~p~n", [Node, Res]),
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_aborts, Node}, 1),
            generator_loop(Monitor, C, SessionTab, Counters)
    end;
post_eval(Monitor, C, Elapsed, {_Node, Res}, _Name, CommitSessions, SessionTab,
          {NM, NC, NA, NB}) ->
    case Res of
        {do_commit, BranchExecuted, _} ->
            case BranchExecuted of
                true ->
                    commit_session(CommitSessions),
                    generator_loop(Monitor, C, SessionTab,
                                   {NM + Elapsed, NC + 1, NA, NB + 1});
                false ->
                    generator_loop(Monitor, C, SessionTab,
                                   {NM + Elapsed, NC + 1, NA, NB})
            end;
        {'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->
            case BranchExecuted of
                true ->
                    generator_loop(Monitor, C, SessionTab,
                                   {NM + Elapsed, NC, NA + 1, NB + 1});
                false ->
                    generator_loop(Monitor, C, SessionTab,
                                   {NM + Elapsed, NC, NA + 1, NB})
            end;
        _ ->
            ?d("Failed: ~p~n", [Res]),
            generator_loop(Monitor, C, SessionTab,
                           {NM + Elapsed, NC, NA + 1, NB})
    end.

%% Add Incr to the counter stored under key Counter in the ets table Tab.
incr(Tab, Counter, Incr) ->
    ets:update_counter(Tab, Counter, Incr).

%% Run the session commit callback, if the transaction supplied one.
commit_session(no_fun) ->
    ignore;
commit_session(Fun) when is_function(Fun) ->
    Fun().

%% Randomly choose a transaction type according to benchmark spec
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?