📄 monetconnection.java
字号:
* @return the exact row read as requested or null if the requested row * is out of the scope of the result set * @throws SQLException if an database error occurs */ String getLine(int row) throws SQLException { if (row >= tuplecount || row < 0) return null; int block = (row - blockOffset) / cacheSize; int blockLine = (row - blockOffset) % cacheSize; // do we have the right block loaded? (optimistic try) DataBlockResponse rawr; // load block if appropriate if ((rawr = resultBlocks[block]) == null) { /// TODO: ponder about a maximum number of blocks to keep /// in memory when dealing with random access to /// reduce memory blow-up // if we're running forward only, we can discard the old // block loaded if (parent.rstype == ResultSet.TYPE_FORWARD_ONLY) { for (int i = 0; i < block; i++) resultBlocks[i] = null; if (MonetConnection.seqCounter - 1 == seqnr) { // there has no query been issued after this // one, so we can consider this a uninterrupted // continuation request. Let's increase the // blocksize if it was not explicitly set, // as the chances are high that we won't bother // anyone else by doing so, and just gaining // some performance. if (!cacheSizeSetExplicitly) { // store the previous position in the // blockOffset variable blockOffset += cacheSize; // increase the cache size (a lot) cacheSize *= 10; // Java string are UTF-16, right? long free = Runtime.getRuntime().freeMemory() / 2; // we run the risk that we kill ourselves // here, but maybe that's better to continue // as long as possible, than asking too much // too soon if (cacheSize > free) cacheSize = (int)free; // it must fit // by changing the cacheSize, we also // change the block measures. Luckily // we don't care about previous blocks // because we have a forward running // pointer only. However, we do have // to recalculate the block number, to // ensure the next call to find this // new block. block = (row - blockOffset) / cacheSize; blockLine = (row - blockOffset) % cacheSize; } } } // ok, need to fetch cache block first parent.executeQuery( commandTempl, "export " + id + " " + ((block * cacheSize) + blockOffset) + " " + cacheSize ); rawr = resultBlocks[block]; if (rawr == null) throw new AssertionError("block " + block + " should have been fetched by now :("); } return(rawr.getRow(blockLine)); } /** * Closes this Response by sending an Xclose to the server indicating * that the result can be closed at the server side as well. */ public void close() { if (closed) return; // send command to server indicating we're done with this // result only if we had an ID in the header and this result // was larger than the reply size try { if (destroyOnClose) sendControlCommand("close " + id); } catch (SQLException e) { // probably a connection error... } // close the data block associated with us for (int i = 1; i < resultBlocks.length; i++) { DataBlockResponse r = resultBlocks[i]; if (r != null) r.close(); } closed = true; } /** * Returns whether this Response is closed * * @return whether this Response is closed */ boolean isClosed() { return(closed); } protected void finalize() throws Throwable { close(); super.finalize(); } } // }}} /** * The DataBlockResponse is tabular data belonging to a * ResultSetResponse. Tabular data from the server typically looks * like: * <pre> * [ "value", 56 ] * </pre> * where each column is separated by ",\t" and each tuple surrounded * by brackets ("[" and "]"). A DataBlockResponse object holds the * raw data as read from the server, in a parsed manner, ready for * easy retrieval. * <br /><br /> * This object is not intended to be queried by multiple threads * synchronously. It is designed to work for one thread retrieving * rows from it. When multiple threads will retrieve rows from this * object, it is possible for threads to get the same data. */ // {{{ DataBlockResponse class implementation class DataBlockResponse implements Response { /** The String array to keep the data in */ private String[] data; /** The counter which keeps the current position in the data array */ private int pos; /** Whether we can discard lines as soon as we have read them */ private boolean forwardOnly; /** * Constructs a DataBlockResponse object * @param size the size of the data array to create * @param forward whether this is a forward only result */ DataBlockResponse(int size, boolean forward) { pos = -1; data = new String[size]; forwardOnly = forward; } /** * addLine adds a String of data to this object's data array. * Note that an IndexOutOfBoundsException can be thrown when an * attempt is made to add more than the original construction size * specified. * * @param line the header line as String * @param linetype the line type according to the MAPI protocol * @return a non-null String if the line is invalid, * or additional lines are not allowed. */ public String addLine(String line, int linetype) { if (linetype != MonetSocketBlockMode.RESULT) return("protocol violation: unexpected line in data block: " + line); // add to the backing array data[++pos] = line; // all is well return(null); } /** * Returns whether this Reponse expects more lines to be added * to it. * * @return true if a next line should be added, false otherwise */ public boolean wantsMore() { // remember: pos is the value already stored return(pos + 1 < data.length); } /** * Indicates that no more header lines will be added to this * Response implementation. In most cases this is a redundant * operation because the data array is full. However... it can * happen that this is NOT the case! * * @throws SQLException if not all rows are filled */ public void complete() throws SQLException { if ((pos + 1) != data.length) throw new SQLException("Inconsistent state detected! Current block capacity: " + data.length + ", block usage: " + (pos + 1) + ". Did MonetDB send what it promised to?"); } /** * Instructs the Response implementation to close and do the * necessary clean up procedures. * * @throws SQLException */ public void close() { // feed all rows to the garbage collector for (int i = 0; i < data.length; i++) data[i] = null; } /** * Retrieves the required row. Warning: if the requested rows * is out of bounds, an IndexOutOfBoundsException will be * thrown. * * @param line the row to retrieve * @return the requested row as String */ String getRow(int line) { if (forwardOnly) { String ret = data[line]; data[line] = null; return(ret); } else { return(data[line]); } } } // }}} /** * The AffectedRowsResponse represents an update or schema message. * It keeps an additional count field that represents the affected * rows for update statements, and a success flag (negative numbers, * actually) for schema messages.<br /> * <tt>&2 af</tt> */ // {{{ AffectedRowsResponse class implementation class AffectedRowsResponse implements Response { public final int count; public AffectedRowsResponse(int cnt) { // fill the blank final this.count = cnt; } public String addLine(String line, int linetype) { return("Header lines are not supported for an AffectedRowsResponse"); } public boolean wantsMore() { return(false); } public void complete() { // we're empty, because we don't need to check anything... } public void close() { // nothing to do here... } } // }}} /** * The AutoCommitResponse represents a transaction message. It * stores (a change in) the server side auto commit mode.<br /> * <tt>&3 (t|f)</tt> */ // {{{ AutoCommitResponse class implementation class AutoCommitResponse extends AffectedRowsResponse { public final boolean autocommit; public AutoCommitResponse(boolean ac) { super(Statement.SUCCESS_NO_INFO); // fill the blank final this.autocommit = ac; } } // }}} /** * A list of Response objects. Responses are added to this list. * Methods of this class are not synchronized. This is left as * responsibility to the caller to prevent concurrent access. */ // {{{ ResponseList class implementation class ResponseList { /** The cache size (number of rows in a DataBlockResponse object) */ final int cachesize; /** The maximum number of results for this query */ final int maxrows; /** The ResultSet type to produce */ final int rstype; /** The ResultSet concurrency to produce */ final int rsconcur; /** The sequence number of this ResponseList */ final int seqnr; /** A list of the Responses associated with the query, * in the right order */ private List responses; /** A map of ResultSetResponses, used for additional * DataBlockResponse mapping */ private Map rsresponses; /** The current header returned by getNextResponse() */ private int curResponse; /** * Main constructor. The query argument can either be a String * or List. An SQLException is thrown if another object * instance is supplied. * * @param cachesize overall cachesize to use * @param maxrows maximum number of rows to allow in the set * @param rstype the type of result sets to produce * @param rsconcur the concurrency of result sets to produce */ ResponseList( int cachesize, int maxrows, int rstype, int rsconcur ) throws SQLException { this.cachesize = cachesize; this.maxrows = maxrows; this.rstype = rstype; this.rsconcur = rsconcur; responses = new ArrayList(); curResponse = -1; seqnr = MonetConnection.seqCounter++; } /** * Retrieves the next available response, or null if there are * no more responses. * * @return the next Response available or null */ Response getNextResponse() throws SQLException { curResponse++; if (curResponse >= responses.size()) { // ResponseList is obviously completed so, there are no // more responses return(null); } else { // return this response return((Response)(responses.get(curResponse))); } } /** * Closes the Reponse at index i, if not null. * * @param i the index position of the header to close */ void closeResponse(int i) { if (i < 0 || i >= responses.size()) return; Response tmp = (Response)(responses.get(i)); if (tmp != null) tmp.close(); } /** * Closes the current response. */ void closeCurrentResponse() { closeResponse(curResponse); } /** * Closes the current and previous responses. */ void closeCurOldResponses() { for (int i = curResponse; i >= 0; i--) { closeResponse(i); } } /** * Closes this ResponseList by closing all the Responses in this * ResponseList. */ void close() { for (int i = 0; i < responses.size(); i++) { closeResponse(i); } } protected void finalize() throws Throwable { close(); super.finalize(); } /** * Executes the query contained in this ResponseList, and * stores the Responses resulting from this query in this * ResponseList. * * @throws SQLException if a database error occurs */ void processQuery(String query) throws SQLException { executeQuery(queryTempl, query); } /** * Internal executor of queries. * * @param templ the template to fill in * @param the query to execute * @throws SQLException if a database error occurs */ void executeQuery(String
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -