pullerconsumer_impl.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 773 行 · 第 1/3 页
ERL
773 行
?add_Publish(State, H), update_publish(add, State, T);update_publish(remove, State, [H|T]) -> ?del_Publish(State, H), update_publish(remove, State, T).%%----- Inherit from CosNotifyFilter::FilterAdmin -----------%%----------------------------------------------------------%%% function : add_filter%% Arguments: Filter - CosNotifyFilter::Filter%% Returns : FilterID - long%%-----------------------------------------------------------add_filter(_OE_THIS, _OE_FROM, State, Filter) -> 'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'), FilterID = ?new_Id(State), NewState = ?set_IdCounter(State, FilterID), {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.%%----------------------------------------------------------%%% function : remove_filter%% Arguments: FilterID - long%% Returns : ok%%-----------------------------------------------------------remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when integer(FilterID) -> {reply, ok, ?del_Filter(State, FilterID)};remove_filter(_,_,_,What) -> orber:dbg("[~p] PullerConsumer:remove_filter(~p); Not an integer", [?LINE, What], ?DEBUG_LEVEL), corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).%%----------------------------------------------------------%%% function : get_filter%% Arguments: FilterID - long%% Returns : Filter - CosNotifyFilter::Filter |%% {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}%%-----------------------------------------------------------get_filter(_OE_THIS, _OE_FROM, State, FilterID) when integer(FilterID) -> {reply, ?get_Filter(State, FilterID), State};get_filter(_,_,_,What) -> orber:dbg("[~p] PullerConsumer:get_filter(~p); Not an integer", [?LINE, What], ?DEBUG_LEVEL), corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).%%----------------------------------------------------------%%% function : get_all_filters%% Arguments: -%% Returns : Filter - CosNotifyFilter::FilterIDSeq%%-----------------------------------------------------------get_all_filters(_OE_THIS, _OE_FROM, State) -> {reply, ?get_AllFilterID(State), State}.%%----------------------------------------------------------%%% function : remove_all_filters%% Arguments: -%% Returns : ok%%-----------------------------------------------------------remove_all_filters(_OE_THIS, _OE_FROM, State) -> {reply, ok, ?del_AllFilter(State)}.%%----- Inherit from CosEventComm::PullConsumer -------------%%----------------------------------------------------------%%% function : disconnect_pull_consumer%% Arguments: -%% Returns : ok%%-----------------------------------------------------------disconnect_pull_consumer(_OE_THIS, _OE_FROM, State) -> {stop, normal, ok, ?set_Unconnected(State)}.%%----- Inherit from CosNotifyComm::SequencePullConsumer ----%%----------------------------------------------------------%%% function : disconnect_sequence_pull_consumer%% Arguments: -%% Returns : ok %%-----------------------------------------------------------disconnect_sequence_pull_consumer(_OE_THIS, _OE_FROM, State) -> {stop, normal, ok, ?set_Unconnected(State)}.%%----- Inherit from CosNotifyComm::StructuredPullConsumer ----%%----------------------------------------------------------%%% function : disconnect_structured_pull_consumer%% Arguments: -%% Returns : ok%%-----------------------------------------------------------disconnect_structured_pull_consumer(_OE_THIS, _OE_FROM, State) -> {stop, normal, ok, ?set_Unconnected(State)}.%%--------------- LOCAL FUNCTIONS ----------------------------find_obj({value, {_, Obj}}) -> Obj;find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.find_ids(List) -> find_ids(List, []).find_ids([], Acc) -> Acc;find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]);find_ids(What, _) -> orber:dbg("[~p] PullerConsumer:find_ids();~n" "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL), corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).%% Delete a single object.%% The list don not differ, i.e., no filter removed, raise exception.delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});delete_obj(List,_) -> List.%% Start timer which send a message each time we should pull for new events.start_timer(State) -> case catch timer:send_interval(?get_PullInterval(State), pull) of {ok,PullTRef} -> ?DBG("PULL CONSUMER STARTED PULL TIMER ~p~n", [?get_PullInterval(State)]), ?set_PullTimer(State, PullTRef); What -> orber:dbg("[~p] PullerConsumer:start_timer();~n" "Unable to invoke timer:send_interval/2: ~p", [?LINE, What], ?DEBUG_LEVEL), corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) end.stop_timer(State) -> ?DBG("PULL CONSUMER STOPPED TIMER~n",[]), timer:cancel(?get_PullTimer(State)).%% Try pull event(s); which method is determined by which type this proxy is.try_pull_events(State) when ?is_ANY(State) -> case catch 'CosEventComm_PullSupplier':try_pull(?get_Client(State)) of {_,false} -> {noreply, State}; {Event, true} -> case ?not_isConvertedStructured(Event) of true -> forward(seq, State, cosNotification_eventDB:filter_events([any:get_value(Event)], ?get_AllFilter(State))); _ -> forward(any, State, cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))) end; _-> {noreply, State} end;try_pull_events(State) when ?is_SEQUENCE(State) -> case catch 'CosNotifyComm_SequencePullSupplier': try_pull_structured_events(?get_Client(State), ?get_BatchLimit(State)) of {_,false} -> {noreply, State}; {EventSeq, true} -> %% We cannot convert parts of the sequence to any, event though they %% are converted from any to structured. Would be 'impossible' to send. forward(seq, State, cosNotification_eventDB:filter_events(EventSeq, ?get_AllFilter(State))); _-> {noreply, State} end;try_pull_events(State) when ?is_STRUCTURED(State) -> case catch 'CosNotifyComm_StructuredPullSupplier': try_pull_structured_event(?get_Client(State)) of {_,false} -> {noreply, State}; {Event, true} when ?not_isConvertedAny(Event) -> forward(any, State, cosNotification_eventDB:filter_events([Event#'CosNotification_StructuredEvent'.remainder_of_body], ?get_AllFilter(State))); {Event, true} -> forward(seq, State, cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))); _-> {noreply, State} end. %% Forward eventsforward(_, State, {[], _}) when ?is_ANDOP(State) -> %% Did not pass filtering. Since AND no need to pass on. {noreply, State};forward(Type, State, {[], Failed}) -> %% Did not pass filtering, but since OR it may pass Admin filters, hence, pass %% on to Admin forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH');forward(Type, State, {Passed, _}) when ?is_ANDOP(State) -> %% Did pass filtering, but since AND we must pass it to Admin to check against %% its Filters. Just ignore the ones that failed. forward(Type, State, Passed, ?get_MyAdmin(State), 'MATCH');forward(Type, State, {Passed, []}) -> %% Did pass filtering, and since OR we can pass it to the Channel directly. forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED');forward(Type, State, {Passed, Failed}) -> %% Some passed filtering, and since OR we can pass the ones that passed directly %% to the channel and the other ones via the admin. forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED'), forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH').forward(any, State, [Event], SendTo, Status) -> case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of ok -> ?DBG("PROXY FORWARD ANY: ~p~n",[Event]), {noreply, State}; {'EXCEPTION', E} when record(E, 'OBJECT_NOT_EXIST') ; record(E, 'NO_PERMISSION') ; record(E, 'CosEventComm_Disconnected') -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel no longer exists; terminating and dropping: ~p", [?LINE, Event], ?DEBUG_LEVEL), 'CosNotification_Common':notify([{proxy, State#state.this}, {client, ?get_Client(State)}, {reason, {'EXCEPTION', E}}]), {stop, normal, State}; R when ?is_PersistentConnection(State) -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel respond incorrect: ~p~n" "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), {noreply, State}; R -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel respond incorrect: ~p~n" "Terminating and dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), 'CosNotification_Common':notify([{proxy, State#state.this}, {client, ?get_Client(State)}, {reason, R}]), {stop, normal, State} end;forward(seq, State, Event, SendTo, Status) -> case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of ok -> ?DBG("PROXY FORWARD SEQUENCE: ~p~n",[Event]), {noreply, State}; {'EXCEPTION', E} when record(E, 'OBJECT_NOT_EXIST') ; record(E, 'NO_PERMISSION') ; record(E, 'CosEventComm_Disconnected') -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel no longer exists; terminating and dropping: ~p", [?LINE, Event], ?DEBUG_LEVEL), 'CosNotification_Common':notify([{proxy, State#state.this}, {client, ?get_Client(State)}, {reason, {'EXCEPTION', E}}]), {stop, normal, State}; R when ?is_PersistentConnection(State) -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel respond incorrect: ~p~n" "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), {noreply, State}; R -> orber:dbg("[~p] PullerConsumer:forward();~n" "Admin/Channel respond incorrect: ~p~n" "Terminating and dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), 'CosNotification_Common':notify([{proxy, State#state.this}, {client, ?get_Client(State)}, {reason, R}]), {stop, normal, State} end.%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------%%--------------- END OF MODULE ------------------------------
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?