pushersupplier_impl.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,052 行 · 第 1/3 页
ERL
1,052 行
%% NOTE: the clauses directly below are the tail of a `case` expression whose
%% head (function name and arguments) lies before the visible part of this
%% file. They are reproduced unchanged, only re-indented.
%% NOTE(review): the "Not connected" message below names callAny() although
%% these clauses precede the definition of callAny/5 -- confirm which function
%% they belong to before touching the text.
            ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]),
            {noreply, State};
        {Events,_} when ?is_Suspended(State) ->
            store_events(State, Events),
            {noreply, State};
        {Events,_} when ?is_UnConnected(State) ->
            orber:dbg("[~p] PusherSupplier:callAny();~n"
                      "Not connected, dropping event(s): ~p",
                      [?LINE, Events], ?DEBUG_LEVEL),
            {noreply, State};
        {[Event],_} when ?is_STRUCTURED(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]),
            empty_db(State, ?addAndGet_Event(State, Event));
        {[Event],_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]),
            AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
            empty_db(State, ?addAndGet_Event(State, AnyEvent));
        {Events,_} ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            lookup_and_push(State)
    end.

%%-----------------------------------------------------------
%% function : callAny
%% Arguments: _OE_THIS - unused
%%            OE_FROM  - handed to corba:reply/2; 'ok' is replied before
%%                       the event is processed
%%            State    - the proxy state
%%            EventIn  - incoming event, passed to validate_event/5
%%            Status   - passed through to validate_event/5
%% Returns  : {noreply, State} | {stop, normal, State}
%%            (whatever lookup_and_push/1 or empty_db/2 return)
%%-----------------------------------------------------------
callAny(_OE_THIS, OE_FROM, State, EventIn, Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State), Status) of
        {[],_} ->
            ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]),
            %% To be on the safe side, test if there are any events that have
            %% not been forwarded (should only be possible if StartTime is used).
            lookup_and_push(State);
        {Event,_} when ?is_Suspended(State), ?is_ANY(State) ->
            %% Suspended: only store the event, nothing is pushed.
            ?add_Event(State, Event),
            {noreply, State};
        {Event,_} when ?is_Suspended(State) ->
            %% Suspended, non-ANY proxy: store the event wrapped as a
            %% structured event.
            ?add_Event(State, ?not_CreateSE("","%ANY","",[],[],Event)),
            {noreply, State};
        {Event,_} when ?is_UnConnected(State) ->
            orber:dbg("[~p] PusherSupplier:callAny();~n"
                      "Not connected, dropping event: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            {noreply, State};
        {Event,_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            %% We must store the event since there may be other events that
            %% should be delivered first, e.g., higher priority.
            empty_db(State, ?addAndGet_Event(State, Event));
        {Event,_} when ?is_SEQUENCE(State) ->
            ?DBG("PROXY RECEIVED ANY==>SEQUENCE: ~p~n",[Event]),
            StrEvent = ?not_CreateSE("","%ANY","",[],[],Event),
            ?add_Event(State, StrEvent),
            lookup_and_push(State);
        {Event,_} ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]),
            StrEvent = ?not_CreateSE("","%ANY","",[],[],Event),
            empty_db(State, ?addAndGet_Event(State, StrEvent))
    end.

%% Lookup and push "the correct" amount of events.
%% Equivalent to lookup_and_push(State, false).
lookup_and_push(State) ->
    %% The boolean indicates, if false, that we will only push events if we have
    %% passed the BatchLimit. If true we will ignore this limit and push events
    %% anyway (typically invoked when pacing limit passed).
    lookup_and_push(State, false).

%% lookup_and_push(State, IgnoreBatchLimit)
%% For a SEQUENCE proxy: fetch up to BatchLimit events and push them to the
%% client with push_structured_events/2; on success the pushed events are
%% deleted and the function loops (with IgnoreBatchLimit = false).
%% For any other proxy type: delegate to empty_db/2.
%% Returns {noreply, State} | {stop, normal, State}.
lookup_and_push(State, false) when ?is_SEQUENCE(State) ->
    case ?is_BatchLimitReached(State) of
        true ->
            case ?get_Events(State, ?get_BatchLimit(State)) of
                {[], _, _} ->
                    ?DBG("BATCHLIMIT (~p) REACHED BUT NO EVENTS FOUND~n",
                         [?get_BatchLimit(State)]),
                    {noreply, State};
                {Events, _, Keys} ->
                    ?DBG("BATCHLIMIT (~p) REACHED, EVENTS FOUND: ~p~n",
                         [?get_BatchLimit(State), Events]),
                    case catch 'CosNotifyComm_SequencePushConsumer':push_structured_events(
                                 ?get_Client(State), Events) of
                        ok ->
                            cosNotification_eventDB:delete_events(Keys),
                            lookup_and_push(reset_cache(State), false);
                        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                                              is_record(E, 'NO_PERMISSION') ;
                                              is_record(E, 'CosEventComm_Disconnected') ->
                            ?DBG("PUSH SUPPLIER CLIENT NO LONGER EXIST~n", []),
                            'CosNotification_Common':notify([{proxy, State#state.this},
                                                             {client, ?get_Client(State)},
                                                             {reason, {'EXCEPTION', E}}]),
                            {stop, normal, State};
                        What when ?is_PersistentEvent(State),
                                  ?is_PersistentConnection(State) ->
                            %% Events are kept (not deleted); check_cache/1
                            %% decides whether to keep caching or terminate.
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p",
                                      [?LINE, What], ?DEBUG_LEVEL),
                            check_cache(State);
                        What when ?is_PersistentConnection(State) ->
                            %% Here we should do something when we want to handle
                            %% Persistent EventReliability.
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p~n"
                                      "Dropping events: ~p",
                                      [?LINE, What, Events], ?DEBUG_LEVEL),
                            cosNotification_eventDB:delete_events(Keys),
                            {noreply, State};
                        WhatII ->
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p~n"
                                      "Terminating and dropping events: ~p",
                                      [?LINE, WhatII, Events], ?DEBUG_LEVEL),
                            'CosNotification_Common':notify([{proxy, State#state.this},
                                                             {client, ?get_Client(State)},
                                                             {reason, WhatII}]),
                            {stop, normal, State}
                    end
            end;
        _ ->
            ?DBG("BATCHLIMIT (~p) NOT REACHED~n",[?get_BatchLimit(State)]),
            {noreply, State}
    end;
lookup_and_push(State, true) when ?is_SEQUENCE(State) ->
    case ?get_Events(State, ?get_BatchLimit(State)) of
        {[], _, _} ->
            ?DBG("PACELIMIT REACHED BUT NO EVENTS FOUND~n", []),
            {noreply, State};
        {Events, _, Keys} ->
            ?DBG("PACELIMIT REACHED, EVENTS FOUND: ~p~n", [Events]),
            case catch 'CosNotifyComm_SequencePushConsumer':push_structured_events(
                         ?get_Client(State), Events) of
                ok ->
                    cosNotification_eventDB:delete_events(Keys),
                    lookup_and_push(reset_cache(State), false);
                {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                                      is_record(E, 'NO_PERMISSION') ;
                                      is_record(E, 'CosEventComm_Disconnected') ->
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client no longer exists; terminating and dropping events: ~p",
                              [?LINE, Events], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, ?get_Client(State)},
                                                     {reason, {'EXCEPTION', E}}]),
                    {stop, normal, State};
                What when ?is_PersistentEvent(State),
                          ?is_PersistentConnection(State) ->
                    %% Events are kept (not deleted); check_cache/1 decides
                    %% whether to keep caching or terminate.
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p",
                              [?LINE, What], ?DEBUG_LEVEL),
                    check_cache(State);
                What when ?is_PersistentConnection(State) ->
                    %% Here we should do something when we want to handle
                    %% Persistent EventReliability.
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p~n"
                              "Dropping events: ~p",
                              [?LINE, What, Events], ?DEBUG_LEVEL),
                    cosNotification_eventDB:delete_events(Keys),
                    {noreply, State};
                WhatII ->
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p~n"
                              "Terminating and dropping events: ~p",
                              [?LINE, WhatII, Events], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, ?get_Client(State)},
                                                     {reason, WhatII}]),
                    {stop, normal, State}
            end
    end;
lookup_and_push(State, _) ->
    empty_db(State, ?get_Event(State)).

%% Push all events stored while not connected or received in sequence.
%% The second argument is a {Event, _, Keys} triple as produced by the
%% ?get_Event / ?addAndGet_Event macros; {[], _, _} means nothing to push.
%% Events are pushed one at a time; after each successful push the event is
%% deleted and the next one is fetched.
%% Returns {noreply, State} | {stop, normal, State}.
empty_db(State, {[], _, _}) ->
    {noreply, State};
empty_db(State, {Event, _, Keys}) when ?is_STRUCTURED(State) ->
    case catch 'CosNotifyComm_StructuredPushConsumer':push_structured_event(
                 ?get_Client(State), Event) of
        ok ->
            cosNotification_eventDB:delete_events(Keys),
            NewState = reset_cache(State),
            empty_db(NewState, ?get_Event(NewState));
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        What when ?is_PersistentEvent(State),
                  ?is_PersistentConnection(State) ->
            %% The event is kept (not deleted); check_cache/1 decides whether
            %% to keep caching or terminate.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p",
                      [?LINE, What], ?DEBUG_LEVEL),
            check_cache(State);
        What when ?is_PersistentConnection(State) ->
            %% Here we should do something when we want to handle
            %% Persistent EventReliability.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Dropping event: ~p",
                      [?LINE, What, Event], ?DEBUG_LEVEL),
            cosNotification_eventDB:delete_events(Keys),
            {noreply, State};
        WhatII ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, WhatII, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, WhatII}]),
            {stop, normal, State}
    end;
empty_db(State, {Event, _, Keys}) when ?is_ANY(State) ->
    case catch 'CosEventComm_PushConsumer':push(?get_Client(State), Event) of
        ok ->
            cosNotification_eventDB:delete_events(Keys),
            NewState = reset_cache(State),
            empty_db(NewState, ?get_Event(NewState));
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        What when ?is_PersistentEvent(State),
                  ?is_PersistentConnection(State) ->
            %% The event is kept (not deleted); check_cache/1 decides whether
            %% to keep caching or terminate.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p",
                      [?LINE, What], ?DEBUG_LEVEL),
            check_cache(State);
        What when ?is_PersistentConnection(State) ->
            %% Here we should do something when we want to handle
            %% Persistent EventReliability.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Dropping Event: ~p",
                      [?LINE, What, Event], ?DEBUG_LEVEL),
            cosNotification_eventDB:delete_events(Keys),
            {noreply, State};
        WhatII ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, WhatII, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, WhatII}]),
            {stop, normal, State}
    end.

%% Cancel both cache timers (if any) and clear their references in the state.
%% Returns the (possibly updated) state.
reset_cache(#state{cacheTimeout = undefined,
                   cacheInterval = undefined} = State) ->
    State;
reset_cache(State) ->
    stop_timer(State#state.cacheTimeout),
    stop_timer(State#state.cacheInterval),
    State#state{cacheTimeout = undefined,
                cacheInterval = undefined}.

%% Called when a push failed but the events must be kept.
%% - If the event counter exceeds maxCache: notify and stop.
%% - Else, if no cache timers are running: start an interval timer (message
%%   'cacheInterval') and a one-shot timer (message 'cacheTimeout') and store
%%   their references in the state.
%% - Else: leave the running timers alone.
%% Returns {noreply, State} | {stop, normal, State}.
check_cache(#state{maxCache = Max, cacheTimeout = Timeout,
                   cacheInterval = Interval} = State) ->
    case cosNotification_eventDB:status(State#state.eventDB, eventCounter) of
        Count when Count > Max ->
            %% Reached the upper limit, terminate.
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, State#state.client},
                                             {reason, {max_events, Max}}]),
            {stop, normal, State};
        _ when Timeout == undefined, Interval == undefined ->
            case {timer:send_interval(cosNotificationApp:interval_events(),
                                      cacheInterval),
                  timer:send_after(cosNotificationApp:timeout_events(),
                                   cacheTimeout)} of
                {{ok, IntervalRef}, {ok, TimeoutRef}} ->
                    {noreply, State#state{cacheTimeout = TimeoutRef,
                                          cacheInterval = IntervalRef}};
                Error ->
                    orber:dbg("[~p] PusherSupplier:check_cache();~n"
                              "Unable to start timers: ~p",
                              [?LINE, Error], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, State#state.client},
                                                     {reason, {timer, Error}}]),
                    {stop, normal, State}
            end;
        _ ->
            %% Timers already started.
            {noreply, State}
    end.

%% Store a list of (structured) events in the event DB. For an ANY proxy each
%% event is first wrapped in an any of type CosNotification_StructuredEvent.
%% Returns ok.
store_events(_State, []) ->
    ok;
store_events(State, [Event|Rest]) when ?is_ANY(State) ->
    AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
    ?add_Event(State, AnyEvent),
    store_events(State, Rest);
store_events(State, [Event|Rest]) ->
    ?add_Event(State, Event),
    store_events(State, Rest).

%% Start timers which send a message each time we should push events. Only used
%% when this object is defined to supply sequences.
%% A pacing interval of 0 means no timer is started. Otherwise a 'pacing'
%% message is requested every PacingInterval seconds and the timer reference
%% is stored in the state. Raises the CORBA 'INTERNAL' exception if the timer
%% cannot be started.
start_timer(State) ->
    case ?get_PacingInterval(State) of
        0 ->
            ?DBG("PUSH SUPPLIER STARTED NO TIMER (0), BATCH LIMIT: ~p~n",
                 [?get_BatchLimit(State)]),
            State;
        PacInt ->
            case catch timer:send_interval(timer:seconds(PacInt), pacing) of
                {ok,PacTRef} ->
                    ?DBG("PUSH SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n",
                         [?get_BatchLimit(State)]),
                    ?set_PacingTimer(State, PacTRef);
                What ->
                    orber:dbg("[~p] PusherSupplier:start_timer();~n"
                              "Unable to invoke timer:send_interval/2: ~p",
                              [?LINE, What], ?DEBUG_LEVEL),
                    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
            end
    end.

%% Cancel a timer started via the timer module; 'undefined' means there is no
%% timer and is a no-op. Always returns ok (the result of timer:cancel/1 is
%% ignored).
stop_timer(undefined) ->
    ?DBG("PUSH SUPPLIER HAVE NO TIMER TO STOP~n",[]),
    ok;
stop_timer(Timer) ->
    ?DBG("PUSH SUPPLIER STOPPED TIMER~n",[]),
    timer:cancel(Timer),
    ok.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?