📄 sharedsocket.java
字号:
// Return any cached input
//
if (vsock.inputPkts > 0) {
return dequeueInput(vsock);
}
//
// Nothing cached see if we are expecting network data
//
if (responseOwner == -1) {
throw new IOException("Stream " + streamId +
" attempting to read when no request has been sent");
}
//
// OK There should be data, check that it is for this stream
//
if (responseOwner != streamId) {
// Error we are trying to read another thread's request.
throw new IOException("Stream " + streamId +
" is trying to read data that belongs to stream " +
responseOwner);
}
//
// Simple case we are reading our input directly from the server
//
return readPacket(buffer);
}
}
/**
 * Save a packet buffer in a memory queue or to a disk queue if the global
 * memory limit for the driver has been exceeded.
 * <p>
 * Spilling to disk starts when all of the following hold: adding this packet
 * would push <code>globalMemUsage</code> above <code>memoryBudget</code>, the
 * socket already holds at least <code>minMemPkts</code> packets in memory, no
 * earlier attempt failed with a <code>SecurityException</code>, and the
 * socket has no disk queue yet. At that point every packet already queued in
 * memory is written to the new file first, so packets stay in arrival order.
 *
 * @param vsock the virtual socket owning this data
 * @param buffer the data to queue
 * @throws IOException if the temporary file cannot be created or written
 */
private void enqueueInput(VirtualSocket vsock, byte[] buffer)
        throws IOException {
    //
    // Check to see if we should start caching to disk
    //
    if (globalMemUsage + buffer.length > memoryBudget
            && vsock.pktQueue.size() >= minMemPkts
            && !securityViolation
            && vsock.diskQueue == null) {
        // Try to create a disk file for the queue
        try {
            vsock.queueFile = File.createTempFile("jtds", ".tmp", bufferDir);
            vsock.queueFile.deleteOnExit();
            vsock.diskQueue = new RandomAccessFile(vsock.queueFile, "rw");

            // Write current cache contents to disk and free memory
            byte[] tmpBuf;
            while (vsock.pktQueue.size() > 0) {
                tmpBuf = (byte[]) vsock.pktQueue.removeFirst();
                // The packet has left the memory queue, so release its share
                // of the global budget. The in-memory paths account with
                // buffer.length, and the disk dequeue path never decrements,
                // so without this the counter would only ever grow.
                globalMemUsage -= tmpBuf.length;
                vsock.diskQueue.write(tmpBuf, 0, getPktLen(tmpBuf));
                vsock.pktsOnDisk++;
            }
        } catch (java.lang.SecurityException se) {
            // Not allowed to cache to disk so carry on in memory
            securityViolation = true;
            vsock.queueFile = null;
            vsock.diskQueue = null;
        }
    }

    if (vsock.diskQueue != null) {
        // Cache file exists so append buffer to it. Only the packet itself
        // (length taken from its header) is written, not any spare capacity.
        vsock.diskQueue.write(buffer, 0, getPktLen(buffer));
        vsock.pktsOnDisk++;
    } else {
        // Will cache in memory
        vsock.pktQueue.addLast(buffer);
        globalMemUsage += buffer.length;
        if (globalMemUsage > peakMemUsage) {
            peakMemUsage = globalMemUsage;
        }
    }

    vsock.inputPkts++;
}
/**
 * Fetch the next cached packet for a virtual socket, taking it from the
 * disk based queue when packets are stored there and otherwise from the in
 * memory queue.
 *
 * @param vsock the virtual socket owning this data
 * @return a buffer containing the packet, or <code>null</code> when neither
 *         queue holds a packet
 * @throws IOException if the disk queue cannot be read or closed
 */
private byte[] dequeueInput(VirtualSocket vsock)
        throws IOException {
    if (vsock.pktsOnDisk > 0) {
        // A file pointer sitting at end-of-file means nothing has been read
        // back yet, so start again from the beginning of the file.
        if (vsock.diskQueue.getFilePointer() == vsock.diskQueue.length()) {
            vsock.diskQueue.seek(0L);
        }

        // Header first: it carries the total packet length.
        vsock.diskQueue.readFully(hdrBuf, 0, TDS_HDR_LEN);
        final int pktLen = getPktLen(hdrBuf);
        final byte[] diskPkt = new byte[pktLen];
        System.arraycopy(hdrBuf, 0, diskPkt, 0, TDS_HDR_LEN);
        vsock.diskQueue.readFully(diskPkt, TDS_HDR_LEN, pktLen - TDS_HDR_LEN);

        vsock.pktsOnDisk--;
        if (vsock.pktsOnDisk <= 0) {
            // Last packet consumed: close and remove the file, and forget
            // both references even if closing fails.
            try {
                vsock.diskQueue.close();
                vsock.queueFile.delete();
            } finally {
                vsock.queueFile = null;
                vsock.diskQueue = null;
            }
        }

        vsock.inputPkts--;
        return diskPkt;
    }

    if (vsock.pktQueue.size() < 1) {
        // Nothing cached anywhere
        return null;
    }

    final byte[] memPkt = (byte[]) vsock.pktQueue.removeFirst();
    globalMemUsage -= memPkt.length;
    vsock.inputPkts--;
    return memPkt;
}
/**
 * Read a physical TDS packet from the network.
 * <p>
 * The header is read into <code>hdrBuf</code>, validated (packet type and
 * length), and copied to the front of the returned buffer so the caller
 * sees the complete packet. When a cancel is pending the tail of the
 * response is inspected for the cancel acknowledgement, and the "last
 * packet" flag in <code>buffer[1]</code> may be cleared to make the caller
 * keep reading. When the returned packet is flagged as last,
 * <code>responseOwner</code> is reset to -1.
 *
 * @param buffer a buffer to read the data into (if it fits) or null
 * @return either the incoming buffer if it was large enough or a newly
 *         allocated buffer with the read packet
 * @throws IOException if the server closes the connection, the packet type
 *         or length is invalid, or the expected TDS_DONE token is missing
 *         while a cancel is pending
 */
private byte[] readPacket(byte buffer[])
        throws IOException {
    //
    // Read the packet header
    //
    try {
        getIn().readFully(hdrBuf);
    } catch (EOFException e) {
        throw new IOException("DB server closed connection.");
    }

    byte packetType = hdrBuf[0];
    if (packetType != TdsCore.LOGIN_PKT
            && packetType != TdsCore.QUERY_PKT
            && packetType != TdsCore.REPLY_PKT) {
        throw new IOException("Unknown packet type 0x" +
                Integer.toHexString(packetType & 0xFF));
    }

    // The header length field is the total packet length, header included.
    int len = getPktLen(hdrBuf);
    if (len < TDS_HDR_LEN || len > 65536) {
        throw new IOException("Invalid network packet length " + len);
    }

    if (buffer == null || len > buffer.length) {
        // Create or expand the buffer as required
        buffer = new byte[len];
        if (len > maxBufSize) {
            maxBufSize = len;
        }
    }

    // Preserve the packet header in the buffer
    System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN);

    // Read the remainder of the packet after the header
    try {
        getIn().readFully(buffer, TDS_HDR_LEN, len - TDS_HDR_LEN);
    } catch (EOFException e) {
        throw new IOException("DB server closed connection.");
    }

    //
    // SQL Server 2000 < SP3 does not set the last packet
    // flag in the NT challenge packet.
    // If this is the first packet and the length is correct
    // force the last packet flag on.
    //
    // packetCount is incremented for every packet; it is the first operand
    // so the increment happens regardless of the other conditions.
    if (++packetCount == 1 && serverType == Driver.SQLSERVER
            && hasNtlmSignature(buffer, len)) {
        buffer[1] = 1;
    }

    synchronized (cancelMonitor) {
        //
        // If a cancel request is outstanding check that the last TDS packet
        // is a TDS_DONE with the "cancel ACK" flag set. If it isn't set the
        // "more packets" flag; this will ensure that the stream keeps
        // processing until the "cancel ACK" is processed.
        //
        if (cancelPending) {
            //
            // Move what we assume to be the TDS_DONE packet into doneBuffer
            //
            if (len >= TDS_DONE_LEN + TDS_HDR_LEN) {
                System.arraycopy(buffer, len - TDS_DONE_LEN, doneBuffer, 0,
                        TDS_DONE_LEN);
            } else {
                // Packet too short so TDS_DONE record was split over
                // two packets. Need to reassemble: shift the bytes kept
                // from the previous packet left and append this payload.
                int frag = len - TDS_HDR_LEN;
                System.arraycopy(doneBuffer, frag, doneBuffer, 0,
                        TDS_DONE_LEN - frag);
                System.arraycopy(buffer, TDS_HDR_LEN, doneBuffer,
                        TDS_DONE_LEN - frag, frag);
            }

            //
            // If this is the last packet and there is a cancel pending see
            // if the last packet contains a TDS_DONE token with the cancel
            // ACK set. If not reset the last packet flag so that the dedicated
            // cancel packet is also read and processed.
            //
            if (buffer[1] == 1) {
                if ((doneBuffer[0] & 0xFF) < TDS_DONE_TOKEN) {
                    throw new IOException("Expecting a TDS_DONE or TDS_DONEPROC.");
                }

                if ((doneBuffer[1] & TdsCore.DONE_CANCEL) != 0) {
                    // OK have a cancel ACK packet
                    cancelPending = false;
                } else {
                    // Must be in next packet so
                    // force client to read next packet
                    buffer[1] = 0;
                }
            }
        }

        if (buffer[1] != 0) {
            // End of response; connection now free
            responseOwner = -1;
        }
    }

    return buffer;
}

/**
 * Tests whether a packet carries the ASCII signature "NTLMSSP" at byte
 * offset 11.
 * <p>
 * The bytes are compared directly rather than decoded into a String, so the
 * result does not depend on the platform default charset. The packet length
 * is checked first, so a short packet is never read past its end and stale
 * bytes in a reused, larger buffer are never matched.
 *
 * @param pkt the buffer holding the packet, starting at index 0
 * @param len the number of valid packet bytes in <code>pkt</code>
 * @return <code>true</code> if the signature is present
 */
private static boolean hasNtlmSignature(byte[] pkt, int len) {
    final int offset = 11;
    final String signature = "NTLMSSP";

    if (len < offset + signature.length()) {
        return false;
    }
    for (int i = 0; i < signature.length(); i++) {
        if (pkt[offset + i] != (byte) signature.charAt(i)) {
            return false;
        }
    }
    return true;
}
/**
 * Retrieves the virtual socket with the given id.
 *
 * @param streamId id of the virtual socket to retrieve
 * @return the virtual socket stored in the socket table under
 *         <code>streamId</code>
 * @throws IllegalArgumentException if <code>streamId</code> is negative or
 *         not less than the size of the socket table
 * @throws IllegalStateException if the table slot is empty or holds a
 *         socket whose owner is not <code>streamId</code>
 */
private VirtualSocket lookup(int streamId) {
    // Valid table indexes run from 0 to size() - 1, so size() itself must be
    // rejected here rather than failing inside get().
    if (streamId < 0 || streamId >= socketTable.size()) {
        throw new IllegalArgumentException("Invalid parameter stream ID "
                + streamId);
    }

    VirtualSocket vsock = (VirtualSocket) socketTable.get(streamId);

    // An empty slot is reported the same way as a mismatched owner instead
    // of surfacing as a NullPointerException.
    if (vsock == null || vsock.owner != streamId) {
        throw new IllegalStateException("Internal error: bad stream ID "
                + streamId);
    }

    return vsock;
}
/**
 * Extract the 16 bit length field from a packet header: the byte at index 2
 * is the high-order byte and the byte at index 3 the low-order byte
 * (network byte order).
 *
 * @param buf array of data; must contain at least four bytes
 * @return the 16 bit unsigned value as an <code>int</code>
 */
static int getPktLen(byte buf[]) {
    // Mask each byte to strip sign extension before combining.
    return ((buf[2] & 0xff) << 8) | (buf[3] & 0xff);
}
/**
* Set the socket timeout by passing the value straight to
* <code>socket.setSoTimeout</code>.
*
* @param timeout the timeout value in milliseconds
* @throws SocketException if the underlying socket rejects the new timeout
*/
protected void setTimeout(int timeout) throws SocketException {
socket.setSoTimeout(timeout);
}
/**
* Getter for {@link SharedSocket#in} field.
*
* @return the {@link DataInputStream} currently used for communication
*/
protected DataInputStream getIn() {
return in;
}
/**
* Setter for {@link SharedSocket#in} field.
*
* @param in the {@link DataInputStream} to be used for communication
*/
protected void setIn(DataInputStream in) {
this.in = in;
}
/**
* Getter for {@link SharedSocket#out} field.
*
* @return the {@link DataOutputStream} currently used for communication
*/
protected DataOutputStream getOut() {
return out;
}
/**
* Setter for {@link SharedSocket#out} field.
*
* @param out the {@link DataOutputStream} to be used for communication
*/
protected void setOut(DataOutputStream out) {
this.out = out;
}
/**
* Get the server host name held in the <code>host</code> field.
*
* @return the host name as a <code>String</code>
*/
protected String getHost() {
return this.host;
}
/**
* Get the server port number held in the <code>port</code> field.
*
* @return the host port as an <code>int</code>
*/
protected int getPort() {
return this.port;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -