📄 monetsocketblockmode.java
字号:
/* * The contents of this file are subject to the MonetDB Public License * Version 1.1 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the * License for the specific language governing rights and limitations * under the License. * * The Original Code is the MonetDB Database System. * * The Initial Developer of the Original Code is CWI. * Portions created by CWI are Copyright (C) 1997-2007 CWI. * All Rights Reserved. */package nl.cwi.monetdb.jdbc;import java.io.*;import java.nio.*;import java.net.*;/** * A Socket for communicating with the MonetDB database in block mode. * <br /><br /> * This MonetSocket performs basic operations like sending the server a * message and/or receiving a line from it. A small interpretation of * all what is read is done, to supply some basic tools for the using * classes.<br /> * For each read line, it is determined what type of line it is * according to the MonetDB MAPI protocol.i This results in a line to * be PROMPT, HEADER, RESULT, ERROR or UNKNOWN.i Use the getLineType() * function to retrieve the type of the last line read. * <br /><br /> * For debugging purposes a socket level debugging is implemented where * each and every interaction to and from the MonetDB server is logged * to a file on disk.<br /> * Incoming messages are prefixed by "RX" (received by the driver), * outgoing messages by "TX" (transmitted by the driver). Special * decoded non-human readable messages are prefixed with "RD" and "TD" * instead. Following this two char prefix, a timestamp follows as * the number of milliseconds since the UNIX epoch. The rest of the * line is a String representation of the data sent or received. * <br /><br /> * This implementation of MonetSocket uses block mode on the mapi * protocol. It allows sending a multi line query as data is sent in * 'blocks' that are prefixed with a two byte integer indicating its * size. The least significant bit of this integer represents the last * block in a sequence. * * @author Fabian Groffen <Fabian.Groffen@cwi.nl> * @version 2.10 */final class MonetSocketBlockMode { /** Stream from the Socket for reading */ private InputStream fromMonetRaw; /** Stream from the Socket for writing */ private OutputStream toMonetRaw; /** The TCP Socket to Mserver */ private Socket con; /** Whether we are debugging or not */ private boolean debug = false; /** The Writer for the debug log-file */ private FileWriter log; /** The type of the last line read */ private int lineType; /** "there is currently no line", or the the type is unknown is represented by UNKNOWN */ final static int UNKNOWN = 0; /** a line starting with ! indicates ERROR */ final static int ERROR = 1; /** a line starting with % indicates HEADER */ final static int HEADER = 2; /** a line starting with [ indicates RESULT */ final static int RESULT = 3; /** a line which matches the pattern of prompt1 is a PROMPT1 */ final static int PROMPT1 = 4; /** a line which matches the pattern of prompt2 is a PROMPT2 */ final static int PROMPT2 = 5; /** a line starting with & indicates the start of a header block */ final static int SOHEADER = 6; /** a line starting with ^ indicates REDIRECT */ final static int REDIRECT = 7; /** a line starting with # indicates INFO */ final static int INFO = 8; /** The blocksize (hardcoded in compliance with stream.mx) */ final static int BLOCK = 8 * 1024 - 2; /** A buffer which holds the blocks read */ private StringBuffer readBuffer; /** The number of available bytes to read */ private short readState = 0; /** A short in two bytes for holding the block size in bytes */ private byte[] blklen = new byte[2]; MonetSocketBlockMode(String host, int port) throws IOException { con = new Socket(host, port); // set nodelay, as it greatly speeds up small messages (like we // often do) con.setTcpNoDelay(true); // note: Always use buffered streams, as they perform better, // even though you know exactly which blocks you have to fetch // from the stream. They are probably able to prefetch so the // IO is blocking while the program is still doing something // else. fromMonetRaw = new BufferedInputStream(con.getInputStream(), BLOCK + 2); toMonetRaw = new BufferedOutputStream(con.getOutputStream(), BLOCK + 2); readBuffer = new StringBuffer(); } /** * Enables logging to a file what is read and written from and to * MonetDB. * * @param filename the name of the file to write to * @throws IOException if the file could not be opened for writing */ public void debug(String filename) throws IOException { log = new FileWriter(filename); debug = true; } /** * write puts the given string on the stream as UTF-8 data. The * stream will not be flushed after the write. To flush the stream * use flush(), or use writeLine(). * * @param data the data to write to the stream * @throws IOException if writing to the stream failed * @see #flush() * @see #writeLine(String data) */ public void write(String data) throws IOException { write(data.getBytes("UTF-8")); } /** * Writes the given bytes to the stream * * @param data the bytes to be written * @throws IOException if writing to the stream failed */ public void write(byte[] data) throws IOException { synchronized (toMonetRaw) { toMonetRaw.write(data); // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong if (debug) logTx(new String(data, "UTF-8")); // reset the lineType variable, since we've sent data now and // the last line isn't valid anymore lineType = UNKNOWN; } } /** * flushes the stream to monet, forcing all data in the buffer to be * actually written to the stream. * * @throws IOException if writing to the stream failed */ public void flush() throws IOException { synchronized (toMonetRaw) { toMonetRaw.flush(); // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong if (debug) { log.flush(); } } } /** * writeLine puts the given string on the stream and flushes the * stream afterwards so the data will actually be sent. The given * data String is wrapped within the query template. * * @param templ the query template to apply * @param data the data to write to the stream * @throws IOException if writing to the stream failed */ public void writeLine(String[] templ, String data) throws IOException { synchronized (toMonetRaw) { // In the same way as we read chunks from the socket, we // write chunks to the socket, so the server can start // processing while sending the rest of the input. data = (templ[0] != null ? templ[0] : "") + data + (templ[1] != null ? templ[1] : ""); byte[] bytes = data.getBytes("UTF-8"); int len = bytes.length; int todo = len; short blocksize; while (todo > 0) { if (todo <= BLOCK) { // always fits, because of BLOCK's size blocksize = (short)todo; // this is the last block, so encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); blklen[1] = (byte)(blocksize >> 7); } else { // always fits, because of BLOCK's size blocksize = (short)BLOCK; // another block will follow, encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF); blklen[1] = (byte)(blocksize >> 7); } toMonetRaw.write(blklen); // write the actual block toMonetRaw.write(bytes, len - todo, blocksize); if (debug) { if (todo <= BLOCK) { logTd("write final block: " + blocksize + " bytes"); } else { logTd("write block: " + blocksize + " bytes"); } logTx(new String(bytes, len - todo, blocksize, "UTF-8")); } todo -= blocksize; } // flush the stream flush(); if (debug) log.flush(); // reset the lineType variable, since we've sent data now and // the last line isn't valid anymore lineType = UNKNOWN; } } /** * Reads up to count bytes from the stream, and returns them in a * byte array * * @param data a byte array, which should be filled with data from * the stream * @return the number of bytes actually read, never less than zero * @throws IOException if some IO error occurs */ public int read(byte[] data) throws IOException { synchronized (fromMonetRaw) { // read the data int size = fromMonetRaw.read(data); if (size == -1) throw new IOException("End of stream reached"); // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong if (debug) logRx((new String(data, "UTF-8")).substring(0, size)); return(size); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -