httpc_handler.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 954 行 · 第 1/3 页

ERL
954
字号
%%% Timeouts
%% Internally, to a request handling process, a request timeout is
%% seen as a canceled request.

%% The request currently being handled timed out: report the timeout to
%% its owner, record the id as canceled, mark the answer as already sent
%% and stop the handler.
handle_info({timeout, RequestId}, State =
            #state{request = Request = #request{id = RequestId}}) ->
    httpc_response:send(Request#request.from,
                        httpc_response:error(Request, timeout)),
    {stop, normal,
     State#state{canceled = [RequestId | State#state.canceled],
                 request = Request#request{from = answer_sent}}};
%% Some request other than the current one timed out. The timeout must be
%% reported to the owner of *that* request, not to the owner of the request
%% currently being handled, so the request is looked up in the pipeline.
%% If it is not found there (e.g. the timer fired after the request was
%% already answered) nobody is notified. The id is recorded as canceled
%% in either case.
handle_info({timeout, RequestId}, State = #state{pipeline = Pipeline}) ->
    %% NOTE(review): assumes the pipeline queue holds #request{} records;
    %% entries of any other shape are skipped by the generator pattern.
    TimedOut = [Queued || #request{id = Id} = Queued
                              <- queue:to_list(Pipeline),
                          Id =:= RequestId],
    lists:foreach(
      fun(Queued) ->
              httpc_response:send(Queued#request.from,
                                  httpc_response:error(Queued, timeout))
      end,
      TimedOut),
    {noreply, State#state{canceled = [RequestId | State#state.canceled]}};
%% The pipeline timer fired while no request is being handled: stop.
handle_info(timeout_pipeline, State = #state{request = undefined}) ->
    {stop, normal, State};
%% Setting up the connection to the server somehow failed.
%% The prepared client error message is delivered as the answer to the
%% request and the handler stops.
handle_info({init_error, _, ClientErrMsg},
            State = #state{request = Request}) ->
    NewState = answer_request(Request, ClientErrMsg, State),
    {stop, normal, NewState};
%%% httpc_manager process dies.
%% No request is being handled, so the handler can stop right away.
handle_info({'EXIT', _, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
%% Try to finish the current request anyway,
%% there is a fairly high probability that it can be done successfully.
%% Then close the connection, hopefully a new manager is started that
%% can retry requests in the pipeline.
handle_info({'EXIT', _, _}, State) ->
    {noreply, State#state{status = close}}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> _  (ignored by gen_server)
%% Description: Shutdown the httpc_handler.
%%
%% Clause overview:
%%   1. normal stop, no session     - nothing to clean up.
%%   2. normal stop, session without id - only the socket is closed.
%%   3. no current request          - the session is removed from the
%%      manager, a non-empty pipeline is handed to retry_pipline/2, the
%%      pipeline timer is canceled and the socket is closed.
%%   4. a request is still set      - it is answered with an error built
%%      from Reason (unless its answer was already sent) and terminate/2
%%      is re-entered with the request cleared.
%%--------------------------------------------------------------------
terminate(normal, #state{session = undefined}) ->
    %% Init error: there is no socket to be closed.
    ok;
terminate(normal, #state{request = Request,
                         session = #tcp_session{id = undefined,
                                                socket = Socket}}) ->
    %% Init error sending: no session information has been set up but
    %% there is a socket that needs closing.
    http_transport:close(socket_type(Request), Socket);
terminate(_, State = #state{session = Session, request = undefined,
                            timers = Timers}) ->
    %% Best effort: a failure to delete the session must not prevent the
    %% rest of the cleanup, hence the catch.
    catch httpc_manager:delete_session(Session#tcp_session.id),

    case queue:is_empty(State#state.pipeline) of
        false ->
            retry_pipline(queue:to_list(State#state.pipeline), State);
        true ->
            ok
    end,
    cancel_timer(Timers#timers.pipeline_timer, timeout_pipeline),
    http_transport:close(socket_type(Session#tcp_session.scheme),
                         Session#tcp_session.socket);
terminate(Reason, State = #state{request = Request}) ->
    NewState = case Request#request.from of
                   answer_sent ->
                       State;
                   _ ->
                       answer_request(Request,
                                      httpc_response:error(Request, Reason),
                                      State)
               end,
    terminate(Reason, NewState#state{request = undefined}).

%%--------------------------------------------------------------------
%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState}
%% Purpose: Convert process state when code is changed.
%%          The state is returned unchanged.
%%--------------------------------------------------------------------
code_change(_, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

%% send_first_request(Address, Request, State) -> {ok, NewState}
%%
%% Connects to Address and sends Request on the new socket.
%% On success the state is updated with the request, a new session and a
%% fresh response parser, the socket is set to {active, once} and the
%% request timeout is activated.
%% On failure (connecting or sending) an init_error message is posted to
%% self() and {ok, State} is still returned; the error is then dealt with
%% when that message is received.
send_first_request(Address, Request, State) ->
    Ipv6 = (State#state.options)#options.ipv6,
    SocketType = socket_type(Request),
    TimeOut = (Request#request.settings)#http_options.timeout,
    case http_transport:connect(SocketType, Address, Ipv6, TimeOut) of
        {ok, Socket} ->
            case httpc_request:send(Address, Request, Socket) of
                ok ->
                    ClientClose =
                        httpc_request:is_client_closing(
                          Request#request.headers),
                    Session =
                        #tcp_session{id = {Request#request.address, self()},
                                     scheme = Request#request.scheme,
                                     socket = Socket,
                                     client_close = ClientClose},
                    TmpState = State#state{request = Request,
                                           session = Session,
                                           mfa =
                                           {httpc_response, parse,
                                            [State#state.max_header_size]},
                                           status_line = undefined,
                                           headers = undefined,
                                           body = undefined,
                                           status = new},
                    http_transport:setopts(SocketType,
                                           Socket, [{active, once}]),
                    NewState = activate_request_timeout(TmpState),
                    {ok, NewState};
                {error, Reason} ->
                    %% Commented out in wait of ssl support to avoid
                    %% dialyzer warning
                    %%case State#state.status of
                    %%  new -> % Called from init/1
                    self() ! {init_error, error_sending,
                              httpc_response:error(Request, Reason)},
                    %% The socket is kept in a session without id so that
                    %% terminate/2 can close it.
                    {ok, State#state{request = Request,
                                     session =
                                     #tcp_session{socket = Socket}}}
                    %%ssl_tunnel -> % Not called from init/1
                    %%  NewState =
                    %%      answer_request(Request,
                    %%                     httpc_response:error(Request,
                    %%                                          Reason),
                    %%                     State),
                    %%  {stop, normal, NewState}
                    %%    end
            end;
        {error, Reason} ->
            %% Commented out in wait of ssl support to avoid
            %% dialyzer warning
            %% case State#state.status of
            %%  new -> % Called from init/1
            self() ! {init_error, error_connecting,
                      httpc_response:error(Request, Reason)},
            {ok, State#state{request = Request}}
            %%  ssl_tunnel -> % Not called from init/1
            %%    NewState =
            %%      answer_request(Request,
            %%                     httpc_response:error(Request,
            %%                                          Reason),
            %%                     State),
            %%    {stop, normal, NewState}
            %%end
    end.

%% handle_http_msg(Msg, State) -> gen_server callback result
%%
%% Dispatches on the shape of a parsed piece of the response:
%%   {Version, StatusCode, ReasonPhrase, Headers, Body}
%%       - status line and headers are complete; streaming is started and
%%         the body is processed by handle_http_body/2. A
%%         "multipart/byteranges" content type makes the process exit
%%         with not_yet_implemented.
%%   {ChunkedHeaders, Body}
%%       - the chunked headers are merged into the stored headers and the
%%         response is handled.
%%   Body
%%       - the body is passed through stream/3 and the response is handled.
handle_http_msg({Version, StatusCode, ReasonPhrase, Headers, Body},
                State = #state{request = Request}) ->
    case Headers#http_response_h.'content-type' of
        "multipart/byteranges" ++ _Param ->
            exit(not_yet_implemented);
        _ ->
            start_stream({{Version, StatusCode, ReasonPhrase}, Headers},
                         Request),
            handle_http_body(Body,
                             State#state{status_line = {Version,
                                                        StatusCode,
                                                        ReasonPhrase},
                                         headers = Headers})
    end;
handle_http_msg({ChunkedHeaders, Body},
                State = #state{headers = Headers}) ->
    NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
    handle_response(State#state{headers = NewHeaders, body = Body});
handle_http_msg(Body, State = #state{status_line = {_, Code, _}}) ->
    {NewBody, NewRequest} = stream(Body, State#state.request, Code),
    handle_response(State#state{body = NewBody, request = NewRequest}).

%% handle_http_body(Body, State) -> gen_server callback result
%%
%% Decides how the body of the response is to be collected:
%%   - empty body of a HEAD request: the response is handled at once;
%%   - "chunked" transfer encoding: the data is fed to http_chunk:decode/4;
%%   - any other transfer encoding given as a string: the request is
%%     answered with an unknown_encoding error and the handler stops;
%%   - otherwise the content-length header decides: a body larger than
%%     max_body_size is answered with body_too_big, else the body is
%%     collected through httpc_response:whole_body/2.
%% Whenever more data is needed the continuation is stored in the mfa
%% field and the socket is re-armed with {active, once}.
handle_http_body(<<>>, State = #state{request = #request{method = head}}) ->
    handle_response(State#state{body = <<>>});
handle_http_body(Body, State = #state{headers = Headers, session = Session,
                                      max_body_size = MaxBodySize,
                                      status_line = {_, Code, _},
                                      request = Request}) ->
    case Headers#http_response_h.'transfer-encoding' of
        "chunked" ->
            case http_chunk:decode(Body, MaxBodySize,
                                   State#state.max_header_size,
                                   {Code, Request}) of
                {Module, Function, Args} ->
                    http_transport:setopts(socket_type(
                                             Session#tcp_session.scheme),
                                           Session#tcp_session.socket,
                                           [{active, once}]),
                    {noreply, State#state{mfa =
                                          {Module, Function, Args}}};
                {ok, {ChunkedHeaders, NewBody}} ->
                    NewHeaders = http_chunk:handle_headers(Headers,
                                                           ChunkedHeaders),
                    handle_response(State#state{headers = NewHeaders,
                                                body = NewBody})
            end;
        %% is_list/1 replaces the obsolete guard test list/1.
        Encoding when is_list(Encoding) ->
            NewState = answer_request(Request,
                                      httpc_response:error(Request,
                                                           unknown_encoding),
                                      State),
            {stop, normal, NewState};
        _ ->
            Length =
                list_to_integer(Headers#http_response_h.'content-length'),
            case (MaxBodySize =:= nolimit) orelse (Length =< MaxBodySize) of
                true ->
                    case httpc_response:whole_body(Body, Length) of
                        %% Body is already bound: this clause only matches
                        %% when the returned body equals the data passed in.
                        {ok, Body} ->
                            {NewBody, NewRequest} =
                                stream(Body, Request, Code),
                            handle_response(State#state{body = NewBody,
                                                        request = NewRequest});
                        MFA ->
                            http_transport:setopts(
                              socket_type(Session#tcp_session.scheme),
                              Session#tcp_session.socket,
                              [{active, once}]),
                            {noreply, State#state{mfa = MFA}}
                    end;
                false ->
                    NewState =
                        answer_request(Request,
                                       httpc_response:error(Request,
                                                            body_too_big),
                                       State),
                    {stop, normal, NewState}
            end
    end.

%%% Normally I do not comment out code, I throw it away. But this might
%%% actually be used one day if ssl is improved.
%% handle_response(State = #state{status = ssl_tunnel,
%%                                request = Request,
%%                                options = Options,
%%                                session = #tcp_session{socket = Socket,
%%                                                       scheme = Scheme},
%%                                status_line = {_, 200, _}}) ->
%%     %%% Insert code for upgrading the socket if and when ssl supports this.
%%     Address = handle_proxy(Request#request.address, Options#options.proxy),
%%     send_first_request(Address, Request, State);
%% handle_response(State = #state{status = ssl_tunnel,
%%                                request = Request}) ->
%%     NewState = answer_request(Request,
%%                               httpc_response:error(Request,
%%                                                    ssl_proxy_tunnel_failed),
%%                               State),
%%                     {stop, normal, NewState};

%% handle_response(State) -> gen_server callback result
%%
%% Acts on a completely received response.
%% A state with status 'new' is first passed through
%% try_to_enable_pipline/1 and then handled again. For any other status
%% the cookies are processed and the outcome of httpc_response:result/2
%% selects what happens next:
%%   continue              - send the request body, wait for the next
%%                           response;
%%   {ignore, Data}        - unexpected 100-continue, parse Data as a new
%%                           response;
%%   {redirect, Req, Data} - hand the new request to the manager;
%%   {retry, Req, Data}    - hand the request to the manager for a retry;
%%   {ok, Msg, Data}       - answer the client, go on with the pipeline;
%%   {stop, Msg}           - answer the client and stop the handler.
handle_response(#state{status = new} = State) ->
    handle_response(try_to_enable_pipline(State));
handle_response(#state{request = Request,
                       status = Status,
                       session = Session,
                       status_line = StatusLine,
                       headers = Headers,
                       body = Body,
                       options = Options} = State) when Status =/= new ->
    handle_cookies(Headers, Request, Options),
    Result = httpc_response:result({StatusLine, Headers, Body}, Request),
    case Result of
        %% 100-continue
        continue ->
            %% Send request body
            {_, RequestBody} = Request#request.content,
            http_transport:send(socket_type(Session#tcp_session.scheme),
                                Session#tcp_session.socket,
                                RequestBody),
            %% Wait for next response
            http_transport:setopts(socket_type(Session#tcp_session.scheme),
                                   Session#tcp_session.socket,
                                   [{active, once}]),
            Parser = {httpc_response, parse, [State#state.max_header_size]},
            {noreply, State#state{mfa = Parser,
                                  status_line = undefined,
                                  headers = undefined,
                                  body = undefined}};
        %% Ignore unexpected 100-continue response and receive the
        %% actual response that the server will send right away.
        {ignore, Data} ->
            Parser = {httpc_response, parse, [State#state.max_header_size]},
            FreshState = State#state{mfa = Parser,
                                     status_line = undefined,
                                     headers = undefined,
                                     body = undefined},
            handle_info({httpc_handler, dummy, Data}, FreshState);
        %% On a redirect or retry the current request becomes
        %% obsolete and the manager will create a new request
        %% with the same id as the current.
        {redirect, Redirected, Data} ->
            ok = httpc_manager:redirect_request(Redirected),
            handle_pipeline(State#state{request = undefined}, Data);
        {retry, TimedRetry, Data} ->
            ok = httpc_manager:retry_request(TimedRetry),
            handle_pipeline(State#state{request = undefined}, Data);
        {ok, Msg, Data} ->
            end_stream(StatusLine, Request),
            Answered = answer_request(Request, Msg, State),
            handle_pipeline(Answered, Data);
        {stop, Msg} ->
            end_stream(StatusLine, Request),
            Answered = answer_request(Request, Msg, State),
            {stop, normal, Answered}
    end.

%% Cookie handling is switched off: nothing to do.
handle_cookies(_, _, #options{cookies = disabled}) ->
    ok;
%% User wants to verify the cookies before they are stored,
%% so the user will have to call a store command.
handle_cookies(_, _, #options{cookies = verify}) ->
    ok;
handle_cookies(Headers, Request, #options{cookies = enabled}) ->
    {Host, _ } = Request#request.address,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?