pushersupplier_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,052 行 · 第 1/3 页

ERL
1,052
字号
	    ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]),
            {noreply, State};
        {Events,_} when ?is_Suspended(State) ->
            store_events(State, Events),
            {noreply, State};
        {Events,_} when ?is_UnConnected(State) ->
            orber:dbg("[~p] PusherSupplier:callAny();~n"
                      "Not connected, dropping event(s): ~p", 
                      [?LINE, Events], ?DEBUG_LEVEL),
            {noreply, State};
        {[Event],_} when ?is_STRUCTURED(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]),
            empty_db(State, ?addAndGet_Event(State, Event));
        {[Event],_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]),
            AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
            empty_db(State, ?addAndGet_Event(State, AnyEvent));
        {Events,_} ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            lookup_and_push(State)
    end.

%%-----------------------------------------------------------
%% function : callAny
%% Arguments: _OE_THIS - unused.
%%            OE_FROM  - the caller; it is answered with 'ok'
%%                       before the event is processed.
%%            State    - the proxy state.
%%            EventIn  - the incoming event.
%%            Status   - passed through to validate_event/5.
%% Returns  : {noreply, State} | {stop, normal, State}
%%            (the latter via lookup_and_push/empty_db).
%% Effect   : Validates the event against the subscription and
%%            filter data. An event that passes is stored,
%%            dropped (not connected) or pushed, depending on
%%            the state macros. When the proxy does not deliver
%%            plain any-events, the event is first wrapped in a
%%            structured event of type "%ANY".
%%-----------------------------------------------------------
callAny(_OE_THIS, OE_FROM, State, EventIn, Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State),
                                                Status) of
        {[],_} ->
            ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]),
            %% To be on the safe side, test if there are any events that
            %% have not been forwarded (should only be possible if
            %% StartTime is used).
            lookup_and_push(State);
        {Event,_} when ?is_Suspended(State), ?is_ANY(State) ->
            ?add_Event(State, Event),
            {noreply, State};
        {Event,_} when ?is_Suspended(State) ->
            ?add_Event(State, ?not_CreateSE("","%ANY","",[],[],Event)),
            {noreply, State};
        {Event,_} when ?is_UnConnected(State) ->
            orber:dbg("[~p] PusherSupplier:callAny();~n"
                      "Not connected, dropping event: ~p", 
                      [?LINE, Event], ?DEBUG_LEVEL),
            {noreply, State};
        {Event,_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            %% We must store the event since there may be other events that
            %% should be delivered first, e.g., higher priority.
            empty_db(State, ?addAndGet_Event(State, Event));
        {Event,_} when ?is_SEQUENCE(State) ->
            ?DBG("PROXY RECEIVED ANY==>SEQUENCE: ~p~n",[Event]),
            StrEvent = ?not_CreateSE("","%ANY","",[],[],Event),
            ?add_Event(State, StrEvent),
            lookup_and_push(State);
        {Event,_} ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]),
            StrEvent = ?not_CreateSE("","%ANY","",[],[],Event),
            empty_db(State, ?addAndGet_Event(State, StrEvent))
    end.

%% Lookup and push "the correct" amount of events.
%% Equivalent to lookup_and_push(State, false).
lookup_and_push(State) ->
    %% The boolean indicates, if false, that we will only push events if we
    %% have passed the BatchLimit. If true we will ignore this limit and push
    %% events anyway (typically invoked when the pacing limit has passed).
    lookup_and_push(State, false).

%% lookup_and_push(State, IgnoreBatchLimit)
%%
%% For a sequence proxy, fetches up to BatchLimit events and pushes them to
%% the client with push_structured_events/2. After a successful push the
%% pushed events are deleted, the cache timers are reset and the function
%% recurses (with the batch limit honoured) until nothing more is to be sent.
%% For any other proxy type the work is delegated to empty_db/2.
%%
%% Push failures are handled as follows:
%%   * OBJECT_NOT_EXIST, NO_PERMISSION or Disconnected exception:
%%     notify and stop.
%%   * any other reply, persistent events and persistent connection:
%%     keep the events and let check_cache/1 decide.
%%   * any other reply, persistent connection only:
%%     drop the events and continue.
%%   * otherwise: notify and stop.
%%
%% Returns {noreply, State} | {stop, normal, State}.
lookup_and_push(State, false) when ?is_SEQUENCE(State) ->
    case ?is_BatchLimitReached(State) of
        true ->
            case ?get_Events(State, ?get_BatchLimit(State)) of
                {[], _, _} ->
                    ?DBG("BATCHLIMIT (~p) REACHED BUT NO EVENTS FOUND~n",
                         [?get_BatchLimit(State)]),
                    {noreply, State};
                {Events, _, Keys} ->
                    ?DBG("BATCHLIMIT (~p) REACHED, EVENTS FOUND: ~p~n",
                         [?get_BatchLimit(State), Events]),
                    case catch 'CosNotifyComm_SequencePushConsumer':
                        push_structured_events(?get_Client(State), Events) of
                        ok ->
                            cosNotification_eventDB:delete_events(Keys),
                            lookup_and_push(reset_cache(State), false);
                        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                                              is_record(E, 'NO_PERMISSION') ;
                                              is_record(E, 'CosEventComm_Disconnected') ->
                            ?DBG("PUSH SUPPLIER CLIENT NO LONGER EXIST~n", []),
                            'CosNotification_Common':notify([{proxy, State#state.this},
                                                             {client, ?get_Client(State)}, 
                                                             {reason, {'EXCEPTION', E}}]),
                            {stop, normal, State};
                        What when ?is_PersistentEvent(State),
                                  ?is_PersistentConnection(State) ->
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p", 
                                      [?LINE, What], ?DEBUG_LEVEL),
                            check_cache(State);
                        What when ?is_PersistentConnection(State) ->
                            %% Here we should do something when we want to
                            %% handle Persistent EventReliability.
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p~n"
                                      "Dropping events: ~p", 
                                      [?LINE, What, Events], ?DEBUG_LEVEL),
                            cosNotification_eventDB:delete_events(Keys),
                            {noreply, State};
                        WhatII ->
                            orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                                      "Client respond incorrect: ~p~n"
                                      "Terminating and dropping events: ~p",
                                      [?LINE, WhatII, Events], ?DEBUG_LEVEL),
                            'CosNotification_Common':notify([{proxy, State#state.this},
                                                             {client, ?get_Client(State)}, 
                                                             {reason, WhatII}]),
                            {stop, normal, State}
                    end
            end;
        _ ->
            ?DBG("BATCHLIMIT (~p) NOT REACHED~n",[?get_BatchLimit(State)]),
            {noreply, State}
    end;
lookup_and_push(State, true) when ?is_SEQUENCE(State) ->
    %% Pacing interval elapsed: push whatever is stored (at most BatchLimit
    %% events per call) without checking whether the batch limit is reached.
    case ?get_Events(State, ?get_BatchLimit(State)) of
        {[], _, _} ->
            ?DBG("PACELIMIT REACHED BUT NO EVENTS FOUND~n", []),
            {noreply, State};
        {Events, _, Keys} ->
            ?DBG("PACELIMIT REACHED, EVENTS FOUND: ~p~n", [Events]),
            case catch 'CosNotifyComm_SequencePushConsumer':
                push_structured_events(?get_Client(State), Events) of
                ok ->
                    cosNotification_eventDB:delete_events(Keys),
                    lookup_and_push(reset_cache(State), false);
                {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                                      is_record(E, 'NO_PERMISSION') ;
                                      is_record(E, 'CosEventComm_Disconnected') ->
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client no longer exists; terminating and dropping events: ~p", 
                              [?LINE, Events], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, ?get_Client(State)}, 
                                                     {reason, {'EXCEPTION', E}}]),
                    {stop, normal, State};
                What when ?is_PersistentEvent(State),
                          ?is_PersistentConnection(State) ->
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p", 
                              [?LINE, What], ?DEBUG_LEVEL),
                    check_cache(State);
                What when ?is_PersistentConnection(State) ->
                    %% Here we should do something when we want to handle
                    %% Persistent EventReliability.
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p~n"
                              "Dropping events: ~p", 
                              [?LINE, What, Events], ?DEBUG_LEVEL),
                    cosNotification_eventDB:delete_events(Keys),
                    {noreply, State};
                WhatII ->
                    orber:dbg("[~p] PusherSupplier:lookup_and_push();~n"
                              "Client respond incorrect: ~p~n"
                              "Terminating and dropping events: ~p", 
                              [?LINE, WhatII, Events], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, ?get_Client(State)}, 
                                                     {reason, WhatII}]),
                    {stop, normal, State}
            end
    end;
lookup_and_push(State, _) ->
    %% Not a sequence proxy: events are pushed one at a time.
    empty_db(State, ?get_Event(State)).

%% Push all events stored while not connected or received in sequence.
%%
%% The second argument is the {Event, _, Keys} triple of the next event to
%% deliver; {[], _, _} means nothing is left. Each successfully pushed event
%% is deleted, the cache timers are reset and the next event is fetched,
%% until the store is empty or a push fails. Failures are classified exactly
%% as in lookup_and_push/2.
%%
%% NOTE(review): no clause matches a non-empty event when the proxy is
%% neither STRUCTURED nor ANY; presumably callers never do that - confirm.
%%
%% Returns {noreply, State} | {stop, normal, State}.
empty_db(State, {[], _, _}) ->
    {noreply, State};
empty_db(State, {Event, _, Keys}) when ?is_STRUCTURED(State) ->
    case catch 'CosNotifyComm_StructuredPushConsumer':
        push_structured_event(?get_Client(State), Event) of
        ok ->
            cosNotification_eventDB:delete_events(Keys),
            NewState = reset_cache(State),
            empty_db(NewState, ?get_Event(NewState));
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client no longer exists; terminating and dropping: ~p", 
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)}, 
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        What when ?is_PersistentEvent(State),
                  ?is_PersistentConnection(State) ->
            %% The tag names this function so the log points at the real
            %% call site.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p", 
                      [?LINE, What], ?DEBUG_LEVEL),
            check_cache(State);
        What when ?is_PersistentConnection(State) ->
            %% Here we should do something when we want to handle
            %% Persistent EventReliability.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Dropping event: ~p", 
                      [?LINE, What, Event], ?DEBUG_LEVEL),
            cosNotification_eventDB:delete_events(Keys),
            {noreply, State};
        WhatII ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Terminating and dropping: ~p", 
                      [?LINE, WhatII, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)}, 
                                             {reason, WhatII}]),
            {stop, normal, State}
    end;
empty_db(State, {Event, _, Keys}) when ?is_ANY(State) ->
    case catch 'CosEventComm_PushConsumer':push(?get_Client(State), Event) of
        ok ->
            cosNotification_eventDB:delete_events(Keys),
            NewState = reset_cache(State),
            empty_db(NewState, ?get_Event(NewState));
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client no longer exists; terminating and dropping: ~p", 
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)}, 
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        What when ?is_PersistentEvent(State),
                  ?is_PersistentConnection(State) ->
            %% The tag names this function so the log points at the real
            %% call site.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p", 
                      [?LINE, What], ?DEBUG_LEVEL),
            check_cache(State);
        What when ?is_PersistentConnection(State) ->
            %% Here we should do something when we want to handle
            %% Persistent EventReliability.
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Dropping Event: ~p", 
                      [?LINE, What, Event], ?DEBUG_LEVEL),
            cosNotification_eventDB:delete_events(Keys),
            {noreply, State};
        WhatII ->
            orber:dbg("[~p] PusherSupplier:empty_db();~n"
                      "Client respond incorrect: ~p~n"
                      "Terminating and dropping: ~p", 
                      [?LINE, WhatII, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)}, 
                                             {reason, WhatII}]),
            {stop, normal, State}
    end.

%% Cancel the cache timers, if any, and clear both timer fields.
%% Returns the state unchanged when neither timer is set.
reset_cache(#state{cacheTimeout = undefined, 
                   cacheInterval = undefined} = State) ->
    State;
reset_cache(State) ->
    stop_timer(State#state.cacheTimeout),
    stop_timer(State#state.cacheInterval),
    State#state{cacheTimeout = undefined, 
                cacheInterval = undefined}.

%% Called when a push failed but the events must be kept.
%%   * If the event counter of the event DB exceeds maxCache:
%%     notify and stop.
%%   * If no cache timer is running: start an interval timer
%%     (message 'cacheInterval') and a one-shot timer (message
%%     'cacheTimeout') and store their references in the state.
%%     If the timers cannot be started: notify and stop.
%%   * Otherwise the timers are already running; nothing to do.
%% Returns {noreply, State} | {stop, normal, State}.
check_cache(#state{maxCache = Max, cacheTimeout = Timeout, 
                   cacheInterval = Interval} = State) ->
    case cosNotification_eventDB:status(State#state.eventDB, eventCounter) of
        Count when Count > Max ->
            %% Reached the upper limit, terminate.
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, State#state.client}, 
                                             {reason, {max_events, Max}}]),
            {stop, normal, State};
        _ when Timeout =:= undefined, Interval =:= undefined ->
            case {timer:send_interval(cosNotificationApp:interval_events(), 
                                      cacheInterval),
                  timer:send_after(cosNotificationApp:timeout_events(), 
                                   cacheTimeout)} of
                {{ok, IntervalRef}, {ok, TimeoutRef}} ->
                    {noreply, State#state{cacheTimeout = TimeoutRef, 
                                          cacheInterval = IntervalRef}};
                Error ->
                    orber:dbg("[~p] PusherSupplier:check_cache();~n"
                              "Unable to start timers: ~p", 
                              [?LINE, Error], ?DEBUG_LEVEL),
                    'CosNotification_Common':notify([{proxy, State#state.this},
                                                     {client, State#state.client}, 
                                                     {reason, {timer, Error}}]),
                    {stop, normal, State}
            end;
        _ ->
            %% Timers already started.
            {noreply, State}
    end.

%% Store every event in the list. For an ANY proxy each event is first
%% wrapped in an any of type CosNotification_StructuredEvent.
%% Always returns ok.
store_events(_State, []) ->
    ok;
store_events(State, [Event|Rest]) when ?is_ANY(State) ->
    AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
    ?add_Event(State, AnyEvent),
    store_events(State, Rest);
store_events(State, [Event|Rest]) ->
    ?add_Event(State, Event),
    store_events(State, Rest).

%% Start timers which send a message each time we should push events. Only
%% used when this object is defined to supply sequences.
%%
%% A pacing interval of 0 means no timer; the state is returned unchanged.
%% Otherwise a 'pacing' message is sent every PacingInterval seconds and the
%% timer reference is stored in the returned state. Raises the CORBA
%% 'INTERNAL' exception if the timer cannot be started.
start_timer(State) ->
    case ?get_PacingInterval(State) of
        0 ->
            ?DBG("PUSH SUPPLIER STARTED NO TIMER (0), BATCH LIMIT: ~p~n", 
                 [?get_BatchLimit(State)]),
            State;
        PacInt ->
            case catch timer:send_interval(timer:seconds(PacInt), pacing) of
                {ok,PacTRef} ->
                    ?DBG("PUSH SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n",
                         [?get_BatchLimit(State)]),
                    ?set_PacingTimer(State, PacTRef);
                What ->
                    orber:dbg("[~p] PusherSupplier:start_timer();~n"
                              "Unable to invoke timer:send_interval/2: ~p", 
                              [?LINE, What], ?DEBUG_LEVEL),
                    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
            end
    end.

%% Cancel a timer reference; 'undefined' means there is no timer to cancel.
%% The result of timer:cancel/1 is ignored. Always returns ok.
stop_timer(undefined) ->
    ?DBG("PUSH SUPPLIER HAVE NO TIMER TO STOP~n",[]),
    ok;
stop_timer(Timer) ->
    ?DBG("PUSH SUPPLIER STOPPED TIMER~n",[]),
    timer:cancel(Timer),
    ok.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?