⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioeventqueuethrottle.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.filter.executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.core.session.IoEvent;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * Throttles incoming or outgoing events. * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 762170 $, $Date: 2009-04-06 00:01:18 +0200 (Mon, 06 Apr 2009) $ */public class IoEventQueueThrottle implements IoEventQueueHandler {    /** A logger for this class */    private final Logger logger = LoggerFactory.getLogger(getClass());    /** The event size estimator instance */    private final IoEventSizeEstimator eventSizeEstimator;        private volatile int threshold;    private final Object lock = new Object();    private final AtomicInteger counter = new AtomicInteger();    private int waiters;    public IoEventQueueThrottle() {        this(new DefaultIoEventSizeEstimator(), 65536);    }    public IoEventQueueThrottle(int threshold) {        this(new DefaultIoEventSizeEstimator(), threshold);    }    public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {        if (eventSizeEstimator == null) {            throw new NullPointerException("eventSizeEstimator");        }        this.eventSizeEstimator = eventSizeEstimator;        setThreshold(threshold);    }    public IoEventSizeEstimator getEventSizeEstimator() {        return eventSizeEstimator;    }    public int getThreshold() {        return threshold;    }    public int getCounter() {        return counter.get();    }    public void setThreshold(int threshold) {        if (threshold <= 0) {            throw new IllegalArgumentException("threshold: " + threshold);        }        this.threshold = threshold;    }    public boolean accept(Object source, IoEvent event) {        return true;    }    public void offered(Object source, IoEvent event) {        int eventSize = estimateSize(event);        int currentCounter = counter.addAndGet(eventSize);        logState();        if (currentCounter >= threshold) {            block();        }    }    public void polled(Object source, IoEvent event) {        int eventSize = estimateSize(event);        int currentCounter = counter.addAndGet(-eventSize);        logState();        if (currentCounter < threshold) {            unblock();        }    }    private int estimateSize(IoEvent event) {        int size = getEventSizeEstimator().estimateSize(event);        if (size < 0) {            throw new IllegalStateException(                    IoEventSizeEstimator.class.getSimpleName() + " returned " +                    "a negative value (" + size + "): " + event);        }        return size;    }    private void logState() {        if (logger.isDebugEnabled()) {            logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());        }    }    protected void block() {        if (logger.isDebugEnabled()) {            logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);        }        synchronized (lock) {            while (counter.get() >= threshold) {                waiters ++;                try {                    lock.wait();                } catch (InterruptedException e) {                    // Wait uninterruptably.                } finally {                    waiters --;                }            }        }        if (logger.isDebugEnabled()) {            logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);        }    }    protected void unblock() {        synchronized (lock) {            if (waiters > 0) {                lock.notify();            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -