📄 sharedsocket.java
字号:
if (responseOwner == -1) { throw new IOException("Stream " + streamId + " attempting to read when no request has been sent"); } // // OK There should be data, check that it is for this stream // if (responseOwner != streamId) { // Error we are trying to read another thread's request. throw new IOException("Stream " + streamId + " is trying to read data that belongs to stream " + responseOwner); } // // Simple case we are reading our input directly from the server // return readPacket(buffer); } } /** * Save a packet buffer in a memory queue or to a disk queue if the global * memory limit for the driver has been exceeded. * * @param vsock the virtual socket owning this data * @param buffer the data to queue */ private void enqueueInput(VirtualSocket vsock, byte[] buffer) throws IOException { // // Check to see if we should start caching to disk // if (globalMemUsage + buffer.length > memoryBudget && vsock.pktQueue.size() >= minMemPkts && !securityViolation && vsock.diskQueue == null) { // Try to create a disk file for the queue try { vsock.queueFile = File.createTempFile("jtds", ".tmp"); vsock.queueFile.deleteOnExit(); vsock.diskQueue = new RandomAccessFile(vsock.queueFile, "rw"); // Write current cache contents to disk and free memory byte[] tmpBuf; while (vsock.pktQueue.size() > 0) { tmpBuf = (byte[]) vsock.pktQueue.removeFirst(); vsock.diskQueue.write(tmpBuf, 0, getPktLen(tmpBuf)); vsock.pktsOnDisk++; } } catch (java.lang.SecurityException se) { // Not allowed to cache to disk so carry on in memory securityViolation = true; vsock.queueFile = null; vsock.diskQueue = null; } } if (vsock.diskQueue != null) { // Cache file exists so append buffer to it vsock.diskQueue.write(buffer, 0, getPktLen(buffer)); vsock.pktsOnDisk++; } else { // Will cache in memory vsock.pktQueue.addLast(buffer); globalMemUsage += buffer.length; if (globalMemUsage > peakMemUsage) { peakMemUsage = globalMemUsage; } } vsock.inputPkts++; } /** * Read a cached packet from the in memory queue or from a disk based queue. * * @param vsock the virtual socket owning this data * @return a buffer containing the packet */ private byte[] dequeueInput(VirtualSocket vsock) throws IOException { byte[] buffer = null; if (vsock.pktsOnDisk > 0) { // Data is cached on disk if (vsock.diskQueue.getFilePointer() == vsock.diskQueue.length()) { // First read so rewind() file vsock.diskQueue.seek(0L); } vsock.diskQueue.readFully(hdrBuf, 0, TDS_HDR_LEN); int len = getPktLen(hdrBuf); buffer = new byte[len]; System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN); vsock.diskQueue.readFully(buffer, TDS_HDR_LEN, len - TDS_HDR_LEN); vsock.pktsOnDisk--; if (vsock.pktsOnDisk < 1) { // File now empty so close and delete it try { vsock.diskQueue.close(); vsock.queueFile.delete(); } finally { vsock.queueFile = null; vsock.diskQueue = null; } } } else if (vsock.pktQueue.size() > 0) { buffer = (byte[]) vsock.pktQueue.removeFirst(); globalMemUsage -= buffer.length; } if (buffer != null) { vsock.inputPkts--; } return buffer; } /** * Read a physical TDS packet from the network. * * @param buffer a buffer to read the data into (if it fits) or null * @return either the incoming buffer if it was large enough or a newly * allocated buffer with the read packet */ private byte[] readPacket(byte buffer[]) throws IOException { // // Read rest of header try { getIn().readFully(hdrBuf); } catch (EOFException e) { throw new IOException("DB server closed connection."); } byte packetType = hdrBuf[0]; if (packetType != TdsCore.LOGIN_PKT && packetType != TdsCore.QUERY_PKT && packetType != TdsCore.REPLY_PKT) { throw new IOException("Unknown packet type 0x" + Integer.toHexString(packetType & 0xFF)); } // figure out how many bytes are remaining in this packet. int len = getPktLen(hdrBuf); if (len < TDS_HDR_LEN || len > 65536) { throw new IOException("Invalid network packet length " + len); } if (buffer == null || len > buffer.length) { // Create or expand the buffer as required buffer = new byte[len]; if (len > maxBufSize) { maxBufSize = len; } } // Preserve the packet header in the buffer System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN); try { getIn().readFully(buffer, TDS_HDR_LEN, len - TDS_HDR_LEN); } catch (EOFException e) { throw new IOException("DB server closed connection."); } // // SQL Server 2000 < SP3 does not set the last packet // flag in the NT challenge packet. // If this is the first packet and the length is correct // force the last packet flag on. // if (++packetCount == 1 && serverType == Driver.SQLSERVER && "NTLMSSP".equals(new String(buffer, 11, 7))) { buffer[1] = 1; } synchronized (cancelMonitor) { // // If a cancel request is outstanding check that the last TDS packet // is a TDS_DONE with the "cancek ACK" flag set. If it isn't set the // "more packets" flag; this will ensure that the stream keeps // processing until the "cancel ACK" is processed. // if (cancelPending) { // // Move what we assume to be the TDS_DONE packet into doneBuffer // if (len >= TDS_DONE_LEN + TDS_HDR_LEN) { System.arraycopy(buffer, len - TDS_DONE_LEN, doneBuffer, 0, TDS_DONE_LEN); } else { // Packet too short so TDS_DONE record was split over // two packets. Need to reassemble. int frag = len - TDS_HDR_LEN; System.arraycopy(doneBuffer, frag, doneBuffer, 0, TDS_DONE_LEN - frag); System.arraycopy(buffer, TDS_HDR_LEN, doneBuffer, TDS_DONE_LEN - frag, frag); } // // If this is the last packet and there is a cancel pending see // if the last packet contains a TDS_DONE token with the cancel // ACK set. If not reset the last packet flag so that the dedicated // cancel packet is also read and processed. // if (buffer[1] == 1) { if ((doneBuffer[0] & 0xFF) < TDS_DONE_TOKEN) { throw new IOException("Expecting a TDS_DONE or TDS_DONEPROC."); } if ((doneBuffer[1] & TdsCore.DONE_CANCEL) != 0) { // OK have a cancel ACK packet cancelPending = false; } else { // Must be in next packet so // force client to read next packet buffer[1] = 0; } } } if (buffer[1] != 0) { // End of response; connection now free responseOwner = -1; } } return buffer; } /** * Retrieves the virtual socket with the given id. * * @param streamId id of the virtual socket to retrieve */ private VirtualSocket lookup(int streamId) { if (streamId < 0 || streamId > socketTable.size()) { throw new IllegalArgumentException("Invalid parameter stream ID " + streamId); } VirtualSocket vsock = (VirtualSocket)socketTable.get(streamId); if (vsock.owner != streamId) { throw new IllegalStateException("Internal error: bad stream ID " + streamId); } return vsock; } /** * Convert two bytes (in network byte order) in a byte array into a Java * short integer. * * @param buf array of data * @return the 16 bit unsigned value as an <code>int</code> */ static int getPktLen(byte buf[]) { int lo = ((int) buf[3] & 0xff); int hi = (((int) buf[2] & 0xff) << 8); return hi | lo; } /** * Set the socket timeout. * * @param timeout the timeout value in milliseconds */ protected void setTimeout(int timeout) throws SocketException { socket.setSoTimeout(timeout); } /** * Getter for {@link SharedSocket#in} field. * * @return {@link InputStream} used for communication */ protected DataInputStream getIn() { return in; } /** * Setter for {@link SharedSocket#in} field. * * @param in the {@link InputStream} to be used for communication */ protected void setIn(DataInputStream in) { this.in = in; } /** * Getter for {@link SharedSocket#out} field. * * @return {@link OutputStream} used for communication */ protected DataOutputStream getOut() { return out; } /** * Setter for {@link SharedSocket#out} field. * * @param out the {@link OutputStream} to be used for communication */ protected void setOut(DataOutputStream out) { this.out = out; } /** * Get the server host name. * * @return the host name as a <code>String</code> */ protected String getHost() { return this.host; } /** * Get the server port number. * * @return the host port as an <code>int</code> */ protected int getPort() { return this.port; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -