📄 streamwritefiltertest.java
字号:
/* * Record expectations */ mockSession.reset(); session.getAttribute(StreamWriteFilter.CURRENT_STREAM); mockSession.setReturnValue(stream); session.removeAttribute(StreamWriteFilter.CURRENT_STREAM); mockSession.setReturnValue(stream); session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE); mockSession.setReturnValue(new DefaultWriteFuture(session)); session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE); mockSession.setReturnValue(queue); nextFilter.filterWrite(session, wrs[0]); session.getAttribute(StreamWriteFilter.CURRENT_STREAM); mockSession.setReturnValue(null); nextFilter.filterWrite(session, wrs[1]); session.getAttribute(StreamWriteFilter.CURRENT_STREAM); mockSession.setReturnValue(null); nextFilter.filterWrite(session, wrs[2]); session.getAttribute(StreamWriteFilter.CURRENT_STREAM); mockSession.setReturnValue(null); nextFilter.messageSent(session, stream); /* * Replay. */ mockNextFilter.replay(); mockSession.replay(); filter.messageSent(nextFilter, session, new Object()); assertEquals(0, queue.size()); /* * Verify. */ mockNextFilter.verify(); mockSession.verify(); } /** * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the * specified size. */ public void testSetWriteBufferSize() throws Exception { StreamWriteFilter filter = new StreamWriteFilter(); try { filter.setWriteBufferSize(0); fail("0 writeBuferSize specified. IllegalArgumentException expected."); } catch (IllegalArgumentException iae) { } try { filter.setWriteBufferSize(-100); fail("Negative writeBuferSize specified. IllegalArgumentException expected."); } catch (IllegalArgumentException iae) { } filter.setWriteBufferSize(1); assertEquals(1, filter.getWriteBufferSize()); filter.setWriteBufferSize(1024); assertEquals(1024, filter.getWriteBufferSize()); } public void testWriteUsingSocketTransport() throws Exception { IoAcceptor acceptor = new SocketAcceptor(); ((SocketAcceptorConfig) acceptor.getDefaultConfig()) .setReuseAddress(true); SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable()); IoConnector connector = new SocketConnector(); FixedRandomInputStream stream = new FixedRandomInputStream( 4 * 1024 * 1024); SenderHandler sender = new SenderHandler(stream); ReceiverHandler receiver = new ReceiverHandler(stream.size); acceptor.bind(address, sender); synchronized (sender.lock) { synchronized (receiver.lock) { connector.connect(address, receiver); sender.lock.wait(); receiver.lock.wait(); } } acceptor.unbind(address); assertEquals(stream.bytesRead, receiver.bytesRead); assertEquals(stream.size, receiver.bytesRead); byte[] expectedMd5 = stream.digest.digest(); byte[] actualMd5 = receiver.digest.digest(); assertEquals(expectedMd5.length, actualMd5.length); for (int i = 0; i < expectedMd5.length; i++) { assertEquals(expectedMd5[i], actualMd5[i]); } } private static class FixedRandomInputStream extends InputStream { long size; long bytesRead = 0; Random random = new Random(); MessageDigest digest; FixedRandomInputStream(long size) throws Exception { this.size = size; digest = MessageDigest.getInstance("MD5"); } @Override public int read() throws IOException { if (isAllWritten()) return -1; bytesRead++; byte b = (byte) random.nextInt(255); digest.update(b); return b; } public long getBytesRead() { return bytesRead; } public long getSize() { return size; } public boolean isAllWritten() { return bytesRead >= size; } } private static class SenderHandler extends IoHandlerAdapter { final Object lock = new Object(); InputStream inputStream; StreamWriteFilter streamWriteFilter = new StreamWriteFilter(); SenderHandler(InputStream inputStream) { this.inputStream = inputStream; } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); session.getFilterChain().addLast("codec", streamWriteFilter); } @Override public void sessionOpened(IoSession session) throws Exception { session.write(inputStream); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { synchronized (lock) { lock.notifyAll(); } } @Override public void sessionClosed(IoSession session) throws Exception { synchronized (lock) { lock.notifyAll(); } } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { synchronized (lock) { lock.notifyAll(); } } @Override public void messageSent(IoSession session, Object message) throws Exception { if (message == inputStream) { synchronized (lock) { lock.notifyAll(); } } } } private static class ReceiverHandler extends IoHandlerAdapter { final Object lock = new Object(); long bytesRead = 0; long size = 0; MessageDigest digest; ReceiverHandler(long size) throws Exception { this.size = size; digest = MessageDigest.getInstance("MD5"); } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); session.setIdleTime(IdleStatus.READER_IDLE, 5); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { synchronized (lock) { lock.notifyAll(); } } @Override public void sessionClosed(IoSession session) throws Exception { synchronized (lock) { lock.notifyAll(); } } @Override public void messageReceived(IoSession session, Object message) throws Exception { ByteBuffer buf = (ByteBuffer) message; while (buf.hasRemaining()) { digest.update(buf.get()); bytesRead++; } if (bytesRead >= size) { session.close(); } } } public static class WriteRequestMatcher extends AbstractMatcher { @Override protected boolean argumentMatches(Object expected, Object actual) { if (expected instanceof WriteRequest && actual instanceof WriteRequest) { WriteRequest w1 = (WriteRequest) expected; WriteRequest w2 = (WriteRequest) actual; return w1.getMessage().equals(w2.getMessage()) && w1.getFuture().isWritten() == w2.getFuture() .isWritten(); } return super.argumentMatches(expected, actual); } } private static class DummyWriteFuture implements WriteFuture { private boolean written; public boolean isWritten() { return written; } public void setWritten(boolean written) { this.written = written; } public IoSession getSession() { return null; } public Object getLock() { return this; } public void join() { } public boolean join(long timeoutInMillis) { return true; } public boolean isReady() { return true; } public void addListener(IoFutureListener listener) { } public void removeListener(IoFutureListener listener) { } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -