abstractbyteprotocol.java

来自「提供ESB 应用mule源代码 提供ESB 应用mule源代码」· Java 代码 · 共 218 行

JAVA
218
字号
/*
 * $Id: AbstractByteProtocol.java 13051 2008-10-10 21:10:48Z dfeist $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */

package org.mule.transport.tcp.protocols;

import org.mule.ResponseOutputStream;
import org.mule.api.transport.MessageAdapter;
import org.mule.transport.tcp.TcpProtocol;
import org.mule.util.ClassUtils;
import org.mule.util.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;

import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Base class for the byte-oriented TCP protocols, i.e. the protocols whose only
 * real output primitive is "write this array of bytes".  They all share the same
 * dispatch logic: whatever object is handed to
 * {@link #write(java.io.OutputStream, Object)} is converted to a byte array
 * (serializing it if necessary) and sent.
 * <p>
 * Note that the raw write method is called <code>writeByteArray</code> rather than
 * <code>write</code> to remove ambiguity from the code.  In almost all cases it is
 * possible to call {@link #write(java.io.OutputStream, Object)}, which dispatches
 * to {@link #writeByteArray(java.io.OutputStream, byte[])}.
 * </p>
 */
public abstract class AbstractByteProtocol implements TcpProtocol
{

    // Log under this class' own category instead of a particular subclass.
    private static final Log logger = LogFactory.getLog(AbstractByteProtocol.class);

    /** Time (ms) to sleep between polls when a read returns zero bytes. */
    private static final long PAUSE_PERIOD = 100;

    /** Value returned by the read helpers on end of stream or socket error. */
    public static final int EOF = -1;

    // make this really clear in subclasses, because otherwise people will forget
    public static final boolean STREAM_OK = true;
    public static final boolean NO_STREAM = false;

    /** Whether this protocol accepts an {@link InputStream} as data to write. */
    private final boolean streamOk;

    /**
     * @param streamOk {@link #STREAM_OK} if the protocol may copy an
     *                 {@link InputStream} straight to the output,
     *                 {@link #NO_STREAM} otherwise
     */
    public AbstractByteProtocol(boolean streamOk)
    {
        this.streamOk = streamOk;
    }

    /**
     * Writes <code>data</code> to <code>os</code>, choosing the representation from
     * the runtime type of <code>data</code>:
     * <ul>
     * <li>{@link InputStream}: copied to <code>os</code>, after which <code>os</code>
     * is flushed and closed (only allowed if the protocol was created with
     * {@link #STREAM_OK});</li>
     * <li>{@link MessageAdapter}: its payload is written recursively;</li>
     * <li><code>byte[]</code>: written as is;</li>
     * <li>{@link String}: written using the platform default encoding;</li>
     * <li>{@link Serializable}: serialized and written.</li>
     * </ul>
     *
     * @param os   the stream to write to
     * @param data the object to write
     * @throws IOException              on I/O failure, or if <code>data</code> is a
     *                                  stream and this protocol cannot stream
     * @throws IllegalArgumentException if <code>data</code> is none of the
     *                                  supported types (including <code>null</code>)
     */
    public void write(OutputStream os, Object data) throws IOException
    {
        if (data instanceof InputStream)
        {
            if (!streamOk)
            {
                throw new IOException("TCP protocol " + ClassUtils.getSimpleName(getClass())
                        + " cannot handle streaming");
            }

            InputStream is = (InputStream) data;
            try
            {
                IOUtils.copyLarge(is, os);
                os.flush();
                os.close();
            }
            finally
            {
                // always release the source, even if the copy or flush failed
                is.close();
            }
        }
        else if (data instanceof MessageAdapter)
        {
            write(os, ((MessageAdapter) data).getPayload());
        }
        else if (data instanceof byte[])
        {
            writeByteArray(os, (byte[]) data);
        }
        else if (data instanceof String)
        {
            // TODO SF: encoding is lost/ignored; it is probably a good idea to have
            // a separate "stringEncoding" property on the protocol
            writeByteArray(os, ((String) data).getBytes());
        }
        else if (data instanceof Serializable)
        {
            writeByteArray(os, SerializationUtils.serialize((Serializable) data));
        }
        else
        {
            throw new IllegalArgumentException("Cannot serialize data: " + data);
        }
    }

    /**
     * Writes the raw bytes to the stream.  Subclasses override this to add their
     * own framing.
     *
     * @param os   the stream to write to
     * @param data the bytes to write
     * @throws IOException on I/O failure
     */
    protected void writeByteArray(OutputStream os, byte[] data) throws IOException
    {
        os.write(data);
    }

    /**
     * Manage non-blocking reads and handle errors
     *
     * @param is     The input stream to read from
     * @param buffer The buffer to read into
     * @return The amount of data read (always non-zero, -1 on EOF or socket exception)
     * @throws IOException other than socket exceptions
     */
    protected int safeRead(InputStream is, byte[] buffer) throws IOException
    {
        return safeRead(is, buffer, buffer.length);
    }

    /**
     * Manage non-blocking reads and handle errors.  A read that returns zero bytes
     * is retried after a short pause until data arrives or the stream ends.
     * <p>
     * If the thread is interrupted while pausing, polling continues, but the
     * interrupt status is restored before this method returns so that callers can
     * still observe it.
     * </p>
     * <p>
     * NOTE: <code>size</code> must be greater than zero; a zero-length read returns
     * 0 by the {@link InputStream#read(byte[], int, int)} contract, so this method
     * would keep polling indefinitely.
     * </p>
     *
     * @param is     The input stream to read from
     * @param buffer The buffer to read into
     * @param size   The amount of data (upper bound) to read
     * @return The amount of data read (always non-zero, -1 on EOF or socket exception)
     * @throws IOException other than socket exceptions
     */
    protected int safeRead(InputStream is, byte[] buffer, int size) throws IOException
    {
        boolean interrupted = false;
        try
        {
            int len;
            do
            {
                len = is.read(buffer, 0, size);
                if (0 == len)
                {
                    // nothing available yet on a non-blocking stream: pause, then poll again
                    try
                    {
                        Thread.sleep(PAUSE_PERIOD);
                    }
                    catch (InterruptedException e)
                    {
                        // keep reading, but remember the interrupt for the caller
                        interrupted = true;
                    }
                }
            }
            while (0 == len);
            return len;
        }
        catch (SocketException e)
        {
            // do not pollute the log with a stacktrace, log only the message
            logger.info("Socket exception occured: " + e.getMessage());
            return EOF;
        }
        catch (SocketTimeoutException e)
        {
            logger.debug("Socket timeout.");
            return EOF;
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Make a single transfer from source to dest via a byte array buffer
     *
     * @param source Source of data
     * @param buffer Buffer array for transfer
     * @param dest   Destination of data
     * @return Amount of data transferred, or -1 on eof or socket error
     * @throws IOException On non-socket error
     */
    protected int copy(InputStream source, byte[] buffer, OutputStream dest) throws IOException
    {
        return copy(source, buffer, dest, buffer.length);
    }

    /**
     * Make a single transfer from source to dest via a byte array buffer
     *
     * @param source Source of data
     * @param buffer Buffer array for transfer
     * @param dest   Destination of data
     * @param size   The amount of data (upper bound) to read
     * @return Amount of data transferred, or -1 on eof or socket error
     * @throws IOException On non-socket error
     */
    protected int copy(InputStream source, byte[] buffer, OutputStream dest, int size) throws IOException
    {
        int len = safeRead(source, buffer, size);
        if (len > 0)
        {
            dest.write(buffer, 0, len);
        }
        return len;
    }

    /**
     * Normalizes "no data" to <code>null</code>.
     *
     * @param data the array to check, may be <code>null</code>
     * @return <code>null</code> if <code>data</code> is <code>null</code> or empty,
     *         otherwise <code>data</code> itself
     */
    protected byte[] nullEmptyArray(byte[] data)
    {
        if (data == null || 0 == data.length)
        {
            return null;
        }
        return data;
    }

    /**
     * Creates the response stream for a socket, wrapping the socket's output
     * stream in a {@link ProtocolStream} bound to this protocol.
     *
     * @param socket the socket the response is written to
     * @return the response stream
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public ResponseOutputStream createResponse(Socket socket) throws IOException
    {
        return new ResponseOutputStream(socket, new ProtocolStream(this, streamOk, socket.getOutputStream()));
    }

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?