filebacking.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,170 行 · 第 1/2 页
JAVA
1,170 行
* * @param id the object's unique id. * @param is the input stream to the serialized object * @param length the length object the serialized object * @param expireInterval how long the object lives w/o objectAccess */ public void store(HashKey id, HashKey storeId, InputStream is, int length, byte []dataHash, byte []oldDataHash, long expireInterval, int primary, int secondary, int tertiary) { ClusterConnection conn = null; try { conn = getConnection(); // The binary stream can be reused because it won't actually be // read on a failure if (oldDataHash != null) { // If the old hash exists, then we should just be updating to // a matching old-hash. // Try to update with lock first, then update, and insert if fail. if (storeUpdateLock(conn, id, is, length, dataHash, oldDataHash)) { if (log.isLoggable(Level.FINE)) log.fine(this + " save(update) " + id + " (length=" + length + ")"); return; } log.warning(this + " store validation failed, possible lock/data" + " loss detected (id=" + id + ",hash=" + new HashKey(oldDataHash) + ")"); if (storeUpdateOverride(conn, id, is, length, dataHash)) { if (log.isLoggable(Level.FINE)) { log.fine(this + " save(update-override) " + id + " (length=" + length + ")"); } return; } else if (storeInsert(conn, id, storeId, is, length, dataHash, expireInterval, primary, secondary, tertiary)) { if (log.isLoggable(Level.FINE)) { log.fine(this + " save(insert-override) " + id + " (length=" + length + ")"); } return; } else { log.warning(this + " store update failed, probable data loss" + " for id=" + id); } } else { // if the old-hash is null, then we're inserting or // replacing an invalidated session if (storeUpdateOverride(conn, id, is, length, dataHash)) { if (log.isLoggable(Level.FINE)) { log.fine(this + " save(update-override) " + id + " (length=" + length + ")"); } return; } else if (storeInsert(conn, id, storeId, is, length, dataHash, expireInterval, primary, secondary, tertiary)) { if (log.isLoggable(Level.FINE)) { log.fine(this + " save(insert) " + id + " (length=" + length + ",hash=" + new HashKey(dataHash) + ")"); } return; } else { log.warning(this + " store insert failed, probable data loss" + " for id=" + id); } } } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { if (conn != null) conn.close(); } } /** * Stores the cluster object on the local objectStore using an updateImpl query. * * @param conn the database connection * @param id the object's unique id. * @param is the input stream to the serialized object * @param length the length object the serialized object * @param oldHash the hash of the old contents (for optimistic locking) * @param newHash the hash of the new contents */ private boolean storeUpdateLock(ClusterConnection conn, HashKey id, InputStream is, int length, byte []newHash, byte []oldHash) { try { PreparedStatement stmt = conn.prepareUpdateSaveLock(); stmt.setBytes(1, newHash); stmt.setBinaryStream(2, is, length); long now = Alarm.getCurrentTime(); stmt.setLong(3, now); stmt.setInt(4, _version); stmt.setBytes(5, id.getHash()); stmt.setBytes(6, oldHash); int count = stmt.executeUpdate(); if (count > 0) return true; } catch (SQLException e) { log.log(Level.WARNING, e.toString(), e); } return false; } /** * Stores the cluster object on the local objectStore using an updateImpl query. * * @param conn the database connection * @param id the object's unique id. * @param is the input stream to the serialized object * @param length the length object the serialized object * @param oldHash the hash of the old contents (for optimistic locking) * @param newHash the hash of the new contents */ private boolean storeUpdateOverride(ClusterConnection conn, HashKey id, InputStream is, int length, byte []newHash) { try { PreparedStatement stmt = conn.prepareUpdateSaveOverride(); stmt.setBytes(1, newHash); stmt.setBinaryStream(2, is, length); long now = Alarm.getCurrentTime(); stmt.setLong(3, now); stmt.setInt(4, _version); stmt.setBytes(5, id.getHash()); int count = stmt.executeUpdate(); if (count > 0) return true; } catch (SQLException e) { log.log(Level.WARNING, e.toString(), e); } return false; } private boolean storeInsert(ClusterConnection conn, HashKey id, HashKey storeId, InputStream is, int length, byte []dataHash, long expireInterval, int primary, int secondary, int tertiary) { try { PreparedStatement stmt = conn.prepareInsertSave(); stmt.setBytes(1, id.getHash()); stmt.setBytes(2, storeId.getHash()); if (dataHash == null) throw new NullPointerException(); stmt.setBytes(3, dataHash); stmt.setBinaryStream(4, is, length); long now = Alarm.getCurrentTime(); stmt.setLong(5, now); stmt.setLong(6, expireInterval); stmt.setInt(7, primary); stmt.setInt(8, secondary); stmt.setInt(9, tertiary); stmt.setInt(10, _version); stmt.executeUpdate(); return true; } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } return false; } /** * Stores the cluster object on the local * objectStore using an updateImpl query. * * @param conn the database connection * @param id the object's unique id. * @param is the input stream to the serialized object * @param length the length object the serialized object */ public boolean storeData(HashKey id, byte []dataHash, InputStream is, int length) { ClusterConnection conn = null; try { conn = getConnection(); // XXX: locking? PreparedStatement stmt = conn.prepareUpdateSaveOverride(); stmt.setBytes(1, dataHash); stmt.setBinaryStream(2, is, length); long now = Alarm.getCurrentTime(); stmt.setLong(3, now); stmt.setInt(4, _version); stmt.setBytes(5, id.getHash()); int count = stmt.executeUpdate(); if (count > 0) { if (log.isLoggable(Level.FINE)) log.fine(this + " save-data(update) " + id + " length:" + length); return true; } } catch (SQLException e) { log.log(Level.WARNING, e.toString(), e); } finally { if (conn != null) conn.close(); } return false; } /** * Updates the metadata from a cluster peer. * * @param id the object's unique id. * @param expireInterval how long the object lives w/o objectAccess */ public boolean updateMetadata(HashKey id, HashKey storeId, byte []dataHash, long expireInterval, int primary, int secondary, int tertiary, boolean isDead) { ClusterConnection conn = null; try { conn = getConnection(); // If the metadata matches, just update the version PreparedStatement stmt = conn.prepareUpdateMetadataVersion(); long now = Alarm.getCurrentTime(); stmt.setLong(1, now); stmt.setInt(2, _version); stmt.setBytes(3, id.getHash()); stmt.setBytes(4, dataHash); if (stmt.executeUpdate() > 0) return true; } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { if (conn != null) conn.close(); } try { conn = getConnection(); // If the metadata mismatches, mark as invalid and return false // so the data can be fetched PreparedStatement stmt = conn.prepareUpdateMetadata(); stmt.setBytes(1, id.getHash()); stmt.setBytes(2, dataHash); if (stmt.executeUpdate() > 0) return false; } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { if (conn != null) conn.close(); } try { conn = getConnection(); // If no record exists, add one PreparedStatement stmt = conn.prepareInsertMetadata(); stmt.setBytes(1, id.getHash()); stmt.setBytes(2, storeId.getHash()); stmt.setBoolean(3, isDead); stmt.setBytes(4, dataHash); stmt.setLong(5, Alarm.getCurrentTime()); stmt.setLong(6, expireInterval); stmt.setInt(7, primary); stmt.setInt(8, secondary); stmt.setInt(9, tertiary); stmt.setInt(10, _version); if (stmt.executeUpdate() > 0) return false; } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { if (conn != null) conn.close(); } return false; } // // statistics // public long getObjectCount() throws SQLException { ClusterConnection conn = getConnection(); try { PreparedStatement stmt = conn.prepareCount(); ResultSet rs = stmt.executeQuery(); if (rs != null && rs.next()) { long value = rs.getLong(1); rs.close(); return value; } return -1; } catch (SQLException e) { log.log(Level.FINE, e.toString(), e); } finally { conn.close(); } return -1; } public void destroy() { _dataSource = null; _freeConn = null; } private ClusterConnection getConnection() throws SQLException { ClusterConnection cConn = _freeConn.allocate(); if (cConn == null) { Connection conn = _dataSource.getConnection(); cConn = new ClusterConnection(conn); } return cConn; } public String serverNameToTableName(String serverName) { if (serverName == null) return "srun"; StringBuilder cb = new StringBuilder(); cb.append("srun_"); for (int i = 0; i < serverName.length(); i++) { char ch = serverName.charAt(i); if ('a' <= ch && ch <= 'z') { cb.append(ch); } else if ('A' <= ch && ch <= 'Z') { cb.append(ch); } else if ('0' <= ch && ch <= '9') { cb.append(ch); } else if (ch == '_') { cb.append(ch); } else cb.append('_'); } return cb.toString(); } @Override public String toString() { return getClass().getSimpleName() + "[" + _tableName + "]"; } class ClusterConnection { private Connection _conn; private PreparedStatement _accessStatement; private PreparedStatement _countStatement; private PreparedStatement _loadIfVersionStatement; private PreparedStatement _loadStatement; private PreparedStatement _insertSaveStatement; private PreparedStatement _insertMetadataStatement; private PreparedStatement _removeStatement; private PreparedStatement _setExpiresStatement; private PreparedStatement _timeoutStatement; private PreparedStatement _updateMetadataStatement; private PreparedStatement _updateMetadataVersionStatement; private PreparedStatement _updateSaveLockStatement; private PreparedStatement _updateSaveOverrideStatement; ClusterConnection(Connection conn) { _conn = conn; } PreparedStatement prepareAccess() throws SQLException { if (_accessStatement == null) _accessStatement = _conn.prepareStatement(_accessQuery); return _accessStatement; } PreparedStatement prepareCount() throws SQLException { if (_countStatement == null) _countStatement = _conn.prepareStatement(_countQuery); return _countStatement; } PreparedStatement prepareLoad() throws SQLException { if (_loadStatement == null) _loadStatement = _conn.prepareStatement(_loadQuery); return _loadStatement; } PreparedStatement prepareLoadIfVersion() throws SQLException { if (_loadIfVersionStatement == null) _loadIfVersionStatement = _conn.prepareStatement(_loadIfVersionQuery); return _loadIfVersionStatement; } PreparedStatement prepareInsertMetadata() throws SQLException { if (_insertMetadataStatement == null) { _insertMetadataStatement = _conn.prepareStatement(_insertMetadataQuery); } return _insertMetadataStatement; } PreparedStatement prepareInsertSave() throws SQLException { if (_insertSaveStatement == null) _insertSaveStatement = _conn.prepareStatement(_insertSaveQuery); return _insertSaveStatement; } PreparedStatement prepareRemove() throws SQLException { if (_removeStatement == null) _removeStatement = _conn.prepareStatement(_removeQuery); return _removeStatement; } PreparedStatement prepareSetExpireInterval() throws SQLException { if (_setExpiresStatement == null) _setExpiresStatement = _conn.prepareStatement(_setExpiresQuery); return _setExpiresStatement; } PreparedStatement prepareTimeout() throws SQLException { if (_timeoutStatement == null) _timeoutStatement = _conn.prepareStatement(_timeoutQuery); return _timeoutStatement; } PreparedStatement prepareUpdateMetadata() throws SQLException { if (_updateMetadataStatement == null) { _updateMetadataStatement = _conn.prepareStatement(_updateMetadataQuery); } return _updateMetadataStatement; } PreparedStatement prepareUpdateMetadataVersion() throws SQLException { if (_updateMetadataVersionStatement == null) { _updateMetadataVersionStatement = _conn.prepareStatement(_updateMetadataVersionQuery); } return _updateMetadataVersionStatement; } PreparedStatement prepareUpdateSaveLock() throws SQLException { if (_updateSaveLockStatement == null) _updateSaveLockStatement = _conn.prepareStatement(_updateSaveLockQuery); return _updateSaveLockStatement; } PreparedStatement prepareUpdateSaveOverride() throws SQLException { if (_updateSaveOverrideStatement == null) { _updateSaveOverrideStatement = _conn.prepareStatement(_updateSaveOverrideQuery); } return _updateSaveOverrideStatement; } void close() { if (_freeConn != null) _freeConn.free(this); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?