⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streamwritefiltertest.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        /*         * Record expectations         */        mockSession.reset();        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);        mockSession.setReturnValue(stream);        session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);        mockSession.setReturnValue(stream);        session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);        mockSession.setReturnValue(new DefaultWriteFuture(session));        session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);        mockSession.setReturnValue(queue);        nextFilter.filterWrite(session, wrs[0]);        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);        mockSession.setReturnValue(null);        nextFilter.filterWrite(session, wrs[1]);        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);        mockSession.setReturnValue(null);        nextFilter.filterWrite(session, wrs[2]);        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);        mockSession.setReturnValue(null);        nextFilter.messageSent(session, stream);        /*         * Replay.         */        mockNextFilter.replay();        mockSession.replay();        filter.messageSent(nextFilter, session, new Object());        assertEquals(0, queue.size());        /*         * Verify.         */        mockNextFilter.verify();        mockSession.verify();    }    /**     * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the     * specified size.     */    public void testSetWriteBufferSize() throws Exception {        StreamWriteFilter filter = new StreamWriteFilter();        try {            filter.setWriteBufferSize(0);            fail("0 writeBuferSize specified. IllegalArgumentException expected.");        } catch (IllegalArgumentException iae) {        }        try {            filter.setWriteBufferSize(-100);            fail("Negative writeBuferSize specified. IllegalArgumentException expected.");        } catch (IllegalArgumentException iae) {        }        filter.setWriteBufferSize(1);        assertEquals(1, filter.getWriteBufferSize());        filter.setWriteBufferSize(1024);        assertEquals(1024, filter.getWriteBufferSize());    }    public void testWriteUsingSocketTransport() throws Exception {        IoAcceptor acceptor = new SocketAcceptor();        ((SocketAcceptorConfig) acceptor.getDefaultConfig())                .setReuseAddress(true);        SocketAddress address = new InetSocketAddress("localhost",                AvailablePortFinder.getNextAvailable());        IoConnector connector = new SocketConnector();        FixedRandomInputStream stream = new FixedRandomInputStream(                4 * 1024 * 1024);        SenderHandler sender = new SenderHandler(stream);        ReceiverHandler receiver = new ReceiverHandler(stream.size);        acceptor.bind(address, sender);        synchronized (sender.lock) {            synchronized (receiver.lock) {                connector.connect(address, receiver);                sender.lock.wait();                receiver.lock.wait();            }        }        acceptor.unbind(address);        assertEquals(stream.bytesRead, receiver.bytesRead);        assertEquals(stream.size, receiver.bytesRead);        byte[] expectedMd5 = stream.digest.digest();        byte[] actualMd5 = receiver.digest.digest();        assertEquals(expectedMd5.length, actualMd5.length);        for (int i = 0; i < expectedMd5.length; i++) {            assertEquals(expectedMd5[i], actualMd5[i]);        }    }    private static class FixedRandomInputStream extends InputStream {        long size;        long bytesRead = 0;        Random random = new Random();        MessageDigest digest;        FixedRandomInputStream(long size) throws Exception {            this.size = size;            digest = MessageDigest.getInstance("MD5");        }        @Override        public int read() throws IOException {            if (isAllWritten())                return -1;            bytesRead++;            byte b = (byte) random.nextInt(255);            digest.update(b);            return b;        }        public long getBytesRead() {            return bytesRead;        }        public long getSize() {            return size;        }        public boolean isAllWritten() {            return bytesRead >= size;        }    }    private static class SenderHandler extends IoHandlerAdapter {        final Object lock = new Object();        InputStream inputStream;        StreamWriteFilter streamWriteFilter = new StreamWriteFilter();        SenderHandler(InputStream inputStream) {            this.inputStream = inputStream;        }        @Override        public void sessionCreated(IoSession session) throws Exception {            super.sessionCreated(session);            session.getFilterChain().addLast("codec", streamWriteFilter);        }        @Override        public void sessionOpened(IoSession session) throws Exception {            session.write(inputStream);        }        @Override        public void exceptionCaught(IoSession session, Throwable cause)                throws Exception {            synchronized (lock) {                lock.notifyAll();            }        }        @Override        public void sessionClosed(IoSession session) throws Exception {            synchronized (lock) {                lock.notifyAll();            }        }        @Override        public void sessionIdle(IoSession session, IdleStatus status)                throws Exception {            synchronized (lock) {                lock.notifyAll();            }        }        @Override        public void messageSent(IoSession session, Object message)                throws Exception {            if (message == inputStream) {                synchronized (lock) {                    lock.notifyAll();                }            }        }    }    private static class ReceiverHandler extends IoHandlerAdapter {        final Object lock = new Object();        long bytesRead = 0;        long size = 0;        MessageDigest digest;        ReceiverHandler(long size) throws Exception {            this.size = size;            digest = MessageDigest.getInstance("MD5");        }        @Override        public void sessionCreated(IoSession session) throws Exception {            super.sessionCreated(session);            session.setIdleTime(IdleStatus.READER_IDLE, 5);        }        @Override        public void sessionIdle(IoSession session, IdleStatus status)                throws Exception {            session.close();        }        @Override        public void exceptionCaught(IoSession session, Throwable cause)                throws Exception {            synchronized (lock) {                lock.notifyAll();            }        }        @Override        public void sessionClosed(IoSession session) throws Exception {            synchronized (lock) {                lock.notifyAll();            }        }        @Override        public void messageReceived(IoSession session, Object message)                throws Exception {            ByteBuffer buf = (ByteBuffer) message;            while (buf.hasRemaining()) {                digest.update(buf.get());                bytesRead++;            }            if (bytesRead >= size) {                session.close();            }        }    }    public static class WriteRequestMatcher extends AbstractMatcher {        @Override        protected boolean argumentMatches(Object expected, Object actual) {            if (expected instanceof WriteRequest                    && actual instanceof WriteRequest) {                WriteRequest w1 = (WriteRequest) expected;                WriteRequest w2 = (WriteRequest) actual;                return w1.getMessage().equals(w2.getMessage())                        && w1.getFuture().isWritten() == w2.getFuture()                                .isWritten();            }            return super.argumentMatches(expected, actual);        }    }    private static class DummyWriteFuture implements WriteFuture {        private boolean written;        public boolean isWritten() {            return written;        }        public void setWritten(boolean written) {            this.written = written;        }        public IoSession getSession() {            return null;        }        public Object getLock() {            return this;        }        public void join() {        }        public boolean join(long timeoutInMillis) {            return true;        }        public boolean isReady() {            return true;        }        public void addListener(IoFutureListener listener) {        }        public void removeListener(IoFutureListener listener) {        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -