📄 protocolcodecfilter.java
字号:
WriteRequest writeRequest) throws Exception {
        Object message = writeRequest.getMessage();

        // IoBuffer and FileRegion messages bypass the encoder and are handed
        // to the next filter unchanged.
        if (message instanceof IoBuffer || message instanceof FileRegion) {
            nextFilter.filterWrite(session, writeRequest);
            return;
        }

        // Get the encoder in the session
        ProtocolEncoder encoder = getEncoder(session);

        ProtocolEncoderOutput encoderOut = getEncoderOut(session,
                nextFilter, writeRequest);

        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);

            // Send it directly
            ((ProtocolEncoderOutputImpl) encoderOut).flushWithoutFuture();

            // Call the next filter
            nextFilter.filterWrite(session, new MessageWriteRequest(
                    writeRequest));
        } catch (Throwable t) {
            ProtocolEncoderException pee;

            // Generate the correct exception
            if (t instanceof ProtocolEncoderException) {
                pee = (ProtocolEncoderException) t;
            } else {
                pee = new ProtocolEncoderException(t);
            }

            throw pee;
        }
    }

    /**
     * Lets the decoder finish its work when the session is closed, then
     * disposes the codec state kept in the session, flushes any decoded
     * messages to the next filter and forwards the event.
     * <p>
     * The dispose and flush steps run in a <code>finally</code> block, so
     * they happen even when <code>finishDecode()</code> fails. In that case
     * the exception is rethrown and the event is not forwarded.
     *
     * @param nextFilter the next filter in the chain
     * @param session the session that has been closed
     * @throws Exception a {@link ProtocolDecoderException} (wrapping the
     *         original cause when needed) if <code>finishDecode()</code>
     *         fails, or whatever the flush or the next filter throws
     */
    @Override
    public void sessionClosed(NextFilter nextFilter, IoSession session)
            throws Exception {
        // Call finishDecode() first when a connection is closed.
        ProtocolDecoder decoder = getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);

        try {
            decoder.finishDecode(session, decoderOut);
        } catch (Throwable t) {
            ProtocolDecoderException pde;

            // Reuse the exception if it already has the expected type,
            // otherwise wrap it so the cause is preserved.
            if (t instanceof ProtocolDecoderException) {
                pde = (ProtocolDecoderException) t;
            } else {
                pde = new ProtocolDecoderException(t);
            }

            throw pde;
        } finally {
            // Dispose everything
            disposeCodec(session);
            decoderOut.flush(nextFilter, session);
        }

        // Call the next filter
        nextFilter.sessionClosed(session);
    }

    /**
     * Write request carrying one encoded message produced by the encoder.
     */
    private static class EncodedWriteRequest extends DefaultWriteRequest {
        private EncodedWriteRequest(Object encodedMessage,
                WriteFuture future, SocketAddress destination) {
            super(encodedMessage, future, destination);
        }
    }

    /**
     * Wrapper around the original write request that reports
     * <code>EMPTY_BUFFER</code> as its message instead of the wrapped
     * request's own message.
     */
    private static class MessageWriteRequest extends WriteRequestWrapper {
        private MessageWriteRequest(WriteRequest writeRequest) {
            super(writeRequest);
        }

        @Override
        public Object getMessage() {
            return EMPTY_BUFFER;
        }
    }

    /**
     * Decoder output that forwards every queued message to the next filter
     * when flushed.
     */
    private static class ProtocolDecoderOutputImpl extends
            AbstractProtocolDecoderOutput {
        public ProtocolDecoderOutputImpl() {
        }

        /**
         * Drains the message queue, passing each message to
         * <code>nextFilter.messageReceived()</code> in queue order.
         *
         * @param nextFilter the filter receiving the decoded messages
         * @param session the session the messages belong to
         */
        public void flush(NextFilter nextFilter, IoSession session) {
            Queue<Object> messageQueue = getMessageQueue();

            while (!messageQueue.isEmpty()) {
                nextFilter.messageReceived(session, messageQueue.poll());
            }
        }
    }

    /**
     * Encoder output that writes every queued encoded message to the next
     * filter, using the destination of the write request it was created
     * with.
     */
    private static class ProtocolEncoderOutputImpl extends
            AbstractProtocolEncoderOutput {
        private final IoSession session;

        private final NextFilter nextFilter;

        private final WriteRequest writeRequest;

        public ProtocolEncoderOutputImpl(IoSession session,
                NextFilter nextFilter, WriteRequest writeRequest) {
            this.session = session;
            this.nextFilter = nextFilter;
            this.writeRequest = writeRequest;
        }

        /**
         * Writes every queued message to the next filter, creating a new
         * {@link WriteFuture} for each written message.
         *
         * @return the future of the last message written, or a "not
         *         written" future carrying a {@link NothingWrittenException}
         *         when no message was written at all
         */
        public WriteFuture flush() {
            Queue<Object> bufferQueue = getMessageQueue();
            WriteFuture future = null;

            for (;;) {
                Object encodedMessage = bufferQueue.poll();

                if (encodedMessage == null) {
                    break;
                }

                // Flush only when the buffer has remaining.
                if (isWorthWriting(encodedMessage)) {
                    future = new DefaultWriteFuture(session);
                    nextFilter.filterWrite(session, new EncodedWriteRequest(
                            encodedMessage, future,
                            writeRequest.getDestination()));
                }
            }

            if (future == null) {
                future = DefaultWriteFuture.newNotWrittenFuture(session,
                        new NothingWrittenException(writeRequest));
            }

            return future;
        }

        /**
         * Writes every queued message to the next filter without creating
         * any {@link WriteFuture}; the encoded requests carry a
         * <code>null</code> future.
         */
        public void flushWithoutFuture() {
            Queue<Object> bufferQueue = getMessageQueue();

            for (;;) {
                Object encodedMessage = bufferQueue.poll();

                if (encodedMessage == null) {
                    break;
                }

                // Flush only when the buffer has remaining.
                if (isWorthWriting(encodedMessage)) {
                    SocketAddress destination = writeRequest.getDestination();

                    // Named differently from the writeRequest field so the
                    // field is not shadowed for the rest of the loop body.
                    WriteRequest encodedRequest = new EncodedWriteRequest(
                            encodedMessage, null, destination);

                    nextFilter.filterWrite(session, encodedRequest);
                }
            }
        }

        /**
         * Tells whether an encoded message should be written: anything that
         * is not an IoBuffer, or an IoBuffer that still has remaining data.
         */
        private static boolean isWorthWriting(Object encodedMessage) {
            return !(encodedMessage instanceof IoBuffer)
                    || ((IoBuffer) encodedMessage).hasRemaining();
        }
    }

    //----------- Helper methods ---------------------------------------------
    /**
     * Initialize the encoder and the decoder, storing them in the
     * session attributes.
     *
     * @param session the session the codec instances are created for
     * @throws Exception if the factory fails to provide a decoder or an
     *         encoder
     */
    private void initCodec(IoSession session) throws Exception {
        // Creates the decoder and stores it into the newly created session
        ProtocolDecoder decoder = factory.getDecoder(session);
        session.setAttribute(DECODER, decoder);

        // Creates the encoder and stores it into the newly created session
        ProtocolEncoder encoder = factory.getEncoder(session);
        session.setAttribute(ENCODER, encoder);
    }

    /**
     * Dispose the encoder, decoder, and the callback for the decoded
     * messages.
     *
     * @param session the session whose codec state is released
     */
    private void disposeCodec(IoSession session) {
        // We just remove the two instances of encoder/decoder to release
        // resources from the session
        disposeEncoder(session);
        disposeDecoder(session);

        // We also remove the callback
        disposeDecoderOut(session);

        // NOTE(review): the ENCODER_OUT attribute set by getEncoderOut() is
        // not removed here - confirm whether that is intended.
    }

    /**
     * Dispose the encoder, removing its instance from the
     * session's attributes, and calling the associated
     * dispose method. A failure while disposing is logged, not rethrown.
     *
     * @param session the session the encoder is removed from
     */
    private void disposeEncoder(IoSession session) {
        ProtocolEncoder encoder = (ProtocolEncoder) session
                .removeAttribute(ENCODER);

        if (encoder == null) {
            return;
        }

        try {
            encoder.dispose(session);
        } catch (Throwable t) {
            // Best effort: keep going, but log the cause as well so the
            // failure can be diagnosed.
            logger.warn("Failed to dispose: "
                    + encoder.getClass().getName() + " (" + encoder + ')', t);
        }
    }

    /**
     * Get the decoder instance from a given session.
     *
     * @param session The associated session we will get the decoder from
     * @return The decoder instance
     */
    private ProtocolDecoder getDecoder(IoSession session) {
        return (ProtocolDecoder) session.getAttribute(DECODER);
    }

    /**
     * Dispose the decoder, removing its instance from the
     * session's attributes, and calling the associated
     * dispose method. A failure while disposing is logged, not rethrown.
     *
     * @param session the session the decoder is removed from
     */
    private void disposeDecoder(IoSession session) {
        ProtocolDecoder decoder = (ProtocolDecoder) session
                .removeAttribute(DECODER);

        if (decoder == null) {
            return;
        }

        try {
            decoder.dispose(session);
        } catch (Throwable t) {
            // Best effort: keep going, but log the cause as well so the
            // failure can be diagnosed.
            logger.warn("Failed to dispose: "
                    + decoder.getClass().getName() + " (" + decoder + ')', t);
        }
    }

    /**
     * Return a reference to the decoder callback. If it's not already created
     * and stored into the session, we create a new instance.
     *
     * @param session the session holding the callback
     * @param nextFilter not used by this method
     * @return the callback stored in the session, never <code>null</code>
     */
    private ProtocolDecoderOutput getDecoderOut(IoSession session,
            NextFilter nextFilter) {
        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session
                .getAttribute(DECODER_OUT);

        if (out == null) {
            // Create a new instance, and stores it into the session
            out = new ProtocolDecoderOutputImpl();
            session.setAttribute(DECODER_OUT, out);
        }

        return out;
    }

    /**
     * Return a reference to the encoder callback. If it's not already created
     * and stored into the session, we create a new instance.
     * <p>
     * NOTE(review): the instance is created with the <code>nextFilter</code>
     * and <code>writeRequest</code> of the call that creates it and is then
     * reused, so later calls passing different arguments still get the
     * instance bound to the earlier ones - confirm this is intended.
     *
     * @param session the session holding the callback
     * @param nextFilter the next filter, used only when a new instance is
     *        created
     * @param writeRequest the write request, used only when a new instance
     *        is created
     * @return the callback stored in the session, never <code>null</code>
     */
    private ProtocolEncoderOutput getEncoderOut(IoSession session,
            NextFilter nextFilter, WriteRequest writeRequest) {
        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session
                .getAttribute(ENCODER_OUT);

        if (out == null) {
            // Create a new instance, and stores it into the session
            out = new ProtocolEncoderOutputImpl(session, nextFilter,
                    writeRequest);
            session.setAttribute(ENCODER_OUT, out);
        }

        return out;
    }

    /**
     * Remove the decoder callback from the session's attributes.
     *
     * @param session the session the callback is removed from
     */
    private void disposeDecoderOut(IoSession session) {
        session.removeAttribute(DECODER_OUT);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -