httpc_handler.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 954 行 · 第 1/3 页

ERL
954
字号
    Cookies = http_cookie:cookies(Headers#http_response_h.other,
                                  Request#request.path, Host),
    httpc_manager:store_cookies(Cookies, Request#request.address).

%% Decide what to do with the connection once a response has been dealt with.
%%
%% * status = close: the request could not be pipelined, so the handler
%%   stops normally.
%% * status = pipeline, empty queue: re-arm the socket, start the idle
%%   pipeline timer, publish the session with pipeline_length 0 and reset
%%   the per-response parsing state.
%% * status = pipeline, queued request: stop if that request was canceled,
%%   otherwise make it the current request. Data holds any bytes of the
%%   next response that have already been read; if it is non-empty it is
%%   fed straight back into handle_info/2.

%% This request could not be pipelined
handle_pipeline(State = #state{status = close}, _) ->
    {stop, normal, State};
handle_pipeline(State = #state{status = pipeline, session = Session},
                Data) ->
    case queue:out(State#state.pipeline) of
        {empty, _} ->
            %% The server may choose to terminate an idle pipeline;
            %% in this case we want to receive the close message
            %% at once and not when trying to pipeline the next
            %% request.
            http_transport:setopts(socket_type(Session#tcp_session.scheme),
                                   Session#tcp_session.socket,
                                   [{active, once}]),
            %% If a pipeline that has been idle for some time is not
            %% closed by the server, the client may want to close it.
            NewState = activate_pipeline_timeout(State),
            NewSession = Session#tcp_session{pipeline_length = 0},
            httpc_manager:insert_session(NewSession),
            {noreply,
             NewState#state{request = undefined,
                            mfa = {httpc_response, parse,
                                   [NewState#state.max_header_size]},
                            status_line = undefined,
                            headers = undefined,
                            body = undefined
                           }
            };
        {{value, NextRequest}, Pipeline} ->
            case lists:member(NextRequest#request.id,
                              State#state.canceled) of
                true ->
                    %% See comment for handle_cast({cancel, RequestId})
                    {stop, normal,
                     State#state{request =
                                 NextRequest#request{from = answer_sent}}};
                false ->
                    NewSession =
                        Session#tcp_session{pipeline_length =
                                            %% Queue + current
                                            queue:len(Pipeline) + 1},
                    httpc_manager:insert_session(NewSession),
                    NewState =
                        State#state{pipeline = Pipeline,
                                    request = NextRequest,
                                    mfa = {httpc_response, parse,
                                           [State#state.max_header_size]},
                                    status_line = undefined,
                                    headers = undefined,
                                    body = undefined},
                    case Data of
                        <<>> ->
                            http_transport:setopts(
                              socket_type(Session#tcp_session.scheme),
                              Session#tcp_session.socket,
                              [{active, once}]),
                            {noreply, NewState};
                        _ ->
                            %% If we already received some bytes of
                            %% the next response
                            handle_info({httpc_handler, dummy, Data},
                                        NewState)
                    end
            end
    end.

%% Thin wrappers around gen_server:call/3 and gen_server:cast/2 that take
%% the message first and the handler pid second.
call(Msg, Pid, Timeout) ->
    gen_server:call(Pid, Msg, Timeout).

cast(Msg, Pid) ->
    gen_server:cast(Pid, Msg).

%% Start the timeout timer for the current request, unless its timeout
%% setting is 'infinity'. The timer sends {timeout, RequestId} to this
%% process and is recorded in the request_timers list.
activate_request_timeout(State = #state{request = Request,
                                        timers = Timers}) ->
    case (Request#request.settings)#http_options.timeout of
        infinity ->
            State;
        Time ->
            RequestId = Request#request.id,
            Ref = erlang:send_after(Time, self(), {timeout, RequestId}),
            RequestTimers = [{RequestId, Ref} | Timers#timers.request_timers],
            %% Update the existing record: creating a fresh #timers{} here
            %% would reset pipeline_timer to its default and lose the
            %% reference of a running pipeline timer.
            State#state{timers = Timers#timers{request_timers = RequestTimers}}
    end.

%% Start the idle-pipeline timer, unless the pipeline_timeout option is
%% 'infinity'. The timer sends 'timeout_pipeline' to this process.
activate_pipeline_timeout(State = #state{options =
                                         #options{pipeline_timeout =
                                                  infinity}}) ->
    State;
activate_pipeline_timeout(State = #state{options =
                                         #options{pipeline_timeout = Time},
                                         timers = Timers}) ->
    Ref = erlang:send_after(Time, self(), timeout_pipeline),
    %% Update the existing record so that request_timers is preserved
    %% rather than reset to the record default.
    State#state{timers = Timers#timers{pipeline_timer = Ref}}.

%% True for a version string "HTTP/1.N..." whose first character after
%% "HTTP/1." is >= $1, or for "HTTP/1.0" when the connection header is
%% exactly "keep-alive"; false otherwise.
is_pipeline_capable_server("HTTP/1." ++ N, _) when hd(N) >= $1 ->
    true;
is_pipeline_capable_server("HTTP/1.0",
                           #http_response_h{connection = "keep-alive"}) ->
    true;
is_pipeline_capable_server(_,_) ->
    false.

%% True unless the session is flagged client_close or
%% httpc_response:is_server_closing/1 returns true for the headers.
is_keep_alive_connection(Headers, Session) ->
    (not ((Session#tcp_session.client_close) or
          httpc_response:is_server_closing(Headers))).

%% Set the status to 'pipeline' (and publish the session through
%% httpc_manager:insert_session/1) when the server is pipeline capable,
%% the connection is kept alive and the request method is idempotent;
%% otherwise set the status to 'close'.
try_to_enable_pipline(State = #state{session = Session,
                                     request = #request{method = Method},
                                     status_line = {Version, _, _},
                                     headers = Headers}) ->
    case (is_pipeline_capable_server(Version, Headers)) and
        (is_keep_alive_connection(Headers, Session)) and
        (httpc_request:is_idempotent(Method)) of
        true ->
            httpc_manager:insert_session(Session),
            State#state{status = pipeline};
        false ->
            State#state{status = close}
    end.

%% Send Msg to the originator of Request, cancel the request's timeout
%% timer (if one is registered) and return the state with the request
%% marked as 'answer_sent' and its timer entry removed.
answer_request(Request, Msg, State = #state{timers = Timers}) ->
    httpc_response:send(Request#request.from, Msg),
    RequestTimers = Timers#timers.request_timers,
    TimerRef =
        http_util:key1search(RequestTimers, Request#request.id, undefined),
    Timer = {Request#request.id, TimerRef},
    cancel_timer(TimerRef, {timeout, Request#request.id}),
    State#state{request = Request#request{from = answer_sent},
                timers =
                Timers#timers{request_timers =
                              lists:delete(Timer, RequestTimers)}}.

%% Cancel Timer (a no-op for 'undefined'). The zero-timeout receive then
%% removes TimeoutMsg from the mailbox in case the timer had already
%% fired before it was cancelled.
cancel_timer(undefined, _) ->
    ok;
cancel_timer(Timer, TimeoutMsg) ->
    erlang:cancel_timer(Timer),
    receive
        TimeoutMsg ->
            ok
    after 0 ->
            ok
    end.

%% Hand every request in the list back to httpc_manager:retry_request/1.
%% On success the request's timer is cancelled and forgotten; on any
%% other result (including a caught exception) the originator gets an
%% error answer. Always returns ok.
retry_pipline([], _) ->
    ok;
retry_pipline([Request | PipeLine], State = #state{timers = Timers}) ->
    NewState =
        case (catch httpc_manager:retry_request(Request)) of
            ok ->
                RequestId = Request#request.id,
                RequestTimers = Timers#timers.request_timers,
                %% Same lookup as in answer_request/3, which treats the
                %% result of key1search/3 as the bare timer reference.
                TimerRef =
                    http_util:key1search(RequestTimers, RequestId, undefined),
                cancel_timer(TimerRef, {timeout, RequestId}),
                State#state{timers =
                            Timers#timers{request_timers =
                                          lists:delete({RequestId, TimerRef},
                                                       RequestTimers)}};
            Error ->
                %% answer_request/3 needs the whole #request{} record: it
                %% reads both the 'from' and the 'id' field itself.
                answer_request(Request,
                               httpc_response:error(Request, Error), State)
        end,
    retry_pipline(PipeLine, NewState).

%%% Check to see if the given {Host,Port} tuple is in the NoProxyList.
%%% Returns a possibly updated {Host,Port} tuple, with the proxy address.
handle_proxy(HostPort = {Host, _Port}, {Proxy, NoProxy}) ->
    case Proxy of
        undefined ->
            HostPort;
        Proxy ->
            case is_no_proxy_dest(Host, NoProxy) of
                true ->
                    HostPort;
                false ->
                    Proxy
            end
    end.

%% True if Host matches any entry of the no-proxy list. An entry of the
%% form "*.Domain" is matched as a suffix of Host; any other entry is
%% compared as a host name when http_util:is_hostname/1 accepts it, and
%% as an address (equal to, or a prefix of, Host) otherwise.
is_no_proxy_dest(_, []) ->
    false;
is_no_proxy_dest(Host, [ "*." ++ NoProxyDomain | NoProxyDests]) ->
    case is_no_proxy_dest_domain(Host, NoProxyDomain) of
        true ->
            true;
        false ->
            is_no_proxy_dest(Host, NoProxyDests)
    end;
is_no_proxy_dest(Host, [NoProxyDest | NoProxyDests]) ->
    IsNoProxyDest = case http_util:is_hostname(NoProxyDest) of
                        true ->
                            fun is_no_proxy_host_name/2;
                        false ->
                            fun is_no_proxy_dest_address/2
                    end,
    case IsNoProxyDest(Host, NoProxyDest) of
        true ->
            true;
        false ->
            is_no_proxy_dest(Host, NoProxyDests)
    end.

%% Host names only match when they are identical.
is_no_proxy_host_name(Host, Host) ->
    true;
is_no_proxy_host_name(_,_) ->
    false.

%% True when DomainPart is a suffix of Dest.
is_no_proxy_dest_domain(Dest, DomainPart) ->
    lists:suffix(DomainPart, Dest).

%% True when the two are identical or AddressPart is a prefix of Dest.
is_no_proxy_dest_address(Dest, Dest) ->
    true;
is_no_proxy_dest_address(Dest, AddressPart) ->
    lists:prefix(AddressPart, Dest).

%% Map a #request{} record or a bare scheme atom to the socket type term
%% passed to http_transport: 'ip_comm' for http, {ssl, SslOptions} for
%% https. A request supplies the ssl options from its settings; the bare
%% 'https' atom yields an empty option list.
socket_type(#request{scheme = http}) ->
    ip_comm;
socket_type(#request{scheme = https, settings = Settings}) ->
    {ssl, Settings#http_options.ssl};
socket_type(http) ->
    ip_comm;
socket_type(https) ->
    {ssl, []}.
%% Dummy value ok for e.g. setopts, which does not use this value

%% Announce the start of a streamed response. Only a request that streams
%% to 'self' and received a 200 status gets a stream-start message, sent
%% to the originator of the request; every other combination is a no-op.
start_stream(_, #request{stream = none}) ->
    ok;
start_stream({{_, 200, _}, Headers},
             #request{stream = self, from = From} = Request) ->
    StartMsg = httpc_response:stream_start(Headers, Request),
    httpc_response:send(From, StartMsg);
start_stream(_, _) ->
    ok.

%% Note the end stream message is handled by httpc_response and will
%% be sent by answer_request.
%%
%% Finish streaming. Only a 200 response whose request streams to
%% something other than 'none' or 'self' has anything to close here; the
%% stream value is then handed to file:close/1.
end_stream({_, 200, _}, #request{stream = Fd})
  when Fd =/= none, Fd =/= self ->
    case file:close(Fd) of
        ok ->
            ok;
        {error, enospc} ->
            %% Could be due to delayed_write; try once more.
            file:close(Fd)
    end;
end_stream(_, _) ->
    ok.

%% Switch on dbg tracing for this process according to the verbosity
%% level: 'verbose' enables the 'r' flag, 'debug' traces calls via
%% dbg:tp/2 and 'trace' traces calls via dbg:tpl/2, both with return
%% values. Any other level does nothing.
handle_verbose(verbose) ->
    dbg:p(self(), [r]);
handle_verbose(debug) ->
    dbg:p(self(), [call]),
    ReturnTraceSpec = [{'_', [], [{return_trace}]}],
    dbg:tp(?MODULE, ReturnTraceSpec);
handle_verbose(trace) ->
    dbg:p(self(), [call]),
    ReturnTraceSpec = [{'_', [], [{return_trace}]}],
    dbg:tpl(?MODULE, ReturnTraceSpec);
handle_verbose(_) ->
    ok.

%%% Normally I do not comment out code, I throw it away. But this might
%%% actually be used one day if ssl is improved.
%% send_ssl_tunnel_request(Address, Request = #request{address = {Host, Port}},
%%                         State) ->
%%     %% A ssl tunnel request is a special http request that looks like
%%     %% CONNECT host:port HTTP/1.1
%%     SslTunnelRequest = #request{method = connect, scheme = http,
%%                                 headers =
%%                                 #http_request_h{
%%                                   host = Host,
%%                                   address = Address,
%%                                   path = Host ++ ":",
%%                                   pquery = integer_to_list(Port),
%%                                   other = [{ "Proxy-Connection", "keep-alive"}]},
%%     Ipv6 = (State#state.options)#options.ipv6,
%%     SocketType = socket_type(SslTunnelRequest),
%%     case http_transport:connect(SocketType,
%%                                 SslTunnelRequest#request.address, Ipv6) of
%%         {ok, Socket} ->
%%             case httpc_request:send(Address, SslTunnelRequest, Socket) of
%%                 ok ->
%%                     Session = #tcp_session{id =
%%                                            {SslTunnelRequest#request.address,
%%                                             self()},
%%                                            scheme =
%%                                            SslTunnelRequest#request.scheme,
%%                                            socket = Socket},
%%                     NewState = State#state{mfa =
%%                                            {httpc_response, parse,
%%                                             [State#state.max_header_size]},
%%                                            request = Request,
%%                                            session = Session},
%%                     http_transport:setopts(socket_type(
%%                                           SslTunnelRequest#request.scheme),
%%                                            Socket,
%%                                            [{active, once}]),
%%                     {ok, NewState};
%%                 {error, Reason} ->
%%                     self() ! {init_error, error_sending,
%%                               httpc_response:error(Request, Reason)},
%%                     {ok, State#state{request = Request,
%%                                      session = #tcp_session{socket =
%%                                                             Socket}}}
%%             end;
%%         {error, Reason} ->
%%             self() ! {init_error, error_connecting,
%%                       httpc_response:error(Request, Reason)},
%%             {ok, State#state{request = Request}}
%%     end.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?