bench_generate.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 668 行 · 第 1/2 页

ERL
668
字号
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% File    : bench_generate.hrl%%% Author  : Hakan Mattsson <hakan@cslab.ericsson.se>%%% Purpose : Start request generators and collect statistics%%% Created : 21 Jun 2001 by Hakan Mattsson <hakan@cslab.ericsson.se>%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%-module(bench_generate).-author('hakan@cslab.ericsson.se').-include("bench.hrl").%% Public-export([start/1]).%% Internal-export([	 monitor_init/2,	 generator_init/2,	 worker_init/1	]).%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% The traffic generator%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -------------------------------------------------------------------%% Start request generators%% -------------------------------------------------------------------start(C) when record(C, config) ->    MonPid = spawn_link(?MODULE, monitor_init, [C, self()]),    receive	{'EXIT', MonPid, Reason} ->	    exit(Reason);	{monitor_done, MonPid, Res} ->	    Res    end.monitor_init(C, Parent) when record(C, config) ->    process_flag(trap_exit, true),    %% net_kernel:monitor_nodes(true), %% BUGBUG: Needed in order to re-start generators    Nodes     = C#config.generator_nodes,    PerNode   = C#config.n_generators_per_node,    Timer     = C#config.generator_warmup,    ?d("~n", []),    ?d("Start ~p request generators each at ~p nodes...~n",       [PerNode, length(Nodes)]),    ?d("~n", []),    warmup_sticky(C),    ?d("    ~p seconds warmup...~n", [Timer div 1000]),    Alive = spawn_generators(C, Nodes, PerNode),    erlang:send_after(Timer, self(), warmup_done),    monitor_loop(C, Parent, Alive, []).spawn_generators(C, Nodes, PerNode) ->    [spawn_link(Node, ?MODULE, generator_init, [self(), C]) ||	Node <- Nodes,	_    <- lists:seq(1, PerNode)].    warmup_sticky(C) ->    %% Select one node per fragment as master node    Tabs = [subscriber, session, server, suffix],    Fun = fun(S) ->		  {[Node | _], _, Wlock} = nearest_node(S, transaction, C),		  Stick = fun() -> [mnesia:read({T, S}, S, Wlock) || T <- Tabs] end,		  Args = [transaction, Stick, [], mnesia_frag],		  rpc:call(Node, mnesia, activity, Args)	  end,    Suffixes = lists:seq(0, C#config.n_fragments - 1), % Assume even distrib.    lists:foreach(Fun, Suffixes).%% Main loop for benchmark monitormonitor_loop(C, Parent, Alive, Deceased) ->    receive        warmup_done ->            multicall(Alive, reset_statistics),	    	    Timer = C#config.generator_duration,	    ?d("    ~p seconds actual benchmarking...~n", [Timer div 1000]),	    erlang:send_after(Timer, self(), measurement_done),	    monitor_loop(C, Parent, Alive, Deceased);        measurement_done ->            Stats = multicall(Alive, get_statistics),	    	    Timer = C#config.generator_cooldown,	    ?d("    ~p seconds cooldown...~n", [Timer div 1000]),	    erlang:send_after(Timer, self(), {cooldown_done, Stats}),	    monitor_loop(C, Parent, Alive, Deceased);        {cooldown_done, Stats} ->            multicall(Alive, stop),            display_statistics(Stats, C),            Parent ! {monitor_done, self(), ok},	    unlink(Parent),	    exit(monitor_done);	{nodedown, Node} ->	    monitor_loop(C, Parent, Alive, Deceased);	{nodeup, Node} ->	    NeedsBirth = [N || N <- Deceased, N == Node],	    Born = spawn_generators(C, NeedsBirth, 1),	    monitor_loop(C, Parent, Born ++ Alive, Deceased -- NeedsBirth);        {'EXIT', Pid, Reason} when Pid == Parent ->	    exit(Reason);        {'EXIT', Pid, Reason} ->            case lists:member(Pid, Alive) of                true ->		    ?d("Generator on node ~p died: ~p~n", [node(Pid), Reason]),                    monitor_loop(C, Parent, Alive -- [Pid], [node(Pid) | Deceased]);                false ->                    monitor_loop(C, Parent, Alive, Deceased)            end    end.%% Send message to a set of processes and wait for their repliesmulticall(Pids, Message) ->    Send =        fun(Pid) ->                 Ref = erlang:monitor(process, Pid),                Pid ! {self(), Ref, Message},                {Pid, Ref}        end,    PidRefs = lists:map(Send, Pids),    Collect =        fun({Pid, Ref}) ->                receive                    {'DOWN', Ref, process, Pid, Reason} ->                        {Pid, {'EXIT', Reason}};                    {Pid, Ref, Reply} ->                        erlang:demonitor(Ref),                        {Pid, Reply}                end        end,    lists:map(Collect, PidRefs).%% Initialize a traffic generatorgenerator_init(Monitor, C) ->    process_flag(trap_exit, true),    Tables = mnesia:system_info(tables),    ok = mnesia:wait_for_tables(Tables, infinity),    {_Mega, Sec, Micro} = erlang:now(),    Uniq = lists:sum(binary_to_list(term_to_binary(make_ref()))),    random:seed(Uniq, Sec, Micro),    Counters = reset_counters(C, C#config.statistics_detail),    SessionTab = ets:new(bench_sessions, [public, {keypos, 1}]),    generator_loop(Monitor, C, SessionTab, Counters).%% Main loop for traffic generatorgenerator_loop(Monitor, C, SessionTab, Counters) ->    receive        {ReplyTo, Ref, get_statistics} ->	    Stats = get_counters(C, Counters),	    ReplyTo ! {self(), Ref, Stats},	    generator_loop(Monitor, C, SessionTab, Counters);        {ReplyTo, Ref, reset_statistics} ->	    Stats = get_counters(C, Counters),	    Counters2 = reset_counters(C, Counters),	    ReplyTo ! {self(), Ref, Stats},	    generator_loop(Monitor, C, SessionTab, Counters2);        {ReplyTo, Ref, stop} ->	    exit(shutdown);        {'EXIT', Pid, Reason} when Pid == Monitor ->	    exit(Reason);	{'EXIT', Pid, Reason} ->	    Node = node(Pid),	    ?d("Worker on node ~p(~p) died: ~p~n", [Node, node(), Reason]),	    Key = {worker,Node},	    case get(Key) of		undefined -> ignore;		Pid       -> erase(Key);		_         -> ignore	    end,	    generator_loop(Monitor, C, SessionTab, Counters)    after 0 ->	    {Name, {Nodes, Activity, Wlock}, Fun, CommitSessions} =		gen_trans(C, SessionTab),	    Before = erlang:now(),	    Res  = call_worker(Nodes, Activity, Fun, Wlock, mnesia_frag),	    After = erlang:now(),	    Elapsed = elapsed(Before, After),	    post_eval(Monitor, C, Elapsed, Res, Name, CommitSessions, SessionTab, Counters)    end.%% Perform a transaction on a node near the datacall_worker([Node | _], Activity, Fun, Wlock, Mod) when Node == node() ->    {Node, catch mnesia:activity(Activity, Fun, [Wlock], Mod)};call_worker([Node | _] = Nodes, Activity, Fun, Wlock, Mod) ->    Key = {worker,Node},    case get(Key) of	Pid when pid(Pid) ->	    Args = [Activity, Fun, [Wlock], Mod],	    Pid ! {activity, self(), Args},	    receive		{'EXIT', Pid, Reason} ->		    ?d("Worker on node ~p(~p) died: ~p~n", [Node, node(), Reason]),		    erase(Key),		    retry_worker(Nodes, Activity, Fun, Wlock, Mod, {'EXIT', Reason});		{activity_result, Pid, Result} ->		    case Result of			{'EXIT', {aborted, {not_local, _}}} ->			    retry_worker(Nodes, Activity, Fun, Wlock, Mod, Result);			_ ->			    {Node, Result}		    end	    end;	undefined ->	    GenPid = self(),	    Pid = spawn_link(Node, ?MODULE, worker_init, [GenPid]),	    put(Key, Pid),	    call_worker(Nodes, Activity, Fun, Wlock, Mod)    end.retry_worker([], Activity, Fun, Wlock, Mod, Reason) ->    {node(), Reason};retry_worker([BadNode | SpareNodes], Activity, Fun, Wlock, Mod, Reason) ->    Nodes = SpareNodes -- [BadNode],    case Nodes of	[] ->	    {BadNode, Reason};	[_] ->	    call_worker(Nodes, Activity, Fun, write, Mod);	_ ->	    call_worker(Nodes, Activity, Fun, Wlock, Mod)    end.worker_init(Parent) ->    Tables = mnesia:system_info(tables),    ok = mnesia:wait_for_tables(Tables, infinity),    worker_loop(Parent).%% Main loop for remote workersworker_loop(Parent) ->    receive	{activity, Parent, [Activity, Fun, Extra, Mod]} ->	    Result = (catch mnesia:activity(Activity, Fun, Extra, Mod)),	    Parent ! {activity_result, self(), Result},	    worker_loop(Parent)    end.elapsed({Before1, Before2, Before3}, {After1, After2, After3}) ->    After  = After1  * 1000000000000  + After2  * 1000000 + After3,    Before = Before1 * 1000000000000  + Before2 * 1000000 + Before3,    After - Before.%% Lookup countersget_counters(C, {table, Tab}) ->    ets:match_object(Tab, '_');get_counters(C, {NM, NC, NA, NB}) ->    Trans = any,    Node  = somewhere,    [{{Trans, n_micros, Node}, NM},     {{Trans, n_commits, Node}, NC},     {{Trans, n_aborts, Node}, NA},     {{Trans, n_branches_executed, Node}, NB}].% Clear all counters    reset_counters(C, normal) ->    {0, 0, 0, 0};reset_counters(C, {_, _, _, _}) ->    reset_counters(C, normal);reset_counters(C, debug) ->    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),    reset_counters(C, {table, CounterTab});reset_counters(C, debug2) ->    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),    reset_counters(C, {table, CounterTab});reset_counters(C, {table, Tab} = Counters) ->    Names = [n_micros, n_commits, n_aborts, n_branches_executed],    Nodes = C#config.generator_nodes ++ C#config.table_nodes,    TransTypes = [t1, t2, t3, t4, t5, ping],    [ets:insert(Tab, {{Trans, Name, Node}, 0}) || Name <- Names,						  Node <- Nodes,						  Trans <- TransTypes],    Counters.%% Determine the outcome of a transaction and increment the counterspost_eval(Monitor, C, Elapsed, {Node, Res}, Name, CommitSessions, SessionTab, {table, Tab} = Counters) ->    case Res of	{do_commit, BranchExecuted, _} ->	    incr(Tab, {Name, n_micros, Node}, Elapsed),	    incr(Tab, {Name, n_commits, Node}, 1),	    case BranchExecuted of		true  ->		    incr(Tab, {Name, n_branches_executed, Node}, 1),		    commit_session(CommitSessions),		    generator_loop(Monitor, C, SessionTab, Counters);		false ->		    generator_loop(Monitor, C, SessionTab, Counters)	    end;	{'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->	    incr(Tab, {Name, n_micros, Node}, Elapsed),	    incr(Tab, {Name, n_aborts, Node}, 1),	    case BranchExecuted of		true  ->		    incr(Tab, {Name, n_branches_executed, Node}, 1),		    generator_loop(Monitor, C, SessionTab, Counters);		false ->		    generator_loop(Monitor, C, SessionTab, Counters)	    end;	_ ->	    ?d("Failed(~p): ~p~n", [Node, Res]),	    incr(Tab, {Name, n_micros, Node}, Elapsed),	    incr(Tab, {Name, n_aborts, Node}, 1),	    generator_loop(Monitor, C, SessionTab, Counters)    end;post_eval(Monitor, C, Elapsed, {Node, Res}, Name, CommitSessions, SessionTab, {NM, NC, NA, NB}) ->    case Res of	{do_commit, BranchExecuted, _} ->	    case BranchExecuted of		true  ->		    commit_session(CommitSessions),		    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC + 1, NA, NB + 1});		false ->		    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC + 1, NA, NB})	    end;	{'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->	    case BranchExecuted of		true  ->		    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB + 1});		false ->		    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB})	    end;	_ ->	    ?d("Failed: ~p~n", [Res]),	    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB})    end.incr(Tab, Counter, Incr) ->    ets:update_counter(Tab, Counter, Incr).commit_session(no_fun) ->    ignore;commit_session(Fun) when function(Fun) ->    Fun().%% Randlomly choose a transaction type according to benchmar spec

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?