⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpmessagereceiver.java

📁 提供ESB 应用mule源代码 提供ESB 应用mule源代码
💻 JAVA
字号:
/* * $Id: UdpMessageReceiver.java 12833 2008-10-01 22:54:08Z dfeist $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc.  All rights reserved.  http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.udp;import org.mule.DefaultMuleMessage;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.CreateException;import org.mule.api.lifecycle.Disposable;import org.mule.api.service.Service;import org.mule.api.transport.Connector;import org.mule.api.transport.MessageAdapter;import org.mule.config.i18n.CoreMessages;import org.mule.transport.AbstractMessageReceiver;import org.mule.transport.ConnectException;import org.mule.transport.udp.i18n.UdpMessages;import java.io.IOException;import java.net.DatagramPacket;import java.net.DatagramSocket;import java.net.InetAddress;import java.net.SocketAddress;import java.net.SocketTimeoutException;import java.net.URI;import java.net.UnknownHostException;import java.util.List;import javax.resource.spi.work.Work;import javax.resource.spi.work.WorkException;import javax.resource.spi.work.WorkManager;/** <code>UdpMessageReceiver</code> receives UDP message packets. */public class UdpMessageReceiver extends AbstractMessageReceiver implements Work{    protected DatagramSocket socket = null;    protected InetAddress inetAddress;    protected int bufferSize;    private URI uri;    protected List responseTransformers = null;    public UdpMessageReceiver(Connector connector, Service service, InboundEndpoint endpoint)            throws CreateException    {        super(connector, service, endpoint);        bufferSize = ((UdpConnector) connector).getReceiveBufferSize();        uri = endpoint.getEndpointURI().getUri();        try        {            if (!"null".equalsIgnoreCase(uri.getHost()))            {                inetAddress = InetAddress.getByName(uri.getHost());            }        }        catch (UnknownHostException e)        {            throw new CreateException(UdpMessages.failedToLocateHost(uri), e, this);        }        responseTransformers = getResponseTransformers();    }    protected void doConnect() throws Exception    {        try        {            socket = ((UdpConnector) connector).getServerSocket(endpoint);        }        catch (Exception e)        {            throw new ConnectException(UdpMessages.failedToBind(uri), e, this);        }        try        {            getWorkManager().scheduleWork(this, WorkManager.INDEFINITE, null, connector);        }        catch (WorkException e)        {            throw new ConnectException(CoreMessages.failedToScheduleWork(), e, this);        }    }    protected void doDisconnect() throws Exception    {        // this will cause the server thread to quit        disposing.set(true);        if (socket != null)        {            socket.close();        }    }    protected void doStart() throws MuleException    {        // nothing to do    }    protected void doStop() throws MuleException    {        // nothing to do    }    protected List getResponseTransformers()    {        List transformers = endpoint.getResponseTransformers();        if (transformers == null)        {            return connector.getDefaultResponseTransformers();        }        return transformers;    }    protected DatagramSocket createSocket(URI uri, InetAddress inetAddress) throws IOException    {        return new DatagramSocket(uri.getPort(), inetAddress);    }    /** Obtain the serverSocket */    public DatagramSocket getSocket()    {        return socket;    }    protected DatagramPacket createPacket()    {        DatagramPacket packet = new DatagramPacket(new byte[bufferSize], bufferSize);//        if (uri.getPort() > 0)//        {//            packet.setPort(uri.getPort());//        }//        packet.setAddress(inetAddress);        return packet;    }    public void run()    {        while (!disposing.get())        {            if (connector.isStarted())            {                try                {                    DatagramPacket packet = createPacket();                    try                    {                        if (logger.isDebugEnabled())                        {                            logger.debug("Receiving packet on " + uri);                        }                        socket.receive(packet);                        if (logger.isTraceEnabled())                        {                            logger.trace("Received packet on: " + uri);                        }                        Work work = createWork(packet);                        try                        {                            getWorkManager().scheduleWork(work, WorkManager.INDEFINITE, null, connector);                        }                        catch (WorkException e)                        {                            logger.error("Udp receiver interrupted: " + e.getMessage(), e);                        }                    }                    catch (SocketTimeoutException e)                    {                        // ignore                    }                }                catch (Exception e)                {                    if (!connector.isDisposed() && !disposing.get())                    {                        logger.debug("Accept failed on socket: " + e, e);                        handleException(e);                    }                }            }        }    }    public void release()    {        dispose();    }    protected void doDispose()    {        if (socket != null && !socket.isClosed())        {            logger.debug("Closing Udp connection: " + uri);            socket.close();            logger.info("Closed Udp connection: " + uri);        }    }    protected Work createWork(DatagramPacket packet) throws IOException    {        return new UdpWorker(new DatagramSocket(0), packet);    }    protected class UdpWorker implements Work, Disposable    {        private DatagramSocket socket = null;        private DatagramPacket packet;        public UdpWorker(DatagramSocket socket, DatagramPacket packet)        {            this.socket = socket;            this.packet = packet;        }        public void release()        {            dispose();        }        public void dispose()        {            if (socket != null && !socket.isClosed())            {                try                {                    socket.close();                }                catch (Exception e)                {                    logger.error("Socket close failed", e);                }            }            socket = null;        }        /** Accept requests from a given Udp address */        public void run()        {            MuleMessage returnMessage = null;            try            {                MessageAdapter adapter = connector.getMessageAdapter(packet);                final SocketAddress clientAddress = socket.getRemoteSocketAddress();                if (clientAddress != null)                {                    adapter.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, clientAddress);                }                returnMessage = routeMessage(new DefaultMuleMessage(adapter), endpoint.isSynchronous());                if (returnMessage != null)                {                    byte[] data;                    if (responseTransformers != null)                    {                        returnMessage.applyTransformers(responseTransformers);                        Object response = returnMessage.getPayload();                        if (response instanceof byte[])                        {                            data = (byte[]) response;                        }                        else                        {                            data = response.toString().getBytes();                        }                    }                    else                    {                        data = returnMessage.getPayloadAsBytes();                    }                    DatagramPacket result = new DatagramPacket(data, data.length, packet.getAddress(),                            packet.getPort());                    socket.send(result);                }            }            catch (Exception e)            {                if (!disposing.get())                {                    handleException(e);                }            }            finally            {                dispose();            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -