pusherconsumer_impl.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 729 行 · 第 1/2 页
ERL
729 行
%%-----------------------------------------------------------
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') ->
    Published = ?get_AllPublish(State),
    {reply, Published, ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') ->
    Published = ?get_AllPublish(State),
    {reply, Published, ?set_PublishType(State, true)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') ->
    {reply, [], ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') ->
    {reply, [], ?set_PublishType(State, true)};
obtain_subscription_types(_, _, _, Unknown) ->
    %% Any other enumerant is logged and rejected with BAD_PARAM.
    orber:dbg("[~p] PusherConsumer:obtain_subscription_types(~p);~n"
              "Incorrect enumerant",
              [?LINE, Unknown], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : validate_event_qos
%% Arguments: RequiredQoS - CosNotification::QoSProperties
%% Returns  : ok | {'EXCEPTION', #'UnsupportedQoS'{}}
%%            AvailableQoS - CosNotification::NamedPropertyRangeSeq (out)
%% Effect   : Passes RequiredQoS and the local QoS of this proxy to
%%            'CosNotification_Common':validate_event_qos/2 and replies
%%            with {ok, Result}; the state is left unchanged.
%%-----------------------------------------------------------
validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) ->
    LocalQoS = ?get_LocalQoS(State),
    AvailableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS,
                                                               LocalQoS),
    {reply, {ok, AvailableQoS}, State}.

%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments: -
%% Returns  : The value of ?get_GlobalQoS(State); state unchanged.
%%-----------------------------------------------------------
get_qos(_OE_THIS, _OE_FROM, State) ->
    GlobalQoS = ?get_GlobalQoS(State),
    {reply, GlobalQoS, State}.

%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq.
%%            any().
%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%% Effect   : Delegates to 'CosNotification_Common':set_qos/5, updates
%%            the event DB with the resulting local QoS and stores both
%%            QoS values in the state.
%%-----------------------------------------------------------
set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State),
                                                     proxy, ?get_MyAdmin(State),
                                                     false),
    NewState = ?update_EventDB(State, LQS),
    {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}.

%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS}
%%            {ok, CosNotification::NamedPropertyRangeSeq}
%%-----------------------------------------------------------
validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
    QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State),
                                                proxy, ?get_MyAdmin(State),
                                                false),
    {reply, {ok, QoS}, State}.

%%----- Inherit from CosNotifyComm::NotifyPublish -----------
%%----------------------------------------------------------%
%% function : offer_change
%% Arguments: Added - #'CosNotification_EventType'{}
%%            Removed - #'CosNotification_EventType'{}
%% Returns  : ok |
%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}}
%%-----------------------------------------------------------
offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) ->
    cosNotification_Filter:validate_types(Added),
    cosNotification_Filter:validate_types(Removed),
    %% On this "side" we don't really care about which
    %% type of events the client will supply.
    %% Perhaps, later on, if we want to check this against Filters
    %% associated with this object we may change this approach, i.e., if
    %% the filter will not allow passing certain event types. But the
    %% user should see to that that situation never occurs. It would add
    %% extra overhead. Also see PusherSupplier- and PullerSupplier-
    %% 'subscription_change'.
    %%
    %% NOTE(review): the results of update_publish/3 are discarded and the
    %% original State is returned below, so ?add_Publish/?del_Publish
    %% presumably act by side effect - confirm against the macro definitions.
    update_publish(add, State, Added),
    update_publish(remove, State, Removed),
    case ?get_PublishType(State) of
        true ->
            %% Perhaps we should handle exception here?!
            %% Probably not. Better to stay "on-line".
            catch 'CosNotifyComm_NotifySubscribe':subscription_change(
                    ?get_Client(State), Added, Removed),
            ok;
        _ ->
            ok
    end,
    {reply, ok, State}.

%% Apply ?add_Publish / ?del_Publish to every element of the list, in order.
%% Always returns ok.
update_publish(_, _, []) ->
    ok;
update_publish(add, State, [H|T]) ->
    ?add_Publish(State, H),
    update_publish(add, State, T);
update_publish(remove, State, [H|T]) ->
    ?del_Publish(State, H),
    update_publish(remove, State, T).

%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    NewState = ?set_IdCounter(State, FilterID),
    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ok, ?del_Filter(State, FilterID)};
remove_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherConsumer:remove_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%%            Raises BAD_PARAM if FilterID is not an integer.
%%-----------------------------------------------------------
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ?get_Filter(State, FilterID), State};
get_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherConsumer:get_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_AllFilterID(State), State}.

%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ok, ?del_AllFilter(State)}.

%%----- Inherit from CosEventComm::PushConsumer -------------
%%----------------------------------------------------------%
%% function : disconnect_push_consumer
%% Arguments: -
%% Returns  : ok (the server is stopped with reason normal)
%%-----------------------------------------------------------
disconnect_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push
%% Arguments: AnyEvent
%% Returns  : ok
%%            Raises BAD_OPERATION unless ?is_ANY(State) holds.
%% Effect   : Replies ok to the caller before filtering, then forwards
%%            the event to the Admin or the Channel depending on the
%%            filter result and ?is_ANDOP(State).
%%-----------------------------------------------------------
push(OE_THIS, OE_FROM, State, Event) when ?is_ANY(State) ->
    corba:reply(OE_FROM, ok),
    case {?not_isConvertedStructured(Event),
          cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
        {_, {[],_}} when ?is_ANDOP(State) ->
            {noreply, State};
        {true, {[],[_]}} ->
            %% Is OR and converted, change back and forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)],
                    'MATCH', OE_THIS);
        {_, {[],[_]}} ->
            %% Is OR and not converted, forward to Admin
            forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
        {true, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and converted, change back and forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)],
                    'MATCH', OE_THIS);
        {true, {[_],_}} ->
            %% Is OR and converted, change back and forward to Channel
            forward(seq, ?get_MyChannel(State), State, [any:get_value(Event)],
                    'MATCHED', OE_THIS);
        {_, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and not converted, forward to Admin
            forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
        _ ->
            %% Is OR and not converted, forward to Channel
            forward(any, ?get_MyChannel(State), State, Event, 'MATCHED', OE_THIS)
    end;
push(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::SequencePushConsumer ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_push_consumer
%% Arguments: -
%% Returns  : ok (the server is stopped with reason normal)
%%-----------------------------------------------------------
disconnect_sequence_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_events
%% Arguments: CosNotification::EventBatch
%% Returns  : ok
%%            Raises BAD_OPERATION unless ?is_SEQUENCE(State) holds.
%%-----------------------------------------------------------
push_structured_events(OE_THIS, OE_FROM, State, Events) when ?is_SEQUENCE(State) ->
    corba:reply(OE_FROM, ok),
    %% We cannot convert parts of the sequence to any, even though they
    %% are converted from any to structured. Would be 'impossible' to send.
    case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of
        {[],_} when ?is_ANDOP(State) ->
            {noreply, State};
        {[],Failed} ->
            forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS);
        {Passed, _} when ?is_ANDOP(State) ->
            forward(seq, ?get_MyAdmin(State), State, Passed, 'MATCH', OE_THIS);
        {Passed, []} ->
            forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS);
        {Passed, Failed} ->
            %% Is OR, send Passed to channel and Failed to Admin.
            %% NOTE(review): the result of the first forward/6 call is
            %% discarded; only the Admin forward decides whether this
            %% clause returns {noreply, _} or {stop, _, _}.
            forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS),
            forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS)
    end;
push_structured_events(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::StructuredPushConsumer --
%%----------------------------------------------------------%
%% function : disconnect_structured_push_consumer
%% Arguments: -
%% Returns  : ok (the server is stopped with reason normal)
%%-----------------------------------------------------------
disconnect_structured_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_event
%% Arguments: CosNotification::StructuredEvent
%% Returns  : ok
%%            Raises BAD_OPERATION unless ?is_STRUCTURED(State) holds.
%%-----------------------------------------------------------
push_structured_event(OE_THIS, OE_FROM, State, Event) when ?is_STRUCTURED(State) ->
    corba:reply(OE_FROM, ok),
    case {?not_isConvertedAny(Event),
          cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
        {_, {[],_}} when ?is_ANDOP(State) ->
            {noreply, State};
        {true, {[],[_]}} ->
            %% Is OR and converted, change back and forward to Admin
            forward(any, ?get_MyAdmin(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCH', OE_THIS);
        {_, {[],[_]}} ->
            %% Is OR and not converted, forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
        {true, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and converted, change back and forward to Admin
            forward(any, ?get_MyAdmin(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCH', OE_THIS);
        {true, {[_],_}} ->
            %% Is OR and converted, change back and forward to Channel
            forward(any, ?get_MyChannel(State), State,
                    Event#'CosNotification_StructuredEvent'.remainder_of_body,
                    'MATCHED', OE_THIS);
        {_, {[_],_}} when ?is_ANDOP(State) ->
            %% Is AND and not converted, forward to Admin
            forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
        _ ->
            %% Is OR and not converted, forward to Channel
            forward(seq, ?get_MyChannel(State), State, [Event], 'MATCHED', OE_THIS)
    end;
push_structured_event(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%--------------- LOCAL FUNCTIONS ----------------------------
%% Extract the object from a {value, {Key, Obj}} lookup result. Anything
%% else yields a FilterNotFound exception tuple (returned, not raised).
find_obj({value, {_, Obj}}) -> Obj;
find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

%% Collect the first element of every {Id, _} pair. The accumulator is
%% not reversed, so the ids come out in reverse list order.
%% A malformed list is logged and raises INTERNAL.
find_ids(List) ->
    find_ids(List, []).

find_ids([], Acc) ->
    Acc;
find_ids([{I,_}|T], Acc) ->
    find_ids(T, [I|Acc]);
find_ids(What, _) ->
    orber:dbg("[~p] PusherConsumer:find_ids();~n"
              "Id corrupt: ~p",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% If the lists do not differ, i.e., no filter was removed, raise exception.
delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(List,_) -> List.

%% Forward events
%% forward(Type, SendTo, State, Event, Status, OE_THIS)
%%   Type   - any | seq; selects callAny/3 or callSeq/3 in
%%            oe_CosNotificationComm_Event.
%%   SendTo - the target object (callers pass the Admin or the Channel).
%%   Event  - a single event (any) or a list of events (seq).
%%   Status - 'MATCH' | 'MATCHED' as supplied by the callers.
%% Returns {noreply, State} on success, or when a persistent connection
%% gets an unexpected reply. Otherwise notifies via
%% 'CosNotification_Common':notify/1 and returns {stop, normal, State}.
forward(any, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of
        ok ->
            ?DBG("PROXY FORWARD ANY: ~p~n",[Event]),
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end;
forward(seq, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of
        ok ->
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            ?DBG("ADMIN NO LONGER EXIST; DROPPING: ~p~n", [Event]),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PusherConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, OE_THIS},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?