pool.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 203 行

ERL
203
字号
%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %%     $Id$%%-module(pool).%% Supplies a computational pool of processors.%% The chief user interface function here is get_node()%% Which returns the name of the nodes in the pool%% with the least load !!!!%% This function is callable from any node including the master%% That is part of the pool%% nodes are scheduled on a per usgae basis and per load basis,%% Whenever we use a node, we put at the end of the queue, and whenever%% a node report a change in load, we insert it accordingly% User interface Exports ...-export([start/1, 	 start/2, 	 stop/0,	 get_nodes/0, 	 get_nodes_and_load/0, 	 get_node/0,	 pspawn/3, 	 attach/1,	 pspawn_link/3]).%% Internal Exports -export([statistic_collector/0,	 do_spawn/4,	 init/1,	 handle_call/3,	 handle_cast/2,	 handle_info/2,	 terminate/2]).	 -import(gen_server, [call/2]).%% User interface %% Start up using the .hosts.erlang filestart(Name) ->        start(Name,[]).start(Name,Args) when is_atom(Name) ->    gen_server:start({global, pool_master}, pool, [], []),    Hosts = net_adm:host_file(),    Nodes = start_nodes(Hosts, Name, Args),    lists:foreach(fun attach/1, Nodes),    Nodes.%%%% Interface functions ...%%get_nodes() ->          get_elements(2,get_nodes_and_load()).attach(Node) ->         call({global, pool_master}, {attach, Node}).get_nodes_and_load() -> call({global, pool_master},get_nodes).get_node() ->           call({global, pool_master},get_node).pspawn(M,F,A) ->        call({global, pool_master}, 			     {spawn, group_leader(), M,F,A}).pspawn_link(M,F,A) ->   P = pspawn(M,F,A),link(P), P.start_nodes([], _, _) -> [];start_nodes([Host|Tail], Name, Args) ->     case slave:start(Host, Name, Args) of 	{error, R} ->	    io:format("Can't start node on host ~w due to ~w~n",[Host, R]),	    start_nodes(Tail, Name, Args);	{ok, Node} -> 	    [Node | start_nodes(Tail, Name, Args)]    end.stop() -> call({global, pool_master}, stop).get_elements(_Pos,[]) -> [];get_elements(Pos,[E|T]) -> [element(Pos,E) | get_elements(Pos,T)].stop_em([]) -> stopped;stop_em([N|Tail]) ->    rpc:cast(N,erlang,halt,[]),    stop_em(Tail).init([]) ->    process_flag(trap_exit, true),    spawn_link(pool, statistic_collector, []),    {ok,[{0,node()}]}.handle_call(get_nodes, _From, Nodes)->    {reply, Nodes, Nodes};handle_call(get_node, _From, [{Load,N}|Tail]) ->    {reply, N, lists:append(Tail, [{Load+1, N}])};handle_call({attach, Node}, _From, Nodes) ->    case lists:keysearch(Node,2,Nodes) of	{value,_} ->	    {reply, allready_attached, Nodes};	false ->	    erlang:monitor_node(Node, true),	    spawn_link(Node, pool, statistic_collector, []),	    {reply, attached, lists:append(Nodes,[{999999,Node}])}    end;handle_call({spawn,Gl, M, F, A}, _From, Nodes) ->    [{Load,N}|Tail] = Nodes,    Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]),    {reply, Pid, lists:append(Tail, [{Load+1, N}]) };handle_call(stop, _From, Nodes) ->    %% clean up in terminate/2    {stop, normal, stopped, Nodes}.handle_cast(_, Nodes) ->    {noreply, Nodes}.handle_info({Node,load,Load}, Nodes) ->    Nodes2 = insert_node({Load,Node},Nodes),    {noreply, Nodes2};handle_info({nodedown, Node}, Nodes) ->    {noreply, lists:keydelete(Node,2,Nodes)};handle_info(_, Nodes) ->  %% The EXIT signals etc.etc    {noreply, Nodes}.terminate(_Reason, Nodes) ->    N = lists:delete(node(), get_elements(2, Nodes)),    stop_em(N),    ok.do_spawn(Gl, M, F, A) ->    group_leader(Gl,self()),    apply(M,F,A).insert_node({Load,Node},[{L,Node}|Tail]) when Load > L ->    %% We have a raised load here    pure_insert({Load,Node},Tail);insert_node({Load,Node},[{L,N}|Tail]) when Load =< L ->    %% Move forward in the list    T = lists:keydelete(Node,2,[{L,N}|Tail]),    [{Load,Node} | T];insert_node(Ln,[H|T]) ->    [H | insert_node(Ln,T)];insert_node(X,[]) ->          % Can't happen    error_logger:error_msg('Pool_master: Bad node list X=~w\n',[X]),    exit(crash).pure_insert({Load,Node},[]) ->    [{Load,Node}];pure_insert({Load,Node},[{L,N}|Tail]) when Load < L ->    [{Load,Node} , {L,N} | Tail];pure_insert(L,[H|T]) -> [H|pure_insert(L,T)].%% Really should not meassure the contributions from%% the back ground processes here .... which we do :-(%% We don';t have to monitor the master, since we're slaves anywaystatistic_collector() ->    statistic_collector(5).statistic_collector(0) -> exit(normal);statistic_collector(I) ->    sleep(300),      case global:whereis_name(pool_master) of	undefined ->	    statistic_collector(I-1);	M ->	    stat_loop(M,999999)    end.%% Do not tell the master about our load if it has not  changedstat_loop(M,Old) ->    sleep(2000),    case statistics(run_queue) of	Old ->	    stat_loop(M,Old);	NewLoad ->	    M ! {node(),load,NewLoad}, %% async 	    stat_loop(M,NewLoad)    end.sleep(I) -> receive after I -> ok end.        

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?