⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpserverthread.java

📁 非常棒的java数据库
💻 JAVA
字号:
/*
 * Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
 * (http://h2database.com/html/license.html).
 * Initial Developer: H2 Group
 */
package org.h2.server;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.Socket;
import java.sql.SQLException;

import org.h2.command.Command;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.ConnectionInfo;
import org.h2.engine.Constants;
import org.h2.engine.Engine;
import org.h2.engine.Session;
import org.h2.engine.SessionRemote;
import org.h2.expression.Parameter;
import org.h2.jdbc.JdbcSQLException;
import org.h2.message.Message;
import org.h2.result.LocalResult;
import org.h2.result.ResultColumn;
import org.h2.util.ObjectArray;
import org.h2.util.SmallMap;
import org.h2.value.Transfer;
import org.h2.value.Value;

/**
 * One server thread is opened per client connection.
 * <p>
 * {@link #run()} first performs the connection handshake (driver version
 * check, database name, credentials, connection properties), then reads one
 * operation code at a time from the client and answers it until the
 * connection is stopped.
 */
public class TcpServerThread implements Runnable {
    private final TcpServer server;
    private Session session;
    private boolean stop;
    private Thread thread;
    private final Transfer transfer;
    private Command commit;
    private final SmallMap cache = new SmallMap(SysProperties.SERVER_CACHED_OBJECTS);
    private final int id;

    /**
     * Creates the handler for one accepted client socket.
     *
     * @param socket the client socket; it is handed to the transfer object
     * @param server the server that accepted the connection
     * @param id the id under which this connection is registered with the
     *            server
     */
    public TcpServerThread(Socket socket, TcpServer server, int id) {
        this.server = server;
        this.id = id;
        transfer = new Transfer(null);
        transfer.setSocket(socket);
    }

    /**
     * Writes a message to the server log, prefixed with this object.
     *
     * @param s the message
     */
    private void log(String s) {
        server.log(this + " " + s);
    }

    /**
     * Performs the handshake, then processes client operations until
     * {@code stop} is set. The connection is always closed at the end.
     */
    public void run() {
        try {
            transfer.init();
            log("Connect");
            // TODO server: should support a list of allowed databases and a
            // list of allowed clients
            try {
                // the version is read before the access check, so the
                // stream position is the same in both failure cases
                int version = transfer.readInt();
                if (!server.allow(transfer.getSocket())) {
                    throw Message.getSQLException(ErrorCode.REMOTE_CONNECTION_NOT_ALLOWED);
                }
                if (version != Constants.TCP_DRIVER_VERSION) {
                    throw Message.getSQLException(ErrorCode.DRIVER_VERSION_ERROR_2, new String[] { "" + version,
                            "" + Constants.TCP_DRIVER_VERSION });
                }
                String db = transfer.readString();
                String originalURL = transfer.readString();
                // the server setting wins over the system property
                String baseDir = server.getBaseDir();
                if (baseDir == null) {
                    baseDir = SysProperties.getBaseDir();
                }
                ConnectionInfo ci = new ConnectionInfo(db);
                if (baseDir != null) {
                    ci.setBaseDir(baseDir);
                }
                if (server.getIfExists()) {
                    ci.setProperty("IFEXISTS", "TRUE");
                }
                ci.setOriginalURL(originalURL);
                ci.setUserName(transfer.readString());
                ci.setUserPasswordHash(transfer.readBytes());
                ci.setFilePasswordHash(transfer.readBytes());
                // the remaining connection properties arrive as
                // (name, value) string pairs
                int len = transfer.readInt();
                for (int i = 0; i < len; i++) {
                    ci.setProperty(transfer.readString(), transfer.readString());
                }
                Engine engine = Engine.getInstance();
                session = engine.getSession(ci);
                transfer.setSession(session);
                transfer.writeInt(SessionRemote.STATUS_OK).flush();
                server.addConnection(id, originalURL, ci.getUserName());
                log("Connected");
            } catch (Throwable e) {
                // a failed handshake is reported to the client and ends
                // the connection
                sendError(e);
                stop = true;
            }
            while (!stop) {
                try {
                    process();
                } catch (Throwable e) {
                    // report the problem and keep serving; sendError sets
                    // 'stop' if the report itself cannot be written
                    sendError(e);
                }
            }
            log("Disconnect");
        } catch (Throwable e) {
            server.logError(e);
        } finally {
            close();
        }
    }

    /**
     * Rolls back and closes the current session, if there is one, and
     * removes this connection from the server.
     * <p>
     * A failing ROLLBACK is logged but does not prevent the session from
     * being closed or the connection from being removed. Exceptions are
     * logged, never thrown.
     */
    private void closeSession() {
        if (session == null) {
            return;
        }
        try {
            try {
                Command rollback = session.prepareLocal("ROLLBACK");
                rollback.executeUpdate();
            } catch (Exception e) {
                // the session must still be closed below, otherwise it
                // would stay open with no reference left to close it
                server.logError(e);
            }
            try {
                session.close();
            } finally {
                // the connection is gone for the client either way
                server.removeConnection(id);
            }
        } catch (Exception e) {
            server.logError(e);
        } finally {
            session = null;
        }
    }

    /**
     * Stops the processing loop, closes the session and the transfer object,
     * and removes this thread from the server. Exceptions are logged, not
     * thrown.
     */
    public void close() {
        try {
            stop = true;
            closeSession();
            transfer.close();
            log("Close");
        } catch (Exception e) {
            server.logError(e);
        }
        server.remove(this);
    }

    /**
     * Sends an error response to the client: the error status followed by
     * SQL state, message, SQL statement, error code and stack trace.
     * If the response cannot be written, the connection is marked to stop.
     *
     * @param e the problem to report
     */
    private void sendError(Throwable e) {
        try {
            SQLException s = Message.convert(e);
            StringWriter writer = new StringWriter();
            e.printStackTrace(new PrintWriter(writer));
            String trace = writer.toString();
            String message;
            String sql;
            if (e instanceof JdbcSQLException) {
                JdbcSQLException j = (JdbcSQLException) e;
                message = j.getOriginalMessage();
                sql = j.getSQL();
            } else {
                message = e.getMessage();
                sql = null;
            }
            transfer.writeInt(SessionRemote.STATUS_ERROR).writeString(s.getSQLState()).writeString(message)
                    .writeString(sql).writeInt(s.getErrorCode()).writeString(trace).flush();
        } catch (IOException e2) {
            server.logError(e2);
            // if writing the error does not work, close the connection
            stop = true;
        }
    }

    /**
     * Reads the parameter count and then that many values from the client
     * and assigns them, in order, to the parameters of the command.
     *
     * @param command the command whose parameters are set
     */
    private void setParameters(Command command) throws IOException, SQLException {
        int len = transfer.readInt();
        ObjectArray params = command.getParameters();
        for (int i = 0; i < len; i++) {
            Parameter p = (Parameter) params.get(i);
            p.setValue(transfer.readValue());
        }
    }

    /**
     * Reads one operation code from the client and executes the matching
     * operation. An unknown operation code is logged and closes the
     * connection.
     * <p>
     * NOTE(review): lookups use {@code cache.getObject(id, false)} where the
     * result is used unchecked and {@code cache.getObject(id, true)} where
     * it is null-checked; presumably the flag allows a missing entry -
     * confirm in SmallMap.
     */
    private void process() throws IOException, SQLException {
        int operation = transfer.readInt();
        switch (operation) {
        case SessionRemote.SESSION_PREPARE: {
            int commandId = transfer.readInt();
            String sql = transfer.readString();
            Command command = session.prepareLocal(sql);
            boolean readonly = command.isReadOnly();
            // keep the prepared command so later operations can refer to it
            cache.addObject(commandId, command);
            boolean isQuery = command.isQuery();
            int paramCount = command.getParameters().size();
            transfer.writeInt(SessionRemote.STATUS_OK).writeBoolean(isQuery).writeBoolean(readonly)
                    .writeInt(paramCount).flush();
            break;
        }
        case SessionRemote.SESSION_CLOSE: {
            // the session is closed before the reply is written; the
            // transfer is closed only afterwards
            closeSession();
            transfer.writeInt(SessionRemote.STATUS_OK).flush();
            close();
            break;
        }
        case SessionRemote.COMMAND_COMMIT: {
            // the COMMIT command is prepared once and then reused
            if (commit == null) {
                commit = session.prepareLocal("COMMIT");
            }
            commit.executeUpdate();
            transfer.writeInt(SessionRemote.STATUS_OK).flush();
            break;
        }
        case SessionRemote.COMMAND_GET_META_DATA: {
            int commandId = transfer.readInt();
            int objectId = transfer.readInt();
            Command command = (Command) cache.getObject(commandId, false);
            LocalResult result = command.getMetaDataLocal();
            cache.addObject(objectId, result);
            int columnCount = result.getVisibleColumnCount();
            // a row count of 0 is sent: only the columns are described
            transfer.writeInt(SessionRemote.STATUS_OK).writeInt(columnCount).writeInt(0);
            for (int i = 0; i < columnCount; i++) {
                ResultColumn.writeColumn(transfer, result, i);
            }
            transfer.flush();
            break;
        }
        case SessionRemote.COMMAND_EXECUTE_QUERY: {
            int commandId = transfer.readInt();
            int objectId = transfer.readInt();
            int maxRows = transfer.readInt();
            int fetchSize = transfer.readInt();
            Command command = (Command) cache.getObject(commandId, false);
            setParameters(command);
            LocalResult result = command.executeQueryLocal(maxRows);
            // keep the result so the client can fetch more rows later
            cache.addObject(objectId, result);
            int columnCount = result.getVisibleColumnCount();
            transfer.writeInt(SessionRemote.STATUS_OK).writeInt(columnCount);
            int rowCount = result.getRowCount();
            transfer.writeInt(rowCount);
            for (int i = 0; i < columnCount; i++) {
                ResultColumn.writeColumn(transfer, result, i);
            }
            // only the first block of rows is sent with the reply
            int fetch = Math.min(rowCount, fetchSize);
            for (int i = 0; i < fetch; i++) {
                sendRow(result);
            }
            transfer.flush();
            break;
        }
        case SessionRemote.COMMAND_EXECUTE_UPDATE: {
            int commandId = transfer.readInt();
            Command command = (Command) cache.getObject(commandId, false);
            setParameters(command);
            int updateCount = command.executeUpdate();
            int status = SessionRemote.STATUS_OK;
            // the statement itself may have closed the session
            if (session.isClosed()) {
                status = SessionRemote.STATUS_CLOSED;
            }
            transfer.writeInt(status).writeInt(updateCount).writeBoolean(session.getAutoCommit());
            transfer.flush();
            break;
        }
        case SessionRemote.COMMAND_CLOSE: {
            // no reply is written for this operation
            int commandId = transfer.readInt();
            Command command = (Command) cache.getObject(commandId, true);
            if (command != null) {
                command.close();
                cache.freeObject(commandId);
            }
            break;
        }
        case SessionRemote.RESULT_FETCH_ROWS: {
            int resultId = transfer.readInt();
            int count = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(resultId, false);
            transfer.writeInt(SessionRemote.STATUS_OK);
            for (int i = 0; i < count; i++) {
                sendRow(result);
            }
            transfer.flush();
            break;
        }
        case SessionRemote.RESULT_RESET: {
            // no reply is written for this operation
            int resultId = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(resultId, false);
            result.reset();
            break;
        }
        case SessionRemote.RESULT_CLOSE: {
            // no reply is written for this operation
            int resultId = transfer.readInt();
            LocalResult result = (LocalResult) cache.getObject(resultId, true);
            if (result != null) {
                result.close();
                cache.freeObject(resultId);
            }
            break;
        }
        case SessionRemote.CHANGE_ID: {
            // re-register a cached object under a new id; no reply is written
            int oldId = transfer.readInt();
            int newId = transfer.readInt();
            Object obj = cache.getObject(oldId, false);
            cache.freeObject(oldId);
            cache.addObject(newId, obj);
            break;
        }
        default:
            server.logInternalError("Unknown operation: " + operation);
            server.log("Unknown operation: " + operation);
            // close() also rolls back and closes the session
            close();
        }
    }

    /**
     * Writes the next row of the result to the client: {@code true} followed
     * by the visible column values, or just {@code false} if the result has
     * no more rows.
     *
     * @param result the result to read the next row from
     */
    private void sendRow(LocalResult result) throws IOException, SQLException {
        if (result.next()) {
            transfer.writeBoolean(true);
            Value[] v = result.currentRow();
            for (int i = 0; i < result.getVisibleColumnCount(); i++) {
                transfer.writeValue(v[i]);
            }
        } else {
            transfer.writeBoolean(false);
        }
    }

    /**
     * Stores the thread that runs this object.
     *
     * @param thread the thread
     */
    public void setThread(Thread thread) {
        this.thread = thread;
    }

    /**
     * Returns the thread previously stored with {@link #setThread(Thread)}.
     *
     * @return the thread, or null if none was set
     */
    public Thread getThread() {
        return thread;
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -