⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mmlclienthandler.java

📁 java实现的socket连接池,使用了apache的objectpool
💻 JAVA
字号:
// $dateCreated:2006-8-20-12:05:38 userCreated:pippo
package com.vms.ts.simulator.mmlserver;

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quickserver.net.AppException;
import org.quickserver.net.ConnectionLostException;
import org.quickserver.net.server.AuthStatus;
import org.quickserver.net.server.ClientEvent;
import org.quickserver.net.server.ClientHandler;
import org.quickserver.net.server.impl.BlockingClientHandler;
import org.quickserver.util.Assertion;
import org.quickserver.util.MyString;

/**
 * @author pippo
 */
public class MMLClientHandler extends BlockingClientHandler implements ClientHandler {

    /** Class-wide commons-logging logger; never reassigned, hence final. */
    private static final Log logger = LogFactory.getLog(MMLClientHandler.class);

    /**
     * Creates a handler, passing {@code instanceCount} straight through to the
     * {@link BlockingClientHandler} constructor.
     *
     * @param instanceCount value forwarded unchanged to the superclass
     */
    public MMLClientHandler(int instanceCount) {
        super(instanceCount);
    }

    public MMLClientHandler() {
        super();
    }

    /*
     * (non-Javadoc)
     *
     * Stores the supplied stream in the "in" field exactly as it was passed;
     * this override applies no wrapping or buffering of its own.
     *
     * @see org.quickserver.net.server.impl.BasicClientHandler#setInputStream(java.io.InputStream)
     */
    protected void setInputStream(InputStream in) throws IOException {
        this.in = in;
    }

    /*
     * (non-Javadoc)
     *
     * Handles one pending ClientEvent for this client. The first entry of
     * unprocessedClientEvents is removed and stored in threadEvent; for
     * RUN_BLOCKING the connect callback, the authorisation step and the read
     * loop in processRead() are executed. Afterwards the socket is closed if
     * it is still open, returnClientData() is called and, when
     * checkReturnClientHandler() returns true, returnClientHandler() as well.
     *
     * Most fields and helpers used here (socket, lost, connection,
     * threadEvent, prepareForRun(), notifyCloseOrLost(), ...) are not
     * declared in this class; presumably they are inherited from
     * BlockingClientHandler -- confirm against the QuickServer sources before
     * relying on their exact semantics.
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        // Nothing queued: log and leave. Note that this return happens before
        // the cleanup section at the end of the method.
        if (unprocessedClientEvents.size() == 0) {
            logger.info("No unprocessed ClientEvents!");
            return;
        }

        // Take the first queued event; it is the one this invocation handles.
        ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents.remove(0);

        if (logger.isInfoEnabled()) {
            StringBuffer sb = new StringBuffer();
            sb.append("Running ").append(getName());
            sb.append(" using ");
            sb.append(Thread.currentThread().getName());
            sb.append(" for ");

            // clientEvents is only read while holding its monitor.
            synchronized (clientEvents) {
                if (clientEvents.size() > 1) {
                    sb.append(currentEvent + ", Current Events - " + clientEvents);
                } else {
                    sb.append(currentEvent);
                }
            }
            logger.info(sb.toString());
        }

        // A null entry means there is nothing to do; like the early return
        // above, this one also skips the cleanup section below.
        if (currentEvent == null) {
            threadEvent.set(null);
            return;
        } else {
            threadEvent.set(currentEvent);
        }

        // Outer try: setup, event processing and connection shutdown. Its
        // catch blocks only log (and may call assertionSystemExit()).
        try {
            if (socket == null)
                throw new SocketException("Socket was null!");
            prepareForRun();
            if (getThreadEvent() == ClientEvent.MAX_CON_BLOCKING)
                processMaxConnection(currentEvent);

            // Inner try: the RUN_BLOCKING work. Most handlers below set
            // lost = true; AppException and AssertionError do not.
            try {
                if (getThreadEvent() == ClientEvent.RUN_BLOCKING) {
                    clientEventHandler.gotConnected(this);

                    if (authorised == false) {
                        // With neither an authentication handler nor an
                        // authenticator configured the client is accepted as is.
                        if (clientAuthenticationHandler == null && authenticator == null) {
                            authorised = true;
                        } else {
                            if (clientAuthenticationHandler != null) {
                                // Repeat for as long as processAuthorisation()
                                // reports FAILURE; only SUCCESS sets authorised.
                                AuthStatus authStatus = null;
                                do {
                                    authStatus = processAuthorisation();
                                } while (authStatus == AuthStatus.FAILURE);

                                if (authStatus == AuthStatus.SUCCESS)
                                    authorised = true;
                            } else {
                                // Only authenticator is non-null here. The result
                                // is ignored and authorised is not set in this
                                // branch; presumably processAuthorisation()
                                // updates it itself -- TODO confirm.
                                processAuthorisation();
                            }
                        }
                    }// end of authorised
                    // processRead() runs even if authorised is still false.
                    processRead();
                }
            } catch (SocketException e) {
                logger.info("SocketException - Client [" + getHostAddress() + "]: " + e.getMessage());
                // e.printStackTrace();
                lost = true;
            } catch (AppException e) {
                // errors from Application
                logger.info("AppException " + Thread.currentThread().getName() + ": " + e.getMessage());
            } catch (javax.net.ssl.SSLException e) {
                lost = true;
                // Log level depends on whether assertions are enabled.
                if (Assertion.isEnabled()) {
                    logger.info("SSLException - Client [" + getHostAddress() + "] " + Thread.currentThread().getName()
                            + ": " + e);
                } else {
                    logger.warn("SSLException - Client [" + getHostAddress() + "]: " + e);
                }
            } catch (ConnectionLostException e) {
                lost = true;
                if (e.getMessage() != null)
                    logger.info("Connection lost " + Thread.currentThread().getName() + ": " + e.getMessage());
                else
                    logger.info("Connection lost " + Thread.currentThread().getName());
            } catch (IOException e) {
                // Any I/O failure not matched by the more specific handlers above.
                lost = true;
                logger.debug("IOError " + Thread.currentThread().getName() + ": " + e);
            } catch (AssertionError er) {
                // assertionSystemExit() is called unconditionally for AssertionError.
                logger.warn("[AssertionError] " + getName() + " " + er);
                if (logger.isInfoEnabled()) {
                    logger.info("StackTrace " + Thread.currentThread().getName() + ": " + MyString.getStackTrace(er));
                }
                assertionSystemExit();
            } catch (Error er) {
                logger.warn("[Error] " + er);
                if (logger.isInfoEnabled()) {
                    logger.info("StackTrace " + Thread.currentThread().getName() + ": " + MyString.getStackTrace(er));
                }
                if (Assertion.isEnabled()) {
                    assertionSystemExit();
                }
                lost = true;
            } catch (RuntimeException re) {
                logger.warn("[RuntimeException] " + MyString.getStackTrace(re));
                if (Assertion.isEnabled()) {
                    assertionSystemExit();
                }
                lost = true;
            }

            // Close/lost notification is skipped for MAX_CON_BLOCKING events.
            if (getThreadEvent() != ClientEvent.MAX_CON_BLOCKING) {
                notifyCloseOrLost();
            }

            // Shut the connection down if the connection flag is still set.
            if (connection) {
                logger.info(Thread.currentThread().getName() + " calling closeConnection()");
                closeConnection();
            }
        } catch (javax.net.ssl.SSLException se) {
            logger.warn("SSLException " + Thread.currentThread().getName() + " - " + se);
        } catch (IOException ie) {
            logger.warn("IOError " + Thread.currentThread().getName() + " - Closing Client : " + ie);
        } catch (RuntimeException re) {
            logger.warn("[RuntimeException] " + getName() + " " + Thread.currentThread().getName() + " - "
                    + MyString.getStackTrace(re));
            if (Assertion.isEnabled()) {
                assertionSystemExit();
            }
        } catch (Exception e) {
            logger.warn("Error " + Thread.currentThread().getName() + " - Event:" + getThreadEvent() + " - Socket:"
                    + socket + " : " + e);
            logger.debug("StackTrace: " + getName() + "\n" + MyString.getStackTrace(e));
            if (Assertion.isEnabled()) {
                assertionSystemExit();
            }
        } catch (Error e) {
            logger.warn("Error " + Thread.currentThread().getName() + " - Event:" + getThreadEvent() + " - Socket:"
                    + socket + " : " + e);
            logger.debug("StackTrace: " + getName() + "\n" + MyString.getStackTrace(e));
            if (Assertion.isEnabled()) {
                assertionSystemExit();
            }
        }

        // Cleanup: close the socket if it is still open. Failures while
        // closing are logged and otherwise ignored.
        synchronized (this) {
            try {
                if (socket != null && socket.isClosed() == false) {
                    logger.info("Closing Socket");
                    socket.close();
                }
            } catch (Exception re) {
                logger.warn("Error closing Socket/Channel: " + re);
            }
        }// end synchronized

        willClean = true;
        returnClientData();

        // The decision is taken while holding lockObj; the actual return
        // happens after the lock has been released.
        boolean returnClientHandler = false;
        synchronized (lockObj) {
            returnClientHandler = checkReturnClientHandler();
        }

        if (returnClientHandler) {
            returnClientHandler(); // return to pool
        }
    }

    /*
     * (non-Javadoc)
     *
     * Writes len bytes of data, starting at offset off, to b_out and flushes
     * it. When isConnected() returns false nothing is written and a warning
     * is logged instead.
     *
     * @see org.quickserver.net.server.ClientHandler#sendClientBinary(byte[], int, int)
     */
    public void sendClientBinary(byte data[], int off, int len) throws IOException {
        if (!isConnected()) {
            logger.warn("Client not connected.");
            return;
        }

        b_out.write(data, off, len);
        b_out.flush();
    }

    /*
     * (non-Javadoc)
     *
     * Reads the next command frame from this handler's input stream by
     * delegating to readInputStream(InputStream) with the "in" field.
     *
     * Returns the frame body, or null when the stream ended before a new
     * frame started.
     *
     * @see org.quickserver.net.server.impl.BasicClientHandler#readInputStream()
     */
    protected byte[] readInputStream() throws IOException {
        if (logger.isDebugEnabled())
            logger.debug("begin read command from inputStream");
        return readInputStream(in);
    }

    /*
     * (non-Javadoc)
     *
     * Reads one command frame from _in. A frame is an 8-byte header whose
     * bytes 4..7 hold the message length as four hexadecimal digits, followed
     * by (length + 8) further bytes, which are returned. Heartbeat frames
     * (length 4, body "HBHBB7BDB7BD") are logged and skipped in a loop, so
     * any number of consecutive heartbeats uses constant stack depth.
     *
     * The meaning of header bytes 0..3 and of the 8 bytes added to the length
     * is not visible in this class; presumably they are protocol markers and
     * trailer fields -- confirm against the MML protocol description.
     *
     * Returns null when the stream is at end-of-file before the first header
     * byte. Throws java.io.EOFException when the stream ends in the middle of
     * a frame, and IOException when the length field is not a non-negative
     * hexadecimal number. If a read times out in the middle of a frame the
     * bytes read so far are discarded.
     *
     * @see org.quickserver.net.server.impl.BasicClientHandler#readInputStream(java.io.InputStream)
     */
    protected static byte[] readInputStream(InputStream _in) throws IOException {
        byte[] head = new byte[8];

        while (true) {
            int headBytes = readFrameBytes(_in, head);
            if (headBytes == 0) {
                // Clean end of stream between frames: report "no more data".
                return null;
            }
            if (headBytes < head.length) {
                throw new java.io.EOFException("Stream ended inside frame header after " + headBytes + " bytes");
            }

            // The length field is hexadecimal text; decode it with a fixed
            // charset instead of the platform default.
            String lengthField = new String(head, 4, 4, "US-ASCII");
            int messageLength;
            try {
                messageLength = Integer.parseInt(lengthField, 16);
            } catch (NumberFormatException e) {
                // Input comes from the network: report bad data as an I/O
                // problem rather than leaking a RuntimeException.
                IOException malformed = new IOException("Malformed frame header, length field is not hex: "
                        + lengthField);
                malformed.initCause(e);
                throw malformed;
            }
            if (messageLength < 0) {
                throw new IOException("Malformed frame header, negative length: " + messageLength);
            }
            if (logger.isDebugEnabled())
                logger.debug("received command length is:" + messageLength);

            byte[] b = new byte[messageLength + 8];
            int bodyBytes = readFrameBytes(_in, b);
            if (bodyBytes < b.length) {
                throw new java.io.EOFException("Stream ended inside frame body: expected " + b.length
                        + " bytes, got " + bodyBytes);
            }

            if (messageLength == 4 && "HBHBB7BDB7BD".equals(new String(b, "US-ASCII"))) {
                if (logger.isDebugEnabled())
                    logger.debug("received ping command from mml client!");
                // Heartbeat: carries no command, wait for the next frame.
                continue;
            }
            return b;
        }
    }

    /*
     * Fills target from source, repeating read() until the array is full or
     * the stream ends. Returns the number of bytes actually stored, which is
     * smaller than target.length only when end-of-file was reached.
     */
    private static int readFrameBytes(InputStream source, byte[] target) throws IOException {
        int filled = 0;
        while (filled < target.length) {
            int count = source.read(target, filled, target.length - filled);
            if (count < 0) {
                break;
            }
            filled += count;
        }
        return filled;
    }

    /**
     * Reads commands from the client for as long as the connection flag is
     * set. Each frame returned by {@link #readInputStream()} is converted to
     * a String using the platform default charset, handed to
     * clientCommandHandler.handleCommand and followed by a call to
     * updateLastCommunicationTime(). A null frame marks the connection as
     * lost and ends the loop. A SocketTimeoutException is passed to
     * handleTimeout and the loop continues.
     *
     * The former authorisation retry at the end of each iteration was
     * removed: its status variable was initialised to null and only assigned
     * inside a loop guarded by a FAILURE comparison, so it could never run.
     *
     * @throws IOException if reading from the client or handling a command fails
     * @throws ClassNotFoundException not thrown by any statement visible in
     *         this body; kept so the signature stays unchanged
     * @throws AppException declared for the helpers called from here;
     *         which of them actually throw it is not visible in this class
     */
    private void processRead() throws IOException, ClassNotFoundException, AppException {
        if (logger.isDebugEnabled())
            logger.debug("begin processRead");

        while (connection) {
            try {
                byte[] recByte = this.readInputStream();
                if (recByte == null) {
                    // No more data from the client: flag the loss and stop reading.
                    lost = true;
                    break;
                }
                String command = new String(recByte);
                if (logger.isDebugEnabled())
                    logger.debug("recieved command is :" + command);
                clientCommandHandler.handleCommand(this, command);
                updateLastCommunicationTime();
            } catch (SocketTimeoutException e) {
                handleTimeout(e);
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -