⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 serialsessionimpl.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.transport.serial;import gnu.io.SerialPort;import gnu.io.SerialPortEvent;import gnu.io.SerialPortEventListener;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.util.TooManyListenersException;import org.apache.mina.core.session.AbstractIoSession;import org.apache.mina.core.filterchain.DefaultIoFilterChain;import org.apache.mina.core.service.DefaultTransportMetadata;import org.apache.mina.util.ExceptionMonitor;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.service.IoHandler;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.service.IoService;import org.apache.mina.core.service.IoServiceListenerSupport;import org.apache.mina.core.service.TransportMetadata;import org.apache.mina.core.write.WriteRequest;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * An imlpementation of {@link SerialSession}. * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 751630 $, $Date: 2009-03-09 10:20:18 +0100 (Mon, 09 Mar 2009) $ */class SerialSessionImpl extends AbstractIoSession implements        SerialSession, SerialPortEventListener {    static final TransportMetadata METADATA =        new DefaultTransportMetadata(                "rxtx", "serial", false, true, SerialAddress.class,                SerialSessionConfig.class, IoBuffer.class);    private final SerialSessionConfig config = new DefaultSerialSessionConfig();    private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();    private final IoHandler ioHandler;    private final IoFilterChain filterChain;    private final SerialConnector service;    private final IoServiceListenerSupport serviceListeners;    private final SerialAddress address;    private final SerialPort port;    private final Logger log;    private InputStream inputStream;    private OutputStream outputStream;    SerialSessionImpl(            SerialConnector service, IoServiceListenerSupport serviceListeners,            SerialAddress address, SerialPort port) {        this.service = service;        this.serviceListeners = serviceListeners;        ioHandler = service.getHandler();        filterChain = new DefaultIoFilterChain(this);        this.port = port;        this.address = address;        log = LoggerFactory.getLogger(SerialSessionImpl.class);    }    public SerialSessionConfig getConfig() {        return config;    }    public IoFilterChain getFilterChain() {        return filterChain;    }    public IoHandler getHandler() {        return ioHandler;    }    public TransportMetadata getTransportMetadata() {        return METADATA;    }    public SerialAddress getLocalAddress() {        return null; // not applicable    }    public SerialAddress getRemoteAddress() {        return address;    }    @Override    public SerialAddress getServiceAddress() {        return (SerialAddress) super.getServiceAddress();    }    public IoService getService() {        return service;    }    /**     * start handling streams     *     * @throws IOException     * @throws TooManyListenersException     */    void start() throws IOException, TooManyListenersException {        inputStream = port.getInputStream();        outputStream = port.getOutputStream();        ReadWorker w = new ReadWorker();        w.start();        port.addEventListener(this);        service.getIdleStatusChecker0().addSession(this);        try {        	getService().getFilterChainBuilder().buildFilterChain(getFilterChain());        	serviceListeners.fireSessionCreated(this);        } catch (Throwable e) {        	getFilterChain().fireExceptionCaught(e);        	processor.remove(this);        }    }    private final Object writeMonitor = new Object();    private WriteWorker writeWorker;    private class WriteWorker extends Thread {        @Override        public void run() {            while (isConnected() && !isClosing()) {                flushWrites();                // wait for more data                synchronized (writeMonitor) {                    try {                        writeMonitor.wait();                    } catch (InterruptedException e) {                        log.error("InterruptedException", e);                    }                }            }        }    }    private void flushWrites() {        for (; ;) {            WriteRequest req = getCurrentWriteRequest();            if (req == null) {                req = getWriteRequestQueue().poll(this);                if (req == null) {                    break;                }            }            IoBuffer buf = (IoBuffer) req.getMessage();            if (buf.remaining() == 0) {                setCurrentWriteRequest(null);                buf.reset();                this.getFilterChain().fireMessageSent(req);                continue;            }            int writtenBytes = buf.remaining();            try {            	outputStream.write(buf.array(), buf.position(), writtenBytes);                buf.position(buf.position() + writtenBytes);                req.getFuture().setWritten();            } catch (IOException e) {                this.getFilterChain().fireExceptionCaught(e);            }        }    }    private final Object readReadyMonitor = new Object();    private class ReadWorker extends Thread {        @Override        public void run() {            while (isConnected() && !isClosing()) {                synchronized (readReadyMonitor) {                    try {                        readReadyMonitor.wait();                    } catch (InterruptedException e) {                        log.error("InterruptedException", e);                    }                    if (isClosing() || !isConnected()) {                        break;                    }                    int dataSize;                    try {                        dataSize = inputStream.available();                        byte[] data = new byte[dataSize];                        int readBytes = inputStream.read(data);                        if (readBytes > 0) {                            IoBuffer buf = IoBuffer                                    .wrap(data, 0, readBytes);                            buf.put(data, 0, readBytes);                            buf.flip();                            getFilterChain().fireMessageReceived(                                    buf);                        }                    } catch (IOException e) {                        getFilterChain().fireExceptionCaught(                                e);                    }                }            }        }    }    public void serialEvent(SerialPortEvent evt) {        if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {            synchronized (readReadyMonitor) {                readReadyMonitor.notifyAll();            }        }    }    @Override    public IoProcessor<SerialSessionImpl> getProcessor() {        return processor;    }    private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {        public void add(SerialSessionImpl session) {            // It's already added when the session is constructed.        }        public void flush(SerialSessionImpl session) {            if (writeWorker == null) {                writeWorker = new WriteWorker();                writeWorker.start();            } else {                synchronized (writeMonitor) {                    writeMonitor.notifyAll();                }            }        }        public void remove(SerialSessionImpl session) {            try {                inputStream.close();            } catch (IOException e) {                ExceptionMonitor.getInstance().exceptionCaught(e);            }            try {                outputStream.close();            } catch (IOException e) {                ExceptionMonitor.getInstance().exceptionCaught(e);            }            port.close();            flush(session);            synchronized (readReadyMonitor) {                readReadyMonitor.notifyAll();            }            serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);        }        public void updateTrafficControl(SerialSessionImpl session) {            throw new UnsupportedOperationException();        }        public void dispose() {            // Nothing to dispose        }        public boolean isDisposed() {            return false;        }        public boolean isDisposing() {            return false;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -