pusherconsumer_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 729 行 · 第 1/2 页

ERL
729
字号
%%-----------------------------------------------------------
%% The fourth argument selects both the reply and the new publish-type
%% flag: the 'ALL_*' modes reply with every stored publish type, the
%% 'NONE_*' modes reply with an empty list, and '*_ON' / '*_OFF' set the
%% flag to true / false. Any other value raises BAD_PARAM.
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') ->
    {reply, ?get_AllPublish(State), ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') ->
    {reply, ?get_AllPublish(State), ?set_PublishType(State, true)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') ->
    {reply, [], ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') ->
    {reply, [], ?set_PublishType(State, true)};
obtain_subscription_types(_, _, _, What) ->
    orber:dbg("[~p] PusherConsumer:obtain_subscription_types(~p);~n"
              "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : validate_event_qos
%% Arguments: RequiredQoS - CosNotification::QoSProperties
%% Returns  : {ok, AvailableQoS} | {'EXCEPTION', #'UnsupportedQoS'{}}
%%            AvailableQoS - CosNotification::NamedPropertyRangeSeq (out)
%%-----------------------------------------------------------
validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) ->
    %% The check is made against the local QoS only; State is unchanged.
    AvailableQoS =
        'CosNotification_Common':validate_event_qos(RequiredQoS,
                                                    ?get_LocalQoS(State)),
    {reply, {ok, AvailableQoS}, State}.

%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments: -
%% Returns  : The global QoS stored in State (?get_GlobalQoS)
%%-----------------------------------------------------------
get_qos(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_GlobalQoS(State), State}.

%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%%-----------------------------------------------------------
set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
    {NewQoS, LQS} =
        'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State),
                                         proxy, ?get_MyAdmin(State),
                                         false),
    %% The event DB is updated with the new local QoS before both QoS
    %% values are stored in the state.
    NewState = ?update_EventDB(State, LQS),
    {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}.

%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS}
%%            {ok, CosNotification::NamedPropertyRangeSeq}
%%-----------------------------------------------------------
validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
    QoS = 'CosNotification_Common':validate_qos(Required_qos,
                                                ?get_BothQoS(State),
                                                proxy, ?get_MyAdmin(State),
                                                false),
    {reply, {ok, QoS}, State}.

%%----- Inherit from CosNotifyComm::NotifyPublish -----------
%%----------------------------------------------------------%
%% function : offer_change
%% Arguments: Added - #'CosNotification_EventType'{}
%%            Removed - #'CosNotification_EventType'{}
%% Returns  : ok |
%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}}
%%-----------------------------------------------------------
offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) ->
    cosNotification_Filter:validate_types(Added),
    cosNotification_Filter:validate_types(Removed),
    %% On this "side" we don't really care about which
    %% type of events the client will supply.
    %% Perhaps, later on, if we want to check this against Filters
    %% associated with this object we may change this approach, i.e., if
    %% the filter will not allow passing certain event types. But the
    %% user should see to it that that situation never occurs. It would add
    %% extra overhead. Also see PusherSupplier- and PullerSupplier-
    %% 'subscription_change'.
    update_publish(add, State, Added),
    update_publish(remove, State, Removed),
    case ?get_PublishType(State) of
        true ->
            %% Perhaps we should handle exception here?!
            %% Probably not. Better to stay "on-line".
            %% The 'catch' is deliberate best-effort: whatever the client
            %% call returns or raises is ignored.
            catch 'CosNotifyComm_NotifySubscribe':
                subscription_change(?get_Client(State), Added, Removed),
            ok;
        _ ->
            ok
    end,
    {reply, ok, State}.

%% Apply ?add_Publish / ?del_Publish to every element of the list.
%% The macro results are not threaded through; callers only rely on 'ok'.
update_publish(_, _, []) ->
    ok;
update_publish(add, State, [H|T]) ->
    ?add_Publish(State, H),
    update_publish(add, State, T);
update_publish(remove, State, [H|T]) ->
    ?del_Publish(State, H),
    update_publish(remove, State, T).

%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    NewState = ?set_IdCounter(State, FilterID),
    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ok, ?del_Filter(State, FilterID)};
remove_filter(_, _, _, What) ->
    orber:dbg("[~p] PusherConsumer:remove_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ?get_Filter(State, FilterID), State};
get_filter(_, _, _, What) ->
    orber:dbg("[~p] PusherConsumer:get_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_AllFilterID(State), State}.

%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ok, ?del_AllFilter(State)}.

%%----- Inherit from CosEventComm::PushConsumer -------------
%%----------------------------------------------------------%
%% function : disconnect_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push
%% Arguments: AnyEvent
%% Returns  : ok |
%%            Raises BAD_OPERATION unless ?is_ANY(State) holds.
%%-----------------------------------------------------------
push(OE_THIS, OE_FROM, State, Event) when ?is_ANY(State) ->
    %% The client is answered before the event is filtered and forwarded.
    corba:reply(OE_FROM, ok),
    %% Clause order matters: the more specific patterns come first.
    case {?not_isConvertedStructured(Event),
          cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
        {_, {[],_}} when ?is_ANDOP(State) ->
            %% Is AND and nothing passed the filters; drop the event.
            {noreply, State};
        {true, {[],[_]}} ->
            %% Is OR and converted, change back and forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)],
                    'MATCH', OE_THIS);
        {_, {[],[_]}} ->
            %% Is OR and not converted, forward to Admin
            forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
        {true, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and converted, change back and forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)],
                    'MATCH', OE_THIS);
        {true, {[_],_}} ->
            %% Is OR and converted, change back and forward to Channel
            forward(seq, ?get_MyChannel(State), State, [any:get_value(Event)],
                    'MATCHED', OE_THIS);
        {_, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and not converted, forward to Admin
            forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
        _ ->
            %% Is OR and not converted, forward to Channel
            forward(any, ?get_MyChannel(State), State, Event, 'MATCHED', OE_THIS)
    end;
push(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::SequencePushConsumer ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_sequence_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_events
%% Arguments: CosNotification::EventBatch
%% Returns  : ok |
%%            Raises BAD_OPERATION unless ?is_SEQUENCE(State) holds.
%%-----------------------------------------------------------
push_structured_events(OE_THIS, OE_FROM, State, Events) when ?is_SEQUENCE(State) ->
    %% The client is answered before the events are filtered and forwarded.
    corba:reply(OE_FROM, ok),
    %% We cannot convert parts of the sequence to any, even though they
    %% are converted from any to structured. Would be 'impossible' to send.
    case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of
        {[],_} when ?is_ANDOP(State) ->
            {noreply, State};
        {[],Failed} ->
            forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS);
        {Passed, _} when ?is_ANDOP(State) ->
            forward(seq, ?get_MyAdmin(State), State, Passed, 'MATCH', OE_THIS);
        {Passed, []} ->
            forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS);
        {Passed, Failed} ->
            %% Is OR, send Passed to channel and Failed to Admin.
            %% If forwarding to the channel asks this proxy to stop (forward/6
            %% has then already called notify), honour that instead of
            %% silently discarding the result.
            case forward(seq, ?get_MyChannel(State), State, Passed,
                         'MATCHED', OE_THIS) of
                {noreply, _} ->
                    forward(seq, ?get_MyAdmin(State), State, Failed,
                            'MATCH', OE_THIS);
                Stop ->
                    Stop
            end
    end;
push_structured_events(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::StructuredPushConsumer --
%%----------------------------------------------------------%
%% function : disconnect_structured_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_structured_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_event
%% Arguments: CosNotification::StructuredEvent
%% Returns  : ok |
%%            Raises BAD_OPERATION unless ?is_STRUCTURED(State) holds.
%%-----------------------------------------------------------
push_structured_event(OE_THIS, OE_FROM, State, Event) when ?is_STRUCTURED(State) ->
    %% The client is answered before the event is filtered and forwarded.
    corba:reply(OE_FROM, ok),
    %% Clause order matters: the more specific patterns come first.
    case {?not_isConvertedAny(Event),
          cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
        {_, {[],_}} when ?is_ANDOP(State) ->
            %% Is AND and nothing passed the filters; drop the event.
            {noreply, State};
        {true, {[],[_]}} ->
            %% Is OR and converted, change back and forward to Admin
            forward(any, ?get_MyAdmin(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCH', OE_THIS);
        {_, {[],[_]}} ->
            %% Is OR and not converted, forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
        {true, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and converted, change back and forward to Admin
            forward(any, ?get_MyAdmin(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCH', OE_THIS);
        {true, {[_],_}} ->
            %% Is OR and converted, change back and forward to Channel
            forward(any, ?get_MyChannel(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCHED', OE_THIS);
        {_, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and not converted, forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
        _ ->
            %% Is OR and not converted, forward to Channel
            forward(seq, ?get_MyChannel(State), State, [Event], 'MATCHED', OE_THIS)
    end;
push_structured_event(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%--------------- LOCAL FUNCTIONS ----------------------------
%% Extract the object from a {value, {Id, Obj}} lookup result; anything
%% else is reported as FilterNotFound (returned, not raised).
find_obj({value, {_, Obj}}) -> Obj;
find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

%% Collect the first element of every {Id, _} tuple. The result is in
%% reverse order relative to the input list.
find_ids(List) ->
    find_ids(List, []).

find_ids([], Acc) ->
    Acc;
find_ids([{I,_}|T], Acc) ->
    find_ids(T, [I|Acc]);
find_ids(What, _) ->
    orber:dbg("[~p] PusherConsumer:find_ids();~n"
              "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% If the lists do not differ, i.e., no filter was removed, raise exception.
delete_obj(List, List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(List, _) -> List.

%% Forward events
%% forward(Type, SendTo, State, Event, Status, OE_THIS)
%%   Type   - any | seq; selects callAny or callSeq.
%%   SendTo - the object the event(s) are sent to (Admin or Channel).
%% Returns {noreply, State} on success, or on an unexpected reply when the
%% connection is persistent; otherwise notifies via
%% 'CosNotification_Common':notify/1 and returns {stop, normal, State}.
%% 'case catch' is kept on purpose so the value bound to R (and therefore
%% what is logged and passed to notify) stays exactly as before.
forward(any, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of
        ok ->
            ?DBG("PROXY FORWARD ANY: ~p~n",[Event]),
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end;
forward(seq, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of
        ok ->
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            ?DBG("ADMIN NO LONGER EXIST; DROPPING: ~p~n", [Event]),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?