📄 requestresponsefilter.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.filter.reqres;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedHashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.session.AttributeKey;import org.apache.mina.core.session.IoSession;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.filter.util.WriteRequestFilter;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * TODO Add documentation * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 689337 $, $Date: 2008-08-27 04:18:17 +0200 (Wed, 27 Aug 2008) $ * @org.apache.xbean.XBean */public class RequestResponseFilter extends WriteRequestFilter { private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector"); private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore"); private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore"); private final ResponseInspectorFactory responseInspectorFactory; private final ScheduledExecutorService timeoutScheduler; private final Logger logger = LoggerFactory.getLogger(getClass()); public RequestResponseFilter(final ResponseInspector responseInspector, ScheduledExecutorService timeoutScheduler) { if (responseInspector == null) { throw new NullPointerException("responseInspector"); } if (timeoutScheduler == null) { throw new NullPointerException("timeoutScheduler"); } this.responseInspectorFactory = new ResponseInspectorFactory() { public ResponseInspector getResponseInspector() { return responseInspector; } }; this.timeoutScheduler = timeoutScheduler; } public RequestResponseFilter( ResponseInspectorFactory responseInspectorFactory, ScheduledExecutorService timeoutScheduler) { if (responseInspectorFactory == null) { throw new NullPointerException("responseInspectorFactory"); } if (timeoutScheduler == null) { throw new NullPointerException("timeoutScheduler"); } this.responseInspectorFactory = responseInspectorFactory; this.timeoutScheduler = timeoutScheduler; } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { if (parent.contains(this)) { throw new IllegalArgumentException( "You can't add the same filter instance more than once. Create another instance and add it."); } IoSession session = parent.getSession(); session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory .getResponseInspector()); session.setAttribute(REQUEST_STORE, createRequestStore(session)); session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session)); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { IoSession session = parent.getSession(); destroyUnrespondedRequestStore(getUnrespondedRequestStore(session)); destroyRequestStore(getRequestStore(session)); session.removeAttribute(UNRESPONDED_REQUEST_STORE); session.removeAttribute(REQUEST_STORE); session.removeAttribute(RESPONSE_INSPECTOR); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { ResponseInspector responseInspector = (ResponseInspector) session .getAttribute(RESPONSE_INSPECTOR); Object requestId = responseInspector.getRequestId(message); if (requestId == null) { // Not a response message. Ignore. nextFilter.messageReceived(session, message); return; } // Retrieve (or remove) the corresponding request. ResponseType type = responseInspector.getResponseType(message); if (type == null) { nextFilter.exceptionCaught(session, new IllegalStateException( responseInspector.getClass().getName() + "#getResponseType() may not return null.")); } Map<Object, Request> requestStore = getRequestStore(session); Request request; switch (type) { case WHOLE: case PARTIAL_LAST: synchronized (requestStore) { request = requestStore.remove(requestId); } break; case PARTIAL: synchronized (requestStore) { request = requestStore.get(requestId); } break; default: throw new InternalError(); } if (request == null) { // A response message without request. Swallow the event because // the response might have arrived too late. if (logger.isWarnEnabled()) { logger.warn("Unknown request ID '" + requestId + "' for the response message. Timed out already?: " + message); } } else { // Found a matching request. // Cancel the timeout task if needed. if (type != ResponseType.PARTIAL) { ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture(); if (scheduledFuture != null) { scheduledFuture.cancel(false); Set<Request> unrespondedRequests = getUnrespondedRequestStore(session); synchronized (unrespondedRequests) { unrespondedRequests.remove(request); } } } // And forward the event. Response response = new Response(request, message, type); request.signal(response); nextFilter.messageReceived(session, response); } } @Override protected Object doFilterWrite( final NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { Object message = writeRequest.getMessage(); if (!(message instanceof Request)) { return null; } final Request request = (Request) message; if (request.getTimeoutFuture() != null) { throw new IllegalArgumentException("Request can not be reused."); } Map<Object, Request> requestStore = getRequestStore(session); Object oldValue = null; Object requestId = request.getId(); synchronized (requestStore) { oldValue = requestStore.get(requestId); if (oldValue == null) { requestStore.put(requestId, request); } } if (oldValue != null) { throw new IllegalStateException( "Duplicate request ID: " + request.getId()); } // Schedule a task to be executed on timeout. TimeoutTask timeoutTask = new TimeoutTask( nextFilter, request, session); ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule( timeoutTask, request.getTimeoutMillis(), TimeUnit.MILLISECONDS); request.setTimeoutTask(timeoutTask); request.setTimeoutFuture(timeoutFuture); // Add the timeout task to the unfinished task set. Set<Request> unrespondedRequests = getUnrespondedRequestStore(session); synchronized (unrespondedRequests) { unrespondedRequests.add(request); } return request.getMessage(); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { // Copy the unfinished task set to avoid unnecessary lock acquisition. // Copying will be cheap because there won't be that many requests queued. Set<Request> unrespondedRequests = getUnrespondedRequestStore(session); List<Request> unrespondedRequestsCopy; synchronized (unrespondedRequests) { unrespondedRequestsCopy = new ArrayList<Request>( unrespondedRequests); unrespondedRequests.clear(); } // Generate timeout artificially. for (Request r : unrespondedRequestsCopy) { if (r.getTimeoutFuture().cancel(false)) { r.getTimeoutTask().run(); } } // Clear the request store just in case we missed something, though it's unlikely. Map<Object, Request> requestStore = getRequestStore(session); synchronized (requestStore) { requestStore.clear(); } // Now tell the main subject. nextFilter.sessionClosed(session); } @SuppressWarnings("unchecked") private Map<Object, Request> getRequestStore(IoSession session) { return (Map<Object, Request>) session.getAttribute(REQUEST_STORE); } @SuppressWarnings("unchecked") private Set<Request> getUnrespondedRequestStore(IoSession session) { return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE); } /** * Returns a {@link Map} which stores {@code messageId}-{@link Request} * pairs whose {@link Response}s are not received yet. Please override * this method if you need to use other {@link Map} implementation * than the default one ({@link HashMap}). */ protected Map<Object, Request> createRequestStore( IoSession session) { return new HashMap<Object, Request>(); } /** * Returns a {@link Set} which stores {@link Request} whose * {@link Response}s are not received yet. Please override * this method if you need to use other {@link Set} implementation * than the default one ({@link LinkedHashSet}). Please note that * the {@link Iterator} of the returned {@link Set} have to iterate * its elements in the insertion order to ensure that * {@link RequestTimeoutException}s are thrown in the order which * {@link Request}s were written. If you don't need to guarantee * the order of thrown exceptions, any {@link Set} implementation * can be used. */ protected Set<Request> createUnrespondedRequestStore( IoSession session) { return new LinkedHashSet<Request>(); } /** * Releases any resources related with the {@link Map} created by * {@link #createRequestStore(IoSession)}. This method is useful * if you override {@link #createRequestStore(IoSession)}. * * @param requestStore what you returned in {@link #createRequestStore(IoSession)} */ protected void destroyRequestStore( Map<Object, Request> requestStore) { } /** * Releases any resources related with the {@link Set} created by * {@link #createUnrespondedRequestStore(IoSession)}. This method is * useful if you override {@link #createUnrespondedRequestStore(IoSession)}. * * @param unrespondedRequestStore what you returned in {@link #createUnrespondedRequestStore(IoSession)} */ protected void destroyUnrespondedRequestStore( Set<Request> unrespondedRequestStore) { } private class TimeoutTask implements Runnable { private final NextFilter filter; private final Request request; private final IoSession session; private TimeoutTask(NextFilter filter, Request request, IoSession session) { this.filter = filter; this.request = request; this.session = session; } public void run() { Set<Request> unrespondedRequests = getUnrespondedRequestStore(session); if (unrespondedRequests != null) { synchronized (unrespondedRequests) { unrespondedRequests.remove(request); } } Map<Object, Request> requestStore = getRequestStore(session); Object requestId = request.getId(); boolean timedOut; synchronized (requestStore) { if (requestStore.get(requestId) == request) { requestStore.remove(requestId); timedOut = true; } else { timedOut = false; } } if (timedOut) { // Throw the exception only when it's really timed out. RequestTimeoutException e = new RequestTimeoutException(request); request.signal(e); filter.exceptionCaught(session, e); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -