📄 ssh2sftpclient.java
字号:
/****************************************************************************** * * Copyright (c) 1999-2003 AppGate Network Security AB. All Rights Reserved. * * This file contains Original Code and/or Modifications of Original Code as * defined in and that are subject to the MindTerm Public Source License, * Version 2.0, (the 'License'). You may not use this file except in compliance * with the License. * * You should have received a copy of the MindTerm Public Source License * along with this software; see the file LICENSE. If not, write to * AppGate Network Security AB, Otterhallegatan 2, SE-41118 Goteborg, SWEDEN * *****************************************************************************/package com.mindbright.ssh2;import java.io.InputStream;import java.io.OutputStream;import java.io.IOException;import java.io.RandomAccessFile;import java.util.Hashtable;import java.util.Vector;import java.util.Enumeration;import com.mindbright.util.Queue;/** * Implements the client side of the sftp protocol. File reads and writes * can be either synchronous (blocking) or asynchronous (non-blocking). 
*/
public final class SSH2SFTPClient extends SSH2SFTP {

    /**
     * Holds the reply to one outstanding request. A caller blocks in
     * <code>expect()</code> until <code>received()</code> or
     * <code>cancel()</code> stores a packet and wakes it up.
     */
    private class ReplyLock {
        protected int        expectType;
        protected SFTPPacket replyPkt;

        /**
         * @param expectType Packet type the reply is checked against.
         */
        protected ReplyLock(int expectType) {
            this.expectType = expectType;
            this.replyPkt   = null;
        }

        /**
         * Waits until a reply packet has been stored and verifies its
         * type with <code>checkType</code>.
         *
         * @return The reply packet.
         *
         * @throws SFTPException Propagated from <code>checkType</code>.
         */
        protected synchronized SFTPPacket expect() throws SFTPException {
            boolean interrupted = false;
            while(replyPkt == null) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Keep waiting for the reply, but remember the
                    // interrupt so that it is not lost for the caller.
                    interrupted = true;
                }
            }
            if(interrupted) {
                // Restored after the loop; doing it inside the loop would
                // make wait() throw again immediately.
                Thread.currentThread().interrupt();
            }
            checkType(replyPkt, expectType);
            return replyPkt;
        }

        /**
         * Stores the reply and wakes up a thread waiting in
         * <code>expect()</code>.
         *
         * @param replyPkt The packet received as reply.
         */
        protected synchronized void received(SFTPPacket replyPkt) {
            this.replyPkt = replyPkt;
            this.notify();
        }

        /**
         * Aborts the wait by storing a locally created
         * <code>SSH_FXP_STATUS</code> packet carrying
         * <code>SSH_FX_CONNECTION_LOST</code> as the reply.
         */
        protected synchronized void cancel() {
            this.replyPkt = createPacket(SSH_FXP_STATUS);
            this.replyPkt.writeInt(SSH_FX_CONNECTION_LOST);
            this.notify();
        }
    }

    /**
     * Reply handler for an asynchronous write. The outcome is reported
     * to the file handle instead of to a waiting thread.
     */
    private class WriteReplyLock extends ReplyLock {
        private FileHandle handle;
        private int        len;

        /**
         * Registers the write with the handle through
         * <code>handle.asyncStart(len)</code>.
         *
         * @param handle Handle of the file written to.
         * @param len    Number of bytes in the write request.
         */
        protected WriteReplyLock(FileHandle handle, int len) {
            super(SSH_FXP_STATUS);
            this.handle = handle;
            this.len    = len;
            handle.asyncStart(len);
        }

        /**
         * Checks the reply type and reports completion
         * (<code>asyncEnd</code>) or failure (<code>asyncException</code>)
         * to the handle, then releases the packet.
         *
         * @param replyPkt The packet received as reply.
         */
        protected synchronized void received(SFTPPacket replyPkt) {
            try {
                if(!handle.isOpen()) {
                    /* Ignore and discard packets after close */
                    return;
                }
                checkType(replyPkt, expectType);
                handle.asyncEnd(len);
            } catch (SFTPException e) {
                handle.asyncException(e);
            }
            releasePacket(replyPkt);
        }

        /**
         * Reports an <code>SFTPDisconnectException</code> to the handle.
         */
        protected synchronized void cancel() {
            handle.asyncException(new SFTPDisconnectException());
            this.notify();
        }
    }

    /**
     * Reply handler for an asynchronous read. The received data is stored
     * in exactly one of a local file, an output stream or a byte buffer,
     * depending on which constructor was used.
     */
    private class ReadReplyLock extends ReplyLock {
        private FileHandle       handle;
        private long             fileOffset;
        private byte[]           buf;
        private int              off;
        private int              len;
        private RandomAccessFile fileTarget;
        private OutputStream     strmTarget;

        /**
         * Common part of the constructors; registers the read with the
         * handle through <code>handle.asyncStart(len)</code>.
         */
        private ReadReplyLock(FileHandle handle, long fileOffset, int len) {
            super(SSH_FXP_DATA);
            this.handle     = handle;
            this.fileOffset = fileOffset;
            this.len        = len;
            handle.asyncStart(len);
        }

        /**
         * Creates a handler which writes the data to a stream.
         */
        protected ReadReplyLock(FileHandle handle, long fileOffset,
                                OutputStream strmTarget, int len) {
            this(handle, fileOffset, len);
            this.strmTarget = strmTarget;
        }

        /**
         * Creates a handler which writes the data to a local file, at the
         * same offset as it was read from.
         */
        protected ReadReplyLock(FileHandle handle, long fileOffset,
                                RandomAccessFile fileTarget, int len) {
            this(handle, fileOffset, len);
            this.fileTarget = fileTarget;
        }

        /**
         * Creates a handler which stores the data in <code>buf</code>
         * starting at <code>off</code>.
         */
        protected ReadReplyLock(FileHandle handle, long fileOffset,
                                byte[] buf, int off, int len) {
            this(handle, fileOffset, len);
            this.buf = buf;
            this.off = off;
        }

        /**
         * Stores the received data in the target. If fewer bytes than
         * requested arrived, a new read for the remainder is sent with
         * the same packet; otherwise completion is reported to the
         * handle. Failures are reported through
         * <code>handle.asyncException</code> and end of file through
         * <code>handle.asyncReadEOF</code>.
         *
         * @param replyPkt The packet received as reply.
         */
        protected synchronized void received(SFTPPacket replyPkt) {
            try {
                int n;
                if(!handle.isOpen()) {
                    /* Ignore and discard packets after close */
                    return;
                }
                checkType(replyPkt, expectType);
                if(fileTarget != null) {
                    n = replyPkt.readInt();
                    fileTarget.seek(fileOffset);
                    fileTarget.write(replyPkt.getData(), replyPkt.getRPos(), n);
                } else if(strmTarget != null) {
                    if(handle.lastOffset != fileOffset) {
                        handle.asyncException(new SFTPException(
                            "Out of order packets can't be handled yet!"));
                        // A stream can't seek, so writing this data would
                        // put it at the wrong place in the output. Stop
                        // here, as the other error paths do.
                        return;
                    }
                    n = replyPkt.readInt();
                    strmTarget.write(replyPkt.getData(), replyPkt.getRPos(), n);
                    handle.lastOffset = fileOffset + n;
                } else {
                    n = replyPkt.readString(buf, off);
                }
                if(n < len) {
                    resend(replyPkt, n);
                } else {
                    // NOTE(review): after a resend, len holds the remaining
                    // length, not the length given to asyncStart - confirm
                    // that this is what asyncEnd expects.
                    handle.asyncEnd(len);
                    releasePacket(replyPkt);
                }
            } catch (IOException e) {
                handle.asyncException(new SFTPException(e.getMessage()));
            } catch (SFTPEOFException e) {
                handle.asyncReadEOF();
            } catch (SFTPException e) {
                handle.asyncException(e);
            }
        }

        /**
         * Requests the part of the data which is still missing after a
         * short read. The packet is reused for the new request and this
         * lock is registered under the new request id.
         *
         * @param pkt Packet to reuse for the request.
         * @param n   Number of bytes received so far for this request.
         */
        private void resend(SFTPPacket pkt, int n) {
            int     i  = getNextId();
            Integer id = new Integer(i);
            fileOffset += n;
            len        -= n;
            off        += n;
            pkt.reset(SSH_FXP_READ, i);
            pkt.writeString(handle.getHandle());
            pkt.writeLong(fileOffset);
            pkt.writeInt(len);
            replyLocks.put(id, this);
            txQueue.putLast(pkt);
        }

        /**
         * Reports an <code>SFTPDisconnectException</code> to the handle.
         */
        protected synchronized void cancel() {
            handle.asyncException(new SFTPDisconnectException());
            this.notify();
        }
    }

    /** Number of packets allocated for the packet pool. */
    private final static int POOL_SIZE = 16;

    private SSH2Connection     connection;
    private SSH2SessionChannel session;

    /** Packets queued for transmission in non-blocking mode. */
    private Queue txQueue;

    private int id;
    private int version;

    /** True when calls wait for the reply, false when they are queued. */
    private boolean isBlocking;
    private boolean isOpen;
    private boolean transmitterIsRunning = false;
    private boolean receiverIsRunning    = false;

    /** Reply handlers for outstanding requests, keyed by request id. */
    private Hashtable replyLocks;

    private SFTPPacket[] pktPool;
    private int          pktPoolCnt;

    /**
     * Creates a client and starts the sftp subsystem on a new session of
     * the given connection.
     *
     * @param connection Connection to run over.
* @param isBlocking True if read and write should be asynchronous. */ public SSH2SFTPClient(SSH2Connection connection, boolean isBlocking) throws SFTPException { this.connection = connection; this.id = 0; this.isBlocking = isBlocking; this.restart(); // INIT pkt don't have an id but version is in same place // SFTPPacket pkt = createPacket(); pkt.reset(SSH_FXP_INIT, SSH_FILEXFER_VERSION); pkt.writeTo(session.getStdIn()); pkt.reset(); pkt.failsafeReadFrom(session.getStdOut()); checkType(pkt, SSH_FXP_VERSION); version = pkt.readInt(); releasePacket(pkt); if(!isBlocking) { startNonblocking(); } } /** * Terminate the connection and abort any asynchronous calls which * are in progress. */ public synchronized void terminate() { receiverIsRunning = false; isOpen = false; if(session != null) { session.close(); } cancelAllAsync(); if (transmitterIsRunning) { transmitterIsRunning = false; txQueue.setBlocking(false); } session = null; if(pktPool != null) { // Be nice to the GC for(int i = 0; i < POOL_SIZE; i++) { pktPool[i] = null; } } pktPoolCnt = 0; } /** * Reopens the connection to the server. Any outstanding * asynchronous operations are aborted. */ public void restart() throws SFTPException { terminate(); session = connection.newSession(); if(!session.doSubsystem("sftp")) { // !!! TODO: fix throw new SFTPException("sftp subsystem couldn't be started on server"); } isOpen = true; pktPool = new SFTPPacket[POOL_SIZE]; pktPoolCnt = POOL_SIZE; for(int i = 0; i < POOL_SIZE; i++) { pktPool[i] = new SFTPPacket(); } } /** * Open a file on the server. * * @param name Name of file * @param flags Mode to open file with. Valid values are * <code>SSH2SFTP.SSH_FXF_*</code>. * @param attrs File attributes for new files. * * @return A handle identifying the file. 
*/ public FileHandle open(String name, int flags, FileAttributes attrs) throws SFTPException { SFTPPacket pkt = createPacket(SSH_FXP_OPEN); pkt.writeString(name); pkt.writeInt(flags); pkt.writeAttrs(attrs); pkt = transmitExpectReply(pkt, SSH_FXP_HANDLE); FileHandle handle = new FileHandle(name, pkt.readString(), false); releasePacket(pkt); return handle; } /** * Close a file. * * @param handle Handle identifying file. */ public void close(FileHandle handle) throws SFTPException { SFTPPacket pkt = createPacket(SSH_FXP_CLOSE, handle); pkt = transmitExpectReply(pkt, SSH_FXP_STATUS); releasePacket(pkt); handle.asyncClose(); } /** * Read data from an open file on the server and stores it in a * local file. The data is stored at the same position in the * local file as it is read from in the remote file. * * @param handle Handle identifying file. * @param fileOffset Where in the file to start to read. * @param fileTarget Local file to write the data into. * @param len Number of bytes to read. * * @return The number of read bytes. */ public int read(FileHandle handle, long fileOffset, RandomAccessFile fileTarget, int len) throws SFTPException, IOException { if(!handle.isOpen()) { throw new SFTPAsyncAbortException(); } SFTPPacket pkt = createPacket(SSH_FXP_READ, handle); pkt.writeLong(fileOffset); pkt.writeInt(len); if(isBlocking) { try { pkt = transmitExpectReply(pkt, SSH_FXP_DATA); len = pkt.readInt(); fileTarget.seek(fileOffset); fileTarget.write(pkt.getData(), pkt.getRPos(), len); return len; } catch (SFTPEOFException e) { return 0; } finally { if(pkt != null) releasePacket(pkt); } } else { Integer id = new Integer(pkt.getId()); ReplyLock reply = new ReadReplyLock(handle, fileOffset, fileTarget, len); replyLocks.put(id, reply); txQueue.putLast(pkt); return len; } } /** * Read data from an open file on the server and stores it in a * local buffer. * * @param handle Handle identifying file. * @param fileOffset Where in the file to start to read. 
* @param buf Local buffer to store data in. Must hold * <code>len</code> bytes at the given offset. * @param off Offset in buffer to store data at. * @param len Number of bytes to read. * * @return The number of read bytes. */ public int read(FileHandle handle, long fileOffset, byte[] buf, int off, int len) throws SFTPException { if(!handle.isOpen()) { throw new SFTPAsyncAbortException(); } SFTPPacket pkt = createPacket(SSH_FXP_READ, handle); pkt.writeLong(fileOffset); pkt.writeInt(len); if(isBlocking) { try { pkt = transmitExpectReply(pkt, SSH_FXP_DATA); return pkt.readString(buf, off); } catch (SFTPEOFException e) { return 0; } finally { if(pkt != null) releasePacket(pkt); } } else { if(!isOpen) { throw new SFTPDisconnectException(); } Integer id = new Integer(pkt.getId()); ReplyLock reply = new ReadReplyLock(handle, fileOffset, buf, off, len); replyLocks.put(id, reply); txQueue.putLast(pkt); return len; } } /** * Read the entire file on the server and store in a local stream. * * @param handle Handle identifying file. * @param out Stream to store data in. * * @return Number of bytes read. */ public int readFully(FileHandle handle, OutputStream out) throws SFTPException, IOException { if(!handle.isOpen()) { throw new SFTPAsyncAbortException(); } FileAttributes attrs = fstat(handle); int len = (int)attrs.size; int foffs = 0; int cnt = 0; try { while(foffs < len) { int n = (32768 < (len - foffs) ? 
32768 : (int)(len - foffs)); SFTPPacket pkt = createPacket(SSH_FXP_READ, handle); pkt.writeLong(foffs); pkt.writeInt(n); if(isBlocking) { try { pkt = transmitExpectReply(pkt, SSH_FXP_DATA); n = pkt.readInt(); out.write(pkt.getData(), pkt.getRPos(), n); } finally { if(pkt != null) releasePacket(pkt); } } else { Integer id = new Integer(pkt.getId()); ReplyLock reply = new ReadReplyLock(handle, foffs, out, n); replyLocks.put(id, reply); txQueue.putLast(pkt); } foffs += n; if(!isBlocking && ++cnt == 24) { cnt = 0; handle.asyncWait(12); } } if(!isBlocking) { handle.asyncWait(); } } finally { close(handle); } return (int)attrs.size; } /** * Internal write function. */ protected void writeInternal(FileHandle handle, SFTPPacket pkt, int len) throws SFTPException { if(isBlocking) { pkt = transmitExpectReply(pkt, SSH_FXP_STATUS); releasePacket(pkt); } else { if(!isOpen) { throw new SFTPDisconnectException(); } Integer id = new Integer(pkt.getId()); ReplyLock reply = new WriteReplyLock(handle, len); replyLocks.put(id, reply); txQueue.putLast(pkt); } } /** * Write data to a remote file. * * @param handle Handle identifying file. * @param fileOffset Offset to store data at. * @param buf Buffer containing data to write. * @param off Offset in <code>buf</code> to read data at. * @param len Number of bytes to write. */ public void write(FileHandle handle, long fileOffset, byte[] buf, int off, int len)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -