pool.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 203 行
ERL
203 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
-module(pool).

%% Supplies a computational pool of processors.
%% The chief user interface function here is get_node(),
%% which returns the name of the node in the pool
%% with the least load.
%% This function is callable from any node, including the master,
%% that is part of the pool.
%% Nodes are scheduled on a per-usage basis and on a per-load basis:
%% whenever we use a node, we put it at the end of the queue, and whenever
%% a node reports a change in load, we insert it accordingly.
%%
%% The server state is a list of {Load, Node} tuples; the head of the
%% list is the node handed out next.

%% User interface Exports ...
-export([start/1,
         start/2,
         stop/0,
         get_nodes/0,
         get_nodes_and_load/0,
         get_node/0,
         pspawn/3,
         attach/1,
         pspawn_link/3]).

%% Internal Exports
-export([statistic_collector/0,
         do_spawn/4,
         init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2]).

%% Retained from the original module; the calls below are written
%% fully qualified as gen_server:call/2.
-import(gen_server, [call/2]).

%%
%% User interface
%%

%% start(Name) -> Nodes
%% Start up using the .hosts.erlang file, passing no extra arguments
%% to the slave nodes.
start(Name) ->
    start(Name, []).

%% start(Name, Args) -> Nodes
%% Starts the globally registered pool master, starts one slave node
%% named Name on every host returned by net_adm:host_file/0 (passing
%% Args to slave:start/3), attaches every node that came up, and
%% returns the list of those nodes.
start(Name, Args) when is_atom(Name) ->
    %% The return value is not inspected, so start-up continues whether
    %% or not this particular call managed to start a master.
    gen_server:start({global, pool_master}, pool, [], []),
    Hosts = net_adm:host_file(),
    Nodes = start_nodes(Hosts, Name, Args),
    lists:foreach(fun attach/1, Nodes),
    Nodes.

%%
%% Interface functions ...
%%

%% get_nodes() -> [Node]
%% The nodes of the pool, in the master's current scheduling order.
get_nodes() ->
    get_elements(2, get_nodes_and_load()).

%% attach(Node) -> attached | allready_attached
%% Adds Node to the pool. The (misspelled) atom allready_attached is
%% part of the reply protocol and is kept as is.
attach(Node) ->
    gen_server:call({global, pool_master}, {attach, Node}).

%% get_nodes_and_load() -> [{Load, Node}]
%% The master's complete state.
get_nodes_and_load() ->
    gen_server:call({global, pool_master}, get_nodes).

%% get_node() -> Node
%% Returns the node at the head of the master's list; the master moves
%% it to the back of the list with its load incremented by one.
get_node() ->
    gen_server:call({global, pool_master}, get_node).

%% pspawn(M, F, A) -> Pid
%% Asks the master to spawn M:F(A) on the node at the head of its list.
%% The caller's group leader is passed along and installed in the new
%% process by do_spawn/4.
pspawn(M, F, A) ->
    gen_server:call({global, pool_master}, {spawn, group_leader(), M, F, A}).

%% pspawn_link(M, F, A) -> Pid
%% As pspawn/3, but the new process is linked to the caller.
%% The process is created with spawn_link/4 so that spawning and
%% linking are one atomic step; spawning first and calling link/1
%% afterwards can fail with noproc when the job has already finished.
%% A process spawned this way inherits the caller's group leader, and
%% get_node/0 does the same load accounting as the spawn request.
pspawn_link(M, F, A) ->
    spawn_link(get_node(), M, F, A).

%% start_nodes(Hosts, Name, Args) -> [Node]
%% Tries to start a slave node on every host. Hosts on which the start
%% fails are reported on standard output and skipped.
start_nodes([], _, _) ->
    [];
start_nodes([Host | Tail], Name, Args) ->
    case slave:start(Host, Name, Args) of
        {error, R} ->
            io:format("Can't start node on host ~w due to ~w~n",[Host, R]),
            start_nodes(Tail, Name, Args);
        {ok, Node} ->
            [Node | start_nodes(Tail, Name, Args)]
    end.

%% stop() -> stopped
%% Stops the pool master; terminate/2 then halts the other pool nodes.
stop() ->
    gen_server:call({global, pool_master}, stop).

%% get_elements(Pos, Tuples) -> [Element]
%% Picks element number Pos out of every tuple in the list.
get_elements(Pos, Tuples) ->
    [element(Pos, E) || E <- Tuples].

%% stop_em(Nodes) -> stopped
%% Asynchronously asks every node in Nodes to halt.
stop_em(Nodes) ->
    lists:foreach(fun(N) -> rpc:cast(N, erlang, halt, []) end, Nodes),
    stopped.

%%
%% gen_server callbacks
%%

%% The pool initially contains only the master's own node, with load 0.
%% Exits are trapped, so the exit of a linked statistic collector
%% arrives as a message in handle_info/2.
init([]) ->
    process_flag(trap_exit, true),
    spawn_link(pool, statistic_collector, []),
    {ok, [{0, node()}]}.

handle_call(get_nodes, _From, Nodes) ->
    {reply, Nodes, Nodes};
handle_call(get_node, _From, [{Load, N} | Tail]) ->
    %% Hand out the head and requeue it last with its load bumped.
    {reply, N, Tail ++ [{Load + 1, N}]};
handle_call({attach, Node}, _From, Nodes) ->
    case lists:keymember(Node, 2, Nodes) of
        true ->
            {reply, allready_attached, Nodes};
        false ->
            erlang:monitor_node(Node, true),
            spawn_link(Node, pool, statistic_collector, []),
            %% A new node is queued last with a placeholder load of
            %% 999999 until its collector sends a real figure.
            {reply, attached, Nodes ++ [{999999, Node}]}
    end;
handle_call({spawn, Gl, M, F, A}, _From, [{Load, N} | Tail]) ->
    Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]),
    {reply, Pid, Tail ++ [{Load + 1, N}]};
handle_call(stop, _From, Nodes) ->
    %% clean up in terminate/2
    {stop, normal, stopped, Nodes}.

handle_cast(_, Nodes) ->
    {noreply, Nodes}.

handle_info({Node, load, Load}, Nodes) ->
    %% Only nodes that are currently attached are re-sorted. A report
    %% from any other node is dropped: passing it to insert_node/2
    %% would either crash the master (and terminate/2 would then halt
    %% every pool node) or add the node without monitoring it,
    %% depending only on the reported load.
    case lists:keymember(Node, 2, Nodes) of
        true ->
            {noreply, insert_node({Load, Node}, Nodes)};
        false ->
            {noreply, Nodes}
    end;
handle_info({nodedown, Node}, Nodes) ->
    {noreply, lists:keydelete(Node, 2, Nodes)};
handle_info(_, Nodes) ->
    %% The EXIT signals etc.etc
    {noreply, Nodes}.

%% Halts every pool node except the one the master runs on.
terminate(_Reason, Nodes) ->
    N = lists:delete(node(), get_elements(2, Nodes)),
    stop_em(N),
    ok.

%% do_spawn(Gl, M, F, A)
%% Entry point of processes created by pspawn/3: installs Gl as the
%% group leader of the new process and then runs M:F(A).
do_spawn(Gl, M, F, A) ->
    group_leader(Gl, self()),
    apply(M, F, A).

%%
%% Internal functions
%%

%% insert_node({Load, Node}, Nodes) -> Nodes2
%% Moves Node to the place in the list that matches its new Load.
insert_node({Load, Node}, [{L, Node} | Tail]) when Load > L ->
    %% We have a raised load here
    pure_insert({Load, Node}, Tail);
insert_node({Load, Node}, [{L, N} | Tail]) when Load =< L ->
    %% Move forward in the list
    T = lists:keydelete(Node, 2, [{L, N} | Tail]),
    [{Load, Node} | T];
insert_node(Ln, [H | T]) ->
    [H | insert_node(Ln, T)];
insert_node(X, []) ->
    %% Can't happen
    error_logger:error_msg('Pool_master: Bad node list X=~w\n',[X]),
    exit(crash).

%% pure_insert({Load, Node}, Nodes) -> Nodes2
%% Inserts the entry in front of the first entry with a strictly
%% higher load, or last when there is none.
pure_insert({Load, Node}, []) ->
    [{Load, Node}];
pure_insert({Load, Node}, [{L, N} | Tail]) when Load < L ->
    [{Load, Node}, {L, N} | Tail];
pure_insert(L, [H | T]) ->
    [H | pure_insert(L, T)].

%% Really should not measure the contributions from
%% the background processes here .... which we do :-(
%% We don't have to monitor the master, since we're slaves anyway

%% statistic_collector()
%% Looks up the global pool_master, retrying up to five times at
%% 300 ms intervals, and then reports this node's load to it for ever.
%% Exits with reason normal when no master is found.
statistic_collector() ->
    statistic_collector(5).

statistic_collector(0) ->
    exit(normal);
statistic_collector(I) ->
    sleep(300),
    case global:whereis_name(pool_master) of
        undefined ->
            statistic_collector(I - 1);
        M ->
            %% Start from the placeholder load so that the first
            %% reading that differs from it is reported.
            stat_loop(M, 999999)
    end.

%% Do not tell the master about our load if it has not changed
stat_loop(M, Old) ->
    sleep(2000),
    case statistics(run_queue) of
        Old ->
            stat_loop(M, Old);
        NewLoad ->
            M ! {node(), load, NewLoad}, %% async
            stat_loop(M, NewLoad)
    end.

%% sleep(I) -> ok
%% Suspends the calling process for I milliseconds.
sleep(I) ->
    receive
    after I -> ok
    end.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?