httpc_handler.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 954 行 · 第 1/3 页
ERL
954 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
%% Purpose: gen_server based handler process of the HTTP client. One
%% handler owns one session (socket) and processes the request sent on
%% it, plus any requests pipelined behind it.
-module(httpc_handler).

-behaviour(gen_server).

-include("httpc_internal.hrl").
-include("http_internal.hrl").

%%--------------------------------------------------------------------
%% Application API
-export([start_link/2, send/2, cancel/2, stream/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Timer references held by the handler.
-record(timers, {request_timers = [], % [ref()]
                 pipeline_timer       % ref()
                }).

%% gen_server state of the handler process.
-record(state, {request,                   % #request{}
                session,                   % #tcp_session{}
                status_line,               % {Version, StatusCode, ReasonPhrase}
                headers,                   % #http_response_h{}
                body,                      % binary()
                mfa,                       % {Module, Function, Args}
                pipeline = queue:new(),    % queue()
                status = new,              % new | pipeline | close | ssl_tunnel
                canceled = [],             % [RequestId]
                max_header_size = nolimit, % nolimit | integer()
                max_body_size = nolimit,   % nolimit | integer()
                options,                   % #options{}
                timers = #timers{}         % #timers{}
               }).

%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Request, ProxyOptions) -> {ok, Pid}
%%
%% Description: Starts a http-request handler process.
%% Intended to be called by the httpc manager process.
%%
%% Note: Uses proc_lib and gen_server:enter_loop so that waiting
%% for gen_tcp:connect to timeout in init/1 will not
%% block the httpc manager process in odd cases such as trying to call
%% a server that does not exist. (See OTP-6735) The only API function
%% sending messages to the handler process that can be called before
%% init has completed is cancel and that is not a problem! (Send and
%% stream will not be called before the first request has been sent and
%% the reply or part of it has arrived.)
%%--------------------------------------------------------------------
start_link(Request, ProxyOptions) ->
    %% The spawned process runs init/1 with the single argument
    %% [Request, ProxyOptions]; the caller gets the pid back at once.
    {ok, proc_lib:spawn_link(?MODULE, init, [[Request, ProxyOptions]])}.

%%--------------------------------------------------------------------
%% Function: send(Request, Pid) -> ok
%%      Request = #request{}
%%      Pid = pid() - the pid of the http-request handler process.
%%
%% Description: Uses this handler's session to send a request. Intended
%% to be called by the httpc manager process. Delegates to call/3
%% (defined elsewhere in this module) with the value 5000 as its third
%% argument.
%%--------------------------------------------------------------------
send(Request, Pid) ->
    call(Request, Pid, 5000).

%%--------------------------------------------------------------------
%% Function: cancel(RequestId, Pid) -> ok
%%      RequestId = ref()
%%      Pid = pid() - the pid of the http-request handler process.
%%
%% Description: Cancels a request. Intended to be called by the httpc
%% manager process. Delegates to cast/2 (defined elsewhere in this
%% module) with the message {cancel, RequestId}.
%%--------------------------------------------------------------------
cancel(RequestId, Pid) ->
    cast({cancel, RequestId}, Pid).

%%--------------------------------------------------------------------
%% Function: stream(BodyPart, Request, Code) -> _
%%      BodyPart = binary()
%%      Request = #request{}
%%      Code = integer()
%%
%% Description: Stream the HTTP body to the caller process (client)
%%              or to a file. Note that the data that has been streamed
%%              does not have to be saved.
%%              (We do not want to use up memory in vain.)
%%
%% Returns {RemainingBody, Request}: RemainingBody is <<>> when the
%% part was streamed, otherwise it is BodyPart unchanged. The returned
%% Request may carry an opened file descriptor in its stream field.
%% Exits with {stream_to_file_failed, Reason} if the file cannot be
%% opened or written.
%%--------------------------------------------------------------------
%% Request should not be streamed
stream(BodyPart, Request = #request{stream = none}, _) ->
    {BodyPart, Request};

%% Stream to caller
stream(BodyPart, Request = #request{stream = self}, 200) ->
    httpc_response:send(Request#request.from,
                        {Request#request.id, stream, BodyPart}),
    {<<>>, Request};

%% Stream to file: the file is not open yet. Open it, store the
%% descriptor in the request and retry so the clause below writes.
stream(BodyPart, Request = #request{stream = Filename}, 200)
  when is_list(Filename) ->
    case file:open(Filename, [write, raw, append, delayed_write]) of
        {ok, Fd} ->
            stream(BodyPart, Request#request{stream = Fd}, 200);
        {error, Reason} ->
            exit({stream_to_file_failed, Reason})
    end;

%% Stream to file: the stream field already holds a file descriptor.
stream(BodyPart, Request = #request{stream = Fd}, 200) ->
    case file:write(Fd, BodyPart) of
        ok ->
            {<<>>, Request};
        {error, Reason} ->
            exit({stream_to_file_failed, Reason})
    end;

%% Only 200 responses can be streamed
stream(BodyPart, Request, _) ->
    {BodyPart, Request}.

%%====================================================================
%% Server functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init([Request, Options]) -> does not return normally,
%%                       the process enters gen_server:enter_loop/3
%%
%% Description: Initiates the httpc_handler process
%%
%% Note: The init function may not fail, that will kill the
%% httpc_manager process.
%% We could make the httpc_manager more complex
%% but we do not want that so errors will be handled by the process
%% sending an init_error message to itself.
%%--------------------------------------------------------------------
init([Request, Options]) ->
    process_flag(trap_exit, true),
    handle_verbose(Options#options.verbose),
    Address = handle_proxy(Request#request.address, Options#options.proxy),
    {ok, State} =
        %% Address differs from the request address only when a proxy
        %% is going to be used.
        case {Address /= Request#request.address, Request#request.scheme} of
            {true, https} ->
                Error = https_through_proxy_is_not_currently_supported,
                %% Do not fail here; report the error through a
                %% message to ourselves once the server loop runs.
                self() ! {init_error,
                          Error, httpc_response:error(Request, Error)},
                {ok, #state{request = Request, options = Options,
                            status = ssl_tunnel}};
            %% This is what we should do if and when ssl supports
            %% "socket upgrading"
            %%send_ssl_tunnel_request(Address, Request,
            %%                        #state{options = Options,
            %%                               status = ssl_tunnel});
            {_, _} ->
                send_first_request(Address, Request, #state{options = Options})
        end,
    gen_server:enter_loop(?MODULE, [], State).

%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling call messages
%%
%% Sends Request on the socket of the current session. If another
%% request is still being processed the new one is queued in the
%% pipeline, otherwise it becomes the current request. Replies ok, or
%% {pipline_failed, Reason} if the request could not be sent.
%%--------------------------------------------------------------------
handle_call(Request, _, State = #state{session = Session =
                                           #tcp_session{socket = Socket},
                                       timers = Timers,
                                       options = Options}) ->
    Address = handle_proxy(Request#request.address, Options#options.proxy),

    case httpc_request:send(Address, Request, Socket) of
        ok ->
            %% Activate the request time out for the new request
            NewState = activate_request_timeout(State#state{request =
                                                                Request}),

            ClientClose = httpc_request:is_client_closing(
                            Request#request.headers),
            case State#state.request of
                #request{} -> %% Old request not yet finished
                    %% Make sure to use the new value of timers in state
                    NewTimers = NewState#state.timers,
                    NewPipeline = queue:in(Request, State#state.pipeline),
                    NewSession =
                        Session#tcp_session{pipeline_length =
                                                %% Queue + current
                                                queue:len(NewPipeline) + 1,
                                            client_close = ClientClose},
                    httpc_manager:insert_session(NewSession),
                    %% The old request stays current, so build on State
                    %% and only take over the pipeline, session and timers.
                    {reply, ok, State#state{pipeline = NewPipeline,
                                            session = NewSession,
                                            timers = NewTimers}};
                undefined ->
                    %% Note: tcp-message receiving has already been
                    %% activated by handle_pipeline/2. Also
                    %% the parsing-function #state.mfa is initiated
                    %% by handle_pipeline/2.
                    cancel_timer(Timers#timers.pipeline_timer,
                                 timeout_pipeline),
                    NewSession =
                        Session#tcp_session{pipeline_length = 1,
                                            client_close = ClientClose},
                    httpc_manager:insert_session(NewSession),
                    %% Build on the timers held by NewState rather than
                    %% the Timers bound in the function head, so that
                    %% whatever activate_request_timeout/1 stored for
                    %% the new request is not thrown away (same rule as
                    %% in the pipelined branch above).
                    %% NOTE(review): activate_request_timeout/1 is not
                    %% visible here - confirm it records its timer in
                    %% #state.timers.
                    ActiveTimers = NewState#state.timers,
                    {reply, ok,
                     NewState#state{request = Request,
                                    session = NewSession,
                                    timers =
                                        ActiveTimers#timers{pipeline_timer =
                                                                undefined}}}
            end;
        {error, Reason} ->
            %% The misspelled atom is part of the reply protocol; it is
            %% left as is because callers may match on it.
            {reply, {pipline_failed, Reason}, State}
    end.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling cast messages
%%--------------------------------------------------------------------

%% When the request in process has been canceled the handler process is
%% stopped and the pipelined requests will be reissued. This is
%% based on the assumption that it is probably cheaper to reissue the
%% requests than to wait for a potentially large response that we then
%% only throw away. This of course is not always true, maybe we could
%% do something smarter here?!
%% If the request canceled is not
%% the one handled right now the same effect will take place in
%% handle_pipeline/2 when the canceled request is on turn.
handle_cast({cancel, RequestId}, State = #state{request = Request =
                                                    #request{id = RequestId}}) ->
    %% The canceled request is the one currently being processed.
    httpc_manager:request_canceled(RequestId),
    {stop, normal,
     State#state{canceled = [RequestId | State#state.canceled],
                 request = Request#request{from = answer_sent}}};
handle_cast({cancel, RequestId}, State) ->
    %% Some other request: only remember that it was canceled.
    httpc_manager:request_canceled(RequestId),
    {noreply, State#state{canceled = [RequestId | State#state.canceled]}}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
%% Incoming data: feed it to the parse function currently stored in
%% #state.mfa and act on what the parser returns.
handle_info({Proto, _Socket, Data}, State =
                #state{mfa = {Module, Function, Args},
                       request = #request{method = Method,
                                          stream = Stream} = Request,
                       session = Session, status_line = StatusLine})
  when Proto == tcp; Proto == ssl; Proto == httpc_handler ->

    case Module:Function([Data | Args]) of
        {ok, Result} ->
            handle_http_msg(Result, State);
        {_, whole_body, _} when Method == head ->
            handle_response(State#state{body = <<>>});
        {Module, whole_body, [Body, Length]} ->
            {_, Code, _} = StatusLine,
            {NewBody, NewRequest} = stream(Body, Request, Code),
            %% When we stream we will not keep the already
            %% streamed data, that would be a waste of memory.
            NewLength = case Stream of
                            none ->
                                Length;
                            _ ->
                                Length - size(Body)
                        end,
            http_transport:setopts(socket_type(Session#tcp_session.scheme),
                                   Session#tcp_session.socket,
                                   [{active, once}]),
            {noreply, State#state{mfa = {Module, whole_body,
                                         [NewBody, NewLength]},
                                  request = NewRequest}};
        NewMFA ->
            %% More data is needed: keep the continuation and ask the
            %% socket for the next message.
            http_transport:setopts(socket_type(Session#tcp_session.scheme),
                                   Session#tcp_session.socket,
                                   [{active, once}]),
            {noreply, State#state{mfa = NewMFA}}
    end;

%% The Server may close the connection to indicate that the
%% whole body is now sent instead of sending a length
%% indicator.
handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)});
handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)});

%%% Server closes idle pipeline
handle_info({tcp_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
handle_info({ssl_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};

%%% Error cases
handle_info({tcp_closed, _}, State) ->
    {stop, session_remotly_closed, State};
handle_info({ssl_closed, _}, State) ->
    {stop, session_remotly_closed, State};
handle_info({tcp_error, _, _} = Reason, State) ->
    {stop, Reason, State};
handle_info({ssl_error, _, _} = Reason, State) ->
    {stop, Reason, State};
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?