📄 megaco_trans_sender.erl
字号:
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%% 
%%     $Id$
%%
%%----------------------------------------------------------------------
%% Purpose: Transaction sender process
%%----------------------------------------------------------------------

-module(megaco_trans_sender).

-export([start_link/5, 
         stop/1,
         upgrade/2,
         send_req/3,
         send_reqs/3,
         send_ack/2,
         send_ack_now/2,
         send_pending/2,
         send_reply/2,
         timeout/2,
         ack_maxcount/2,
         req_maxcount/2,
         req_maxsize/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([init/6]).

-include_lib("megaco/include/megaco.hrl").
-include("megaco_message_internal.hrl").
-include_lib("megaco/src/app/megaco_internal.hrl").

%% Loop state of the sender process.
-record(state, 
        {
          parent,        %% Pid that called start_link/5
          conn_handle,   %% Connection handle passed to the send functions
          timeout,       %% Configured timeout; the loop timer is reset to it
          req_sz = 0,    %% Accumulated size of the requests in 'reqs'
          req_maxsize,   %% Max total size of all accumulated reqs
          req_maxcount,  %% Limit set by MaxNoReqs / req_maxcount/2
          ack_maxcount,  %% Limit set by MaxNoAcks / ack_maxcount/2
          reqs = [],     %% Pending requests, stored as {Tid, Req}
          acks = []      %% Pending ack serials
         }).


%%%-----------------------------------------------------------------
%%% Public API
%%%
%%% Apart from start_link/5, every function below is asynchronous:
%%% it posts a message to the sender process and returns at once.
%%%-----------------------------------------------------------------

%% Start the sender process, linked to the caller, through
%% proc_lib:start_link/3 with init/6 as entry point. The calling
%% process is handed to init/6 as the parent. init/6 acknowledges
%% with {ok, Pid}.
start_link(CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
    ?d("start_link -> entry with"
       "~n CH: ~p"
       "~n To: ~p"
       "~n MaxSzReqs: ~p"
       "~n MaxNoReqs: ~p"
       "~n MaxNoAcks: ~p", 
       [CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
    Args = [self(), CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks],
    proc_lib:start_link(?MODULE, init, Args).

%% Ask the sender process to stop.
stop(Pid) when is_pid(Pid) ->
    Pid ! stop,
    ok.

%% Replace the connection handle held by the sender process.
upgrade(Pid, CH) when is_pid(Pid) ->
    Pid ! {upgrade, CH},
    ok.

%% Hand over one request (a binary) identified by Tid.
send_req(Pid, Tid, Req) when is_pid(Pid), is_binary(Req) ->
    Pid ! {send_req, Tid, Req},
    ok.

%% Hand over several requests at once; Tids and Reqs must be lists
%% of the same length.
send_reqs(Pid, Tids, Reqs) 
  when is_pid(Pid), is_list(Tids), is_list(Reqs), 
       length(Tids) == length(Reqs) ->
    Pid ! {send_reqs, Tids, Reqs},
    ok.

%% Hand over the serial of a transaction to acknowledge.
send_ack(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    Pid ! {send_ack, Serial},
    ok.

%% As send_ack/2, but posted with the send_ack_now tag.
send_ack_now(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    Pid ! {send_ack_now, Serial},
    ok.

%% Request a pending message for the transaction with this serial.
send_pending(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    Pid ! {send_pending, Serial},
    ok.

%% Hand over a reply (a binary).
%% NOTE(review): unlike its siblings this function does not end in
%% 'ok', so it returns the posted message {send_reply, Reply}. Left
%% unchanged because the callers are not visible from here.
send_reply(Pid, Reply) when is_pid(Pid), is_binary(Reply) ->
    Pid ! {send_reply, Reply}.

%% Change the ack_maxcount limit of the sender process.
ack_maxcount(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    Pid ! {ack_maxcount, Max},
    ok.

%% Change the req_maxcount limit of the sender process.
req_maxcount(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    Pid ! {req_maxcount, Max},
    ok.

%% Change the req_maxsize limit of the sender process.
req_maxsize(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    Pid ! {req_maxsize, Max},
    ok.

%% Change the timeout used by the sender process.
timeout(Pid, Timeout) when is_pid(Pid) ->
    Pid ! {timeout, Timeout},
    ok.


%%%-----------------------------------------------------------------
%%% Internal exports
%%%-----------------------------------------------------------------

%% Entry point of the process started by start_link/5. Enables
%% trap_exit, acknowledges the start to Parent with {ok, self()},
%% builds the initial state and enters the loop with To as the
%% initial timer value.
init(Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
    ?d("init -> entry with"
       "~n Parent: ~p"
       "~n CH: ~p"
       "~n To: ~p"
       "~n MaxSzReqs: ~p"
       "~n MaxNoReqs: ~p"
       "~n MaxNoAcks: ~p", 
       [Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
    process_flag(trap_exit, true),
    proc_lib:init_ack(Parent, {ok, self()}),
    S = #state{parent       = Parent, 
               conn_handle  = CH, 
               timeout      = To, 
               req_maxsize  = MaxSzReqs, 
               req_maxcount = MaxNoReqs, 
               ack_maxcount = MaxNoAcks},
    loop(S, To).


%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------

%% idle (= empty)
loop(#state{reqs = [], acks = [], timeout = Timeout} = S, _) ->
    receive
        {send_ack, Serial} ->
            ?d("loop(empty) -> received send_ack [~w] request", [Serial]),
            loop(S#state{acks = [Serial]}, Timeout);

        {send_ack_now, 
Serial} -> ?d("loop(empty) -> received send_ack_now [~w] request", [Serial]), send_msg(S#state.conn_handle, [], [Serial]), loop(S, Timeout); {send_req, Tid, Req} when size(Req) >= S#state.req_maxsize -> ?d("loop(empty) -> received (big) send_req request ~w", [Tid]), send_msg(S#state.conn_handle, [{Tid, Req}], []), loop(S, Timeout); {send_req, Tid, Req} -> ?d("loop(empty) -> received send_req request ~w", [Tid]), loop(S#state{req_sz = size(Req), reqs = [{Tid,Req}]}, Timeout); {send_reqs, Tids, Reqs} -> ?d("loop(empty) -> received send_reqs request: ~w", [Tids]), {NewS, _} = handle_send_reqs(Tids, Reqs, S), loop(NewS, Timeout); {send_pending, Serial} -> ?d("loop(empty) -> received send_pending [~w] request", [Serial]), handle_send_result( send_pending(S#state.conn_handle, Serial, [], []) ), loop(S, Timeout); {send_reply, Reply} -> ?d("loop(empty) -> received send_reply request", []), #state{conn_handle = CH, req_maxsize = MaxSz} = S, handle_send_result( send_reply(CH, Reply, MaxSz, 0, [], []) ), loop(S, Timeout); {upgrade, CH} -> ?d("loop(empty) -> received upgrade request:" "~n CH: ~p", [CH]), loop(S#state{conn_handle = CH}, Timeout); {ack_maxcount, NewMax} -> ?d("loop(empty) -> received ack_maxcount request", []), loop(S#state{ack_maxcount = NewMax}, Timeout); {req_maxcount, NewMax} -> ?d("loop(empty) -> received req_maxcount request", []), loop(S#state{req_maxcount = NewMax}, Timeout); {req_maxsize, NewMax} -> ?d("loop(empty) -> received req_maxsize request", []), loop(S#state{req_maxsize = NewMax}, Timeout); {timeout, NewTimeout} -> ?d("loop(empty) -> received timeout request", []), loop(S#state{timeout = NewTimeout}, NewTimeout); stop -> ?d("loop(empty) -> received stop request", []), exit(normal); {system, From, Msg} -> ?d("loop(empty) -> received system message:" "~n From: ~p" "~n Msg: ~p", [From, Msg]), Parent = S#state.parent, sys:handle_system_msg(Msg, From, Parent, ?MODULE, [], {S, Timeout}); {'EXIT', Parent, Reason} when S#state.parent == Parent -> 
?d("loop(empty) -> received upgrade request", []), exit(Reason); M -> warning_msg("received unexpected message (ignoring): " "~n~p", [M]), loop(S, Timeout) end;%% active (= some acks or reqs waiting to to be sent)loop(#state{reqs = Reqs, acks = Acks, ack_maxcount = MaxAcks, timeout = Timeout} = S, To) when To >= 0 -> Start = t(), receive {send_ack, Serial} when length(Acks) + 1 >= MaxAcks -> ?d("loop(active,~w) -> " "received [~w] send_ack [~w] request", [To, length(Acks), Serial]), handle_send_result( send_msg(S#state.conn_handle, Reqs, [Serial|Acks]) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); {send_ack, Serial} -> ?d("loop(active,~w) -> received send_ack [~w] request", [To, Serial]), loop(S#state{acks = [Serial|Acks]}, to(To, Start)); {send_ack_now, Serial} -> ?d("loop(active,~w) -> [~w,~w] " "received send_ack_now [~w] request", [To, length(Reqs), length(Acks), Serial]), handle_send_result( send_msg(S#state.conn_handle, Reqs, [Serial|Acks]) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); %% We need to check that this is not a resend!! 
%% In that case, send whatever we have in store {send_req, Tid, Req} -> ?d("loop(active,~w) -> received send_req request ~w", [To,Tid]), {NewS, NewT} = case handle_send_req(Tid, Req, S) of {S1, true} -> {S1, Timeout}; {S1, false} -> {S1, to(To, Start)} end, loop(NewS, NewT); {send_reqs, Tids, NewReqs} -> ?d("loop(active,~w) -> received send_reqs request ~w", [To,Tids]), {NewS, NewT} = case handle_send_reqs(Tids, NewReqs, S) of {S1, true} -> {S1, Timeout}; {S1, false} -> {S1, to(To, Start)} end, loop(NewS, NewT); {send_pending, Serial} -> ?d("loop(active,~w) -> received send_pending [~w] request", [To, Serial]), handle_send_result( send_pending(S#state.conn_handle, Serial, Reqs, Acks) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); {send_reply, Reply} -> ?d("loop(active,~w) -> received send_reply request", [To]), #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz} = S, handle_send_result( send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); {upgrade, CH} -> ?d("loop(active,~w) -> received upgrade request", [To]), loop(S#state{conn_handle = CH}, to(To, Start)); {req_maxsize, NewMax} -> ?d("loop(active,~w) -> received req_maxsize request", [To]), loop(S#state{req_maxsize = NewMax}, to(To, Start)); {req_maxcount, NewMax} -> ?d("loop(active,~w) -> received req_maxcount request", [To]), loop(S#state{req_maxcount = NewMax}, to(To, Start)); {ack_maxcount, NewMax} -> ?d("loop(active,~w) -> received ack_maxcount request", [To]), loop(S#state{ack_maxcount = NewMax}, to(To, Start)); {timeout, NewTimeout} when NewTimeout > Timeout -> ?d("loop(active,~w) -> received timeout request: ~w", [To, NewTimeout]), %% We need to recalculate To NewTo = NewTimeout - (Timeout - to(To, Start)), loop(S#state{timeout = NewTimeout}, NewTo); {timeout, NewTimeout} -> ?d("loop(active,~w) -> received timeout request: ~w", [To, NewTimeout]), %% We need to recalculate To NewTo = to(To, Start) - (Timeout - NewTimeout), 
loop(S#state{timeout = NewTimeout}, NewTo); stop -> ?d("loop(active,~w) -> received stop request", [To]), handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), exit(normal); {system, From, Msg} -> ?d("loop(active,~w) -> received system message:" "~n From: ~p" "~n Msg: ~p", [To, From, Msg]), Parent = S#state.parent, sys:handle_system_msg(Msg, From, Parent, ?MODULE, [], {S, to(To, Start)}); {'EXIT', Parent, Reason} when S#state.parent == Parent -> ?d("loop(active,~w) -> received exit request", [To]), exit(Reason);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -