httpc_handler.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 954 行 · 第 1/3 页

ERL
954
字号
% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %%     $Id$-module(httpc_handler).-behaviour(gen_server).-include("httpc_internal.hrl").-include("http_internal.hrl").%%--------------------------------------------------------------------%% Application API-export([start_link/2, send/2, cancel/2, stream/3]).%% gen_server callbacks-export([init/1, handle_call/3, handle_cast/2, handle_info/2,	 terminate/2, code_change/3]).-record(timers, {request_timers = [], % [ref()]		 pipeline_timer % ref()	      }).-record(state, {request,        % #request{}                session,        % #tcp_session{}                 status_line,    % {Version, StatusCode, ReasonPharse}                headers,        % #http_response_h{}                body,           % binary()                mfa,            % {Moduel, Function, Args}                pipeline = queue:new(),% queue() 		status = new,          % new | pipeline | close | ssl_tunnel		canceled = [],	       % [RequestId]                max_header_size = nolimit,   % nolimit | integer()                 max_body_size = nolimit,     % nolimit | integer()		options,                     % #options{}		timers = #timers{}           % #timers{}               }).%%====================================================================%% External functions%%====================================================================%%--------------------------------------------------------------------%% Function: start() -> {ok, Pid}%%%% Description: Starts a http-request handler process. Intended to be%% called by the httpc manager process. %%%% Note: Uses proc_lib and gen_server:enter_loop so that waiting%% for gen_tcp:connect to timeout in init/1 will not%% block the httpc manager process in odd cases such as trying to call%% a server that does not exist. (See OTP-6735) The only API function%% sending messages to the handler process that can be called before%% init has compleated is cancel and that is not a problem! (Send and%% stream will not be called before the first request has been sent and%% the reply or part of it has arrived.)%%--------------------------------------------------------------------start_link(Request, ProxyOptions) ->    {ok, proc_lib:spawn_link(?MODULE, init, [[Request, ProxyOptions]])}.%%--------------------------------------------------------------------%% Function: send(Request, Pid) -> ok %%	Request = #request{}%%      Pid = pid() - the pid of the http-request handler process.%%%% Description: Uses this handlers session to send a request. Intended%% to be called by the httpc manager process.%%--------------------------------------------------------------------send(Request, Pid) ->    call(Request, Pid, 5000).%%--------------------------------------------------------------------%% Function: cancel(RequestId, Pid) -> ok%%	RequestId = ref()%%      Pid = pid() -  the pid of the http-request handler process.%%%% Description: Cancels a request. Intended to be called by the httpc%% manager process.%%--------------------------------------------------------------------cancel(RequestId, Pid) ->    cast({cancel, RequestId}, Pid).%%--------------------------------------------------------------------%% Function: stream(BodyPart, Request, Code) -> _%%	BodyPart = binary()%%      Request = #request{}%%      Code = integer()%%%% Description: Stream the HTTP body to the caller process (client) %%              or to a file. Note that the data that has been stream%%              does not have to be saved. (We do not want to use up%%              memory in vain.)%%--------------------------------------------------------------------%% Request should not be streamedstream(BodyPart, Request = #request{stream = none}, _) ->    {BodyPart, Request};stream(BodyPart, Request = #request{stream = self}, 200) -> % Stream to caller    httpc_response:send(Request#request.from, 			{Request#request.id, stream, BodyPart}),    {<<>>, Request};stream(BodyPart, Request = #request{stream = Filename}, 200)  when is_list(Filename) -> % Stream to file    case file:open(Filename, [write, raw, append, delayed_write]) of	{ok, Fd} ->	    stream(BodyPart, Request#request{stream = Fd}, 200);	{error, Reason} ->	    exit({stream_to_file_failed, Reason})    end;stream(BodyPart, Request = #request{stream = Fd}, 200) -> % Stream to file    case file:write(Fd, BodyPart) of	ok ->	    {<<>>, Request};	{error, Reason} ->	    exit({stream_to_file_failed, Reason})    end;stream(BodyPart, Request,_) -> % only 200 responses can be streamed    {BodyPart, Request}.%%====================================================================%% Server functions%%====================================================================%%--------------------------------------------------------------------%% Function: init([Request, Session]) -> {ok, State} | %%                       {ok, State, Timeout} | ignore |{stop, Reason}%%%% Description: Initiates the httpc_handler process %%%% Note: The init function may not fail, that will kill the%% httpc_manager process. We could make the httpc_manager more comlex%% but we do not want that so errors will be handled by the process%% sending an init_error message to itself.%%--------------------------------------------------------------------init([Request, Options]) ->    process_flag(trap_exit, true),    handle_verbose(Options#options.verbose),    Address = handle_proxy(Request#request.address, Options#options.proxy),    {ok, State} =	case {Address /= Request#request.address, Request#request.scheme} of	    {true, https} ->		Error = https_through_proxy_is_not_currently_supported,		self() ! {init_error, 			  Error, httpc_response:error(Request, Error)},		{ok, #state{request = Request, options = Options,			    status = ssl_tunnel}};	    %% This is what we should do if and when ssl supports 	    %% "socket upgrading"	    %%send_ssl_tunnel_request(Address, Request,	    %%		    #state{options = Options,	    %%		   status = ssl_tunnel});	    {_, _} ->		send_first_request(Address, Request, #state{options = Options})	end,    gen_server:enter_loop(?MODULE, [], State).%%--------------------------------------------------------------------%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |%%          {reply, Reply, State, Timeout} |%%          {noreply, State}               |%%          {noreply, State, Timeout}      |%%          {stop, Reason, Reply, State}   | (terminate/2 is called)%%          {stop, Reason, State}            (terminate/2 is called)%% Description: Handling call messages%%--------------------------------------------------------------------handle_call(Request, _, State = #state{session = Session =				       #tcp_session{socket = Socket},				       timers = Timers,				       options = Options}) ->    Address = handle_proxy(Request#request.address, Options#options.proxy),    case httpc_request:send(Address, Request, Socket) of        ok ->	    %% Activate the request time out for the new request	    NewState = activate_request_timeout(State#state{request =							     Request}),	    ClientClose = httpc_request:is_client_closing(			    Request#request.headers),            case State#state.request of                #request{} -> %% Old request no yet finished		    %% Make sure to use the new value of timers in state		    NewTimers = NewState#state.timers,                    NewPipeline = queue:in(Request, State#state.pipeline),		    NewSession = 			Session#tcp_session{pipeline_length = 					    %% Queue + current					    queue:len(NewPipeline) + 1,					    client_close = ClientClose},		    httpc_manager:insert_session(NewSession),                    {reply, ok, State#state{pipeline = NewPipeline,					    session = NewSession,					    timers = NewTimers}};		undefined ->		    %% Note: tcp-message reciving has already been		    %% activated by handle_pipeline/2. Also		    %% the parsing-function #state.mfa is initiated		    %% by handle_pipeline/2.		    cancel_timer(Timers#timers.pipeline_timer, 				 timeout_pipeline),		    NewSession = 			Session#tcp_session{pipeline_length = 1,					    client_close = ClientClose},		    httpc_manager:insert_session(NewSession),		    {reply, ok, 		     NewState#state{request = Request,				    session = NewSession,				    timers = 				    Timers#timers{pipeline_timer =						  undefined}}}	    end;	{error, Reason} ->	    {reply, {pipline_failed, Reason}, State}    end.%%--------------------------------------------------------------------%% Function: handle_cast(Msg, State) -> {noreply, State} |%%          {noreply, State, Timeout} |%%          {stop, Reason, State}            (terminate/2 is called)%% Description: Handling cast messages%%--------------------------------------------------------------------%% When the request in process has been canceld the handler process is%% stopped and the pipelined requests will be reissued. This is is%% based on the assumption that it is proably cheaper to reissue the%% requests than to wait for a potentiall large response that we then%% only throw away. This of course is not always true maybe we could%% do something smarter here?! If the request canceled is not%% the one handled right now the same effect will take place in%% handle_pipeline/2 when the canceled request is on turn.handle_cast({cancel, RequestId}, State = #state{request = Request =						#request{id = RequestId}}) ->    httpc_manager:request_canceled(RequestId),    {stop, normal,      State#state{canceled = [RequestId | State#state.canceled],		 request = Request#request{from = answer_sent}}};handle_cast({cancel, RequestId}, State) ->    httpc_manager:request_canceled(RequestId),    {noreply, State#state{canceled = [RequestId | State#state.canceled]}}.%%--------------------------------------------------------------------%% Function: handle_info(Info, State) -> {noreply, State} |%%          {noreply, State, Timeout} |%%          {stop, Reason, State}            (terminate/2 is called)%% Description: Handling all non call/cast messages%%--------------------------------------------------------------------handle_info({Proto, _Socket, Data}, State = 	    #state{mfa = {Module, Function, Args}, 		   request = #request{method = Method, 				      stream = Stream} = Request, 		   session = Session, status_line = StatusLine})   when Proto == tcp; Proto == ssl; Proto == httpc_handler ->    case Module:Function([Data | Args]) of        {ok, Result} ->            handle_http_msg(Result, State);         {_, whole_body, _} when Method == head ->	    handle_response(State#state{body = <<>>}); 	{Module, whole_body, [Body, Length]} ->	    {_, Code, _} = StatusLine,	    {NewBody, NewRequest} = stream(Body, Request, Code),	    %% When we stream we will not keep the already	    %% streamed data, that would be a waste of memory.	    NewLength = case Stream of			    none ->   				Length;			    _ ->				Length - size(Body)			end,	    http_transport:setopts(socket_type(Session#tcp_session.scheme),                                    Session#tcp_session.socket, 				   [{active, once}]),                {noreply, State#state{mfa = {Module, whole_body, 					 [NewBody, NewLength]},				  request = NewRequest}};	NewMFA ->	    http_transport:setopts(socket_type(Session#tcp_session.scheme),                                    Session#tcp_session.socket, 				   [{active, once}]),            {noreply, State#state{mfa = NewMFA}}    end;%% The Server may close the connection to indicate that the%% whole body is now sent instead of sending an lengh%% indicator.handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->    handle_response(State#state{body = hd(Args)}); handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->    handle_response(State#state{body = hd(Args)}); %%% Server closes idle pipelinehandle_info({tcp_closed, _}, State = #state{request = undefined}) ->    {stop, normal, State};handle_info({ssl_closed, _}, State = #state{request = undefined}) ->    {stop, normal, State};%%% Error caseshandle_info({tcp_closed, _}, State) ->    {stop, session_remotly_closed, State};handle_info({ssl_closed, _}, State) ->    {stop, session_remotly_closed, State};handle_info({tcp_error, _, _} = Reason, State) ->    {stop, Reason, State};handle_info({ssl_error, _, _} = Reason, State) ->    {stop, Reason, State};

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?