filebacking.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,170 行 · 第 1/2 页

JAVA
1,170
字号
 *
 * @param id the object's unique id.
 * @param storeId the owning store's unique id.
 * @param is the input stream to the serialized object
 * @param length the length of the serialized object
 * @param dataHash hash of the new serialized contents
 * @param oldDataHash expected hash of the currently stored contents
 *   (optimistic lock), or null when inserting or replacing an
 *   invalidated session
 * @param expireInterval how long the object lives w/o objectAccess
 * @param primary primary owner index -- presumably the cluster triad; verify
 * @param secondary secondary owner index
 * @param tertiary tertiary owner index
 */
public void store(HashKey id, HashKey storeId,
		  InputStream is, int length,
		  byte []dataHash, byte []oldDataHash,
		  long expireInterval,
		  int primary, int secondary, int tertiary)
{
  ClusterConnection conn = null;

  try {
    conn = getConnection();

    // The binary stream can be reused because it won't actually be
    // read on a failure

    if (oldDataHash != null) {
      // If the old hash exists, then we should just be updating to
      // a matching old-hash.

      // Try to update with lock first, then update, and insert if fail.

      if (storeUpdateLock(conn, id, is, length, dataHash, oldDataHash)) {
	if (log.isLoggable(Level.FINE))
	  log.fine(this + " save(update) " + id + " (length=" + length + ")");

	return;
      }

      // The optimistic update found no row matching id + oldDataHash:
      // either the row is gone or someone else changed the contents.
      log.warning(this + " store validation failed, possible lock/data"
		  + " loss detected (id=" + id
		  + ",hash=" + new HashKey(oldDataHash) + ")");

      // Fall back to an unconditional overwrite, then to a fresh insert.
      if (storeUpdateOverride(conn, id, is, length, dataHash)) {
	if (log.isLoggable(Level.FINE)) {
	  log.fine(this + " save(update-override) "
		   + id + " (length=" + length + ")");
	}

	return;
      }
      else if (storeInsert(conn, id, storeId, is, length, dataHash,
			   expireInterval, primary, secondary, tertiary)) {
	if (log.isLoggable(Level.FINE)) {
	  log.fine(this + " save(insert-override) "
		   + id + " (length=" + length + ")");
	}

	return;
      }
      else {
	log.warning(this + " store update failed, probable data loss"
		    + " for id=" + id);
      }
    }
    else {
      // if the old-hash is null, then we're inserting or
      // replacing an invalidated session

      if (storeUpdateOverride(conn, id, is, length, dataHash)) {
	if (log.isLoggable(Level.FINE)) {
	  log.fine(this + " save(update-override) "
		   + id + " (length=" + length + ")");
	}

	return;
      }
      else if (storeInsert(conn, id, storeId, is, length, dataHash,
			   expireInterval, primary, secondary, tertiary)) {
	if (log.isLoggable(Level.FINE)) {
	  log.fine(this + " save(insert) "
		   + id + " (length=" + length
		   + ",hash=" + new HashKey(dataHash) + ")");
	}

	return;
      }
      else {
	log.warning(this + " store insert failed, probable data loss"
		    + " for id=" + id);
      }
    }
  } catch (SQLException e) {
    log.log(Level.FINE, e.toString(), e);
  } finally {
    if (conn != null)
      conn.close();
  }
}

/**
 * Stores the cluster object on the local objectStore using an updateImpl query.
 *
 * @param conn the database connection
 * @param id the object's unique id.
 * @param is the input stream to the serialized object
 * @param length the length of the serialized object
 * @param newHash the hash of the new contents
 * @param oldHash the hash of the old contents (for optimistic locking)
 * @return true when a row matched and was updated
 */
private boolean storeUpdateLock(ClusterConnection conn, HashKey id,
				InputStream is, int length,
				byte []newHash, byte []oldHash)
{
  try {
    PreparedStatement stmt = conn.prepareUpdateSaveLock();

    // parameter order follows the update-save-lock query; the trailing
    // id + oldHash parameters presumably form the WHERE predicate that
    // makes this a compare-and-swap -- verify against the query text
    stmt.setBytes(1, newHash);
    stmt.setBinaryStream(2, is, length);

    long now = Alarm.getCurrentTime();
    stmt.setLong(3, now);
    stmt.setInt(4, _version);

    stmt.setBytes(5, id.getHash());
    stmt.setBytes(6, oldHash);

    int count = stmt.executeUpdate();

    if (count > 0)
      return true;
  } catch (SQLException e) {
    log.log(Level.WARNING, e.toString(), e);
  }

  return false;
}

/**
 * Stores the cluster object on the local objectStore using an updateImpl query.
 *
 * @param conn the database connection
 * @param id the object's unique id.
* @param is the input stream to the serialized object   * @param length the length object the serialized object   * @param oldHash the hash of the old contents (for optimistic locking)   * @param newHash the hash of the new contents   */  private boolean storeUpdateOverride(ClusterConnection conn, HashKey id,				      InputStream is, int length,				      byte []newHash)  {    try {      PreparedStatement stmt = conn.prepareUpdateSaveOverride();      stmt.setBytes(1, newHash);      stmt.setBinaryStream(2, is, length);      long now = Alarm.getCurrentTime();      stmt.setLong(3, now);      stmt.setInt(4, _version);            stmt.setBytes(5, id.getHash());      int count = stmt.executeUpdate();              if (count > 0)	return true;    } catch (SQLException e) {      log.log(Level.WARNING, e.toString(), e);    }    return false;  }    private boolean storeInsert(ClusterConnection conn, 			      HashKey id, HashKey storeId,			      InputStream is, int length, byte []dataHash,			      long expireInterval,			      int primary, int secondary, int tertiary)  {    try {      PreparedStatement stmt = conn.prepareInsertSave();              stmt.setBytes(1, id.getHash());      stmt.setBytes(2, storeId.getHash());      if (dataHash == null)	throw new NullPointerException();            stmt.setBytes(3, dataHash);      stmt.setBinaryStream(4, is, length);            long now = Alarm.getCurrentTime();            stmt.setLong(5, now);      stmt.setLong(6, expireInterval);      stmt.setInt(7, primary);      stmt.setInt(8, secondary);      stmt.setInt(9, tertiary);            stmt.setInt(10, _version);      stmt.executeUpdate();      return true;    } catch (SQLException e) {      log.log(Level.FINE, e.toString(), e);    }    return false;  }    /**   * Stores the cluster object on the local   * objectStore using an updateImpl query.   *   * @param conn the database connection   * @param id the object's unique id.   
* @param is the input stream to the serialized object   * @param length the length object the serialized object   */  public boolean storeData(HashKey id, byte []dataHash,			   InputStream is, int length)  {    ClusterConnection conn = null;        try {      conn = getConnection();      // XXX: locking?      PreparedStatement stmt = conn.prepareUpdateSaveOverride();      stmt.setBytes(1, dataHash);      stmt.setBinaryStream(2, is, length);      long now = Alarm.getCurrentTime();      stmt.setLong(3, now);      stmt.setInt(4, _version);            stmt.setBytes(5, id.getHash());      int count = stmt.executeUpdate();              if (count > 0) {	if (log.isLoggable(Level.FINE)) 	  log.fine(this + " save-data(update) " + id + " length:" + length);	  	return true;      }    } catch (SQLException e) {      log.log(Level.WARNING, e.toString(), e);    } finally {      if (conn != null)	conn.close();    }    return false;  }  /**   * Updates the metadata from a cluster peer.   *   * @param id the object's unique id.   
* @param expireInterval how long the object lives w/o objectAccess   */  public boolean updateMetadata(HashKey id,				HashKey storeId,				byte []dataHash,				long expireInterval,				int primary,				int secondary,				int tertiary,				boolean isDead)  {    ClusterConnection conn = null;    try {      conn = getConnection();      // If the metadata matches, just update the version      PreparedStatement stmt = conn.prepareUpdateMetadataVersion();      long now = Alarm.getCurrentTime();      stmt.setLong(1, now);      stmt.setInt(2, _version);            stmt.setBytes(3, id.getHash());      stmt.setBytes(4, dataHash);      if (stmt.executeUpdate() > 0)	return true;    } catch (SQLException e) {      log.log(Level.FINE, e.toString(), e);    } finally {      if (conn != null)	conn.close();    }    try {      conn = getConnection();            // If the metadata mismatches, mark as invalid and return false      // so the data can be fetched      PreparedStatement stmt = conn.prepareUpdateMetadata();      stmt.setBytes(1, id.getHash());      stmt.setBytes(2, dataHash);      if (stmt.executeUpdate() > 0)	return false;    } catch (SQLException e) {      log.log(Level.FINE, e.toString(), e);    } finally {      if (conn != null)	conn.close();    }    try {      conn = getConnection();            // If no record exists, add one      PreparedStatement stmt = conn.prepareInsertMetadata();      stmt.setBytes(1, id.getHash());      stmt.setBytes(2, storeId.getHash());      stmt.setBoolean(3, isDead);      stmt.setBytes(4, dataHash);      stmt.setLong(5, Alarm.getCurrentTime());      stmt.setLong(6, expireInterval);      stmt.setInt(7, primary);      stmt.setInt(8, secondary);      stmt.setInt(9, tertiary);      stmt.setInt(10, _version);      if (stmt.executeUpdate() > 0)	return false;    } catch (SQLException e) {      log.log(Level.FINE, e.toString(), e);    } finally {      if (conn != null)	conn.close();    }    return false;  }  //  // statistics  //  public long 
getObjectCount()
  throws SQLException
{
  ClusterConnection conn = getConnection();

  try {
    PreparedStatement stmt = conn.prepareCount();

    ResultSet rs = stmt.executeQuery();

    if (rs != null) {
      // BUGFIX: close the ResultSet on every path; the old code only
      // closed it when rs.next() returned a row.
      try {
	if (rs.next())
	  return rs.getLong(1);
      } finally {
	rs.close();
      }
    }

    return -1;
  } catch (SQLException e) {
    log.log(Level.FINE, e.toString(), e);
  } finally {
    conn.close();
  }

  return -1;
}

/**
 * Drops the references to the data source and the connection free list.
 *
 * NOTE(review): connections sitting in the free list are not closed
 * here; in-flight connections are handled by ClusterConnection.close().
 */
public void destroy()
{
  _dataSource = null;
  _freeConn = null;
}

/**
 * Returns a pooled connection, creating one when the free list is empty.
 *
 * NOTE(review): throws NullPointerException if called after destroy().
 */
private ClusterConnection getConnection()
  throws SQLException
{
  ClusterConnection cConn = _freeConn.allocate();

  if (cConn == null) {
    Connection conn = _dataSource.getConnection();

    cConn = new ClusterConnection(conn);
  }

  return cConn;
}

/**
 * Maps a server name to a safe SQL table name: "srun_" followed by the
 * name with every character outside [A-Za-z0-9_] replaced by '_'.
 *
 * @param serverName the cluster server name, or null
 * @return the sanitized table name; "srun" when serverName is null
 */
public String serverNameToTableName(String serverName)
{
  if (serverName == null)
    return "srun";

  StringBuilder cb = new StringBuilder("srun_");

  for (int i = 0; i < serverName.length(); i++) {
    char ch = serverName.charAt(i);

    // only ASCII letters, digits and '_' pass through; anything else
    // (including non-ASCII letters) maps to '_'
    if (('a' <= ch && ch <= 'z')
	|| ('A' <= ch && ch <= 'Z')
	|| ('0' <= ch && ch <= '9')
	|| ch == '_')
      cb.append(ch);
    else
      cb.append('_');
  }

  return cb.toString();
}

@Override
public String toString()
{
  return getClass().getSimpleName() + "[" + _tableName + "]";
}

/**
 * A pooled JDBC connection that lazily prepares and caches one
 * PreparedStatement per query for reuse across requests.
 */
class ClusterConnection {
  private Connection _conn;

  private PreparedStatement _accessStatement;
  private PreparedStatement _countStatement;

  private PreparedStatement _loadIfVersionStatement;
  private PreparedStatement _loadStatement;

  private PreparedStatement _insertSaveStatement;
  private PreparedStatement _insertMetadataStatement;

  private PreparedStatement _removeStatement;
  private PreparedStatement _setExpiresStatement;
  private PreparedStatement _timeoutStatement;

  private PreparedStatement _updateMetadataStatement;
  private PreparedStatement _updateMetadataVersionStatement;

  private PreparedStatement _updateSaveLockStatement;
  private PreparedStatement _updateSaveOverrideStatement;

  ClusterConnection(Connection conn)
  {
    _conn = conn;
  }

  PreparedStatement prepareAccess()
    throws SQLException
  {
    if (_accessStatement == null)
      _accessStatement = _conn.prepareStatement(_accessQuery);

    return _accessStatement;
  }

  PreparedStatement prepareCount()
    throws SQLException
  {
    if (_countStatement == null)
      _countStatement = _conn.prepareStatement(_countQuery);

    return _countStatement;
  }

  PreparedStatement prepareLoad()
    throws SQLException
  {
    if (_loadStatement == null)
      _loadStatement = _conn.prepareStatement(_loadQuery);

    return _loadStatement;
  }

  PreparedStatement prepareLoadIfVersion()
    throws SQLException
  {
    if (_loadIfVersionStatement == null)
      _loadIfVersionStatement = _conn.prepareStatement(_loadIfVersionQuery);

    return _loadIfVersionStatement;
  }

  PreparedStatement prepareInsertMetadata()
    throws SQLException
  {
    if (_insertMetadataStatement == null) {
      _insertMetadataStatement
	= _conn.prepareStatement(_insertMetadataQuery);
    }

    return _insertMetadataStatement;
  }

  PreparedStatement prepareInsertSave()
    throws SQLException
  {
    if (_insertSaveStatement == null)
      _insertSaveStatement = _conn.prepareStatement(_insertSaveQuery);

    return _insertSaveStatement;
  }

  PreparedStatement prepareRemove()
    throws SQLException
  {
    if (_removeStatement == null)
      _removeStatement = _conn.prepareStatement(_removeQuery);

    return _removeStatement;
  }

  PreparedStatement prepareSetExpireInterval()
    throws SQLException
  {
    if (_setExpiresStatement == null)
      _setExpiresStatement = _conn.prepareStatement(_setExpiresQuery);

    return _setExpiresStatement;
  }

  PreparedStatement prepareTimeout()
    throws SQLException
  {
    if (_timeoutStatement == null)
      _timeoutStatement = _conn.prepareStatement(_timeoutQuery);

    return _timeoutStatement;
  }

  PreparedStatement prepareUpdateMetadata()
    throws SQLException
  {
    if (_updateMetadataStatement == null) {
      _updateMetadataStatement
	= _conn.prepareStatement(_updateMetadataQuery);
    }

    return _updateMetadataStatement;
  }

  PreparedStatement prepareUpdateMetadataVersion()
    throws SQLException
  {
    if (_updateMetadataVersionStatement == null) {
      _updateMetadataVersionStatement
	= _conn.prepareStatement(_updateMetadataVersionQuery);
    }

    return _updateMetadataVersionStatement;
  }

  PreparedStatement prepareUpdateSaveLock()
    throws SQLException
  {
    if (_updateSaveLockStatement == null)
      _updateSaveLockStatement = _conn.prepareStatement(_updateSaveLockQuery);

    return _updateSaveLockStatement;
  }

  PreparedStatement prepareUpdateSaveOverride()
    throws SQLException
  {
    if (_updateSaveOverrideStatement == null) {
      _updateSaveOverrideStatement
	= _conn.prepareStatement(_updateSaveOverrideQuery);
    }

    return _updateSaveOverrideStatement;
  }

  void close()
  {
    if (_freeConn != null)
      _freeConn.free(this);
    else {
      // BUGFIX: after destroy() there is no free list; close the
      // underlying connection instead of leaking it.
      try {
	_conn.close();
      } catch (SQLException e) {
	log.log(Level.FINE, e.toString(), e);
      }
    }
  }
}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?