📄 socketchannelhelper.java
字号:
/**
* helper class for socket
* Created on 2005-3-10
* FuninhandCommonLib
*
*/
package org.apache.ftpserver.util;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
//import com.funinhand.server.v2.ClientSocketConnectedIn;
public final class SocketChannelHelper {
// The single shared instance, created when this class is initialized.
private static final SocketChannelHelper myInstance = new SocketChannelHelper();
// Private so that no code outside this class can construct another instance.
private SocketChannelHelper() {
}
/**
 * Returns the shared helper instance.
 *
 * @return the instance held in {@code myInstance}; the same object on every call
 */
public static final SocketChannelHelper getInstance() {
return myInstance;
}
/**
 * Writes every remaining byte of {@code buffer} to {@code socketChannel}.
 * <p>
 * Whenever a write makes no progress the method sleeps 10 ms and retries. The timeout is an
 * <em>idle</em> timeout: its clock restarts each time at least one byte is written, so a slow
 * but steadily progressing transfer does not time out.
 *
 * @param socketChannel
 *            channel to write to
 * @param buffer
 *            data to send; the bytes between its position and its limit are written
 * @param timeout
 *            maximum time in ms without any progress; if less than or equal to 0 the send
 *            never times out
 * @return int[] where int[0] is the number of bytes written and int[1] is the total time in ms
 *         the send took; both are 0 when the buffer has nothing remaining
 * @throws IOException
 *             if the channel write fails, or if no byte could be written within
 *             {@code timeout} ms
 * @throws InterruptedException
 *             if the thread is interrupted while sleeping between retries
 */
public final int[] sendData(SocketChannel socketChannel, ByteBuffer buffer, long timeout)
        throws IOException, InterruptedException
{
    if (!buffer.hasRemaining()) {
        // nothing to do
        return new int[2];
    }
    final long idleLimit = (timeout <= 0) ? Long.MAX_VALUE : timeout;
    // startTime measures the whole send; lastProgress only drives the idle timeout.
    final long startTime = System.currentTimeMillis();
    long lastProgress = startTime;
    int totalWritten = 0;
    // write() advances the buffer position itself, so no manual repositioning is needed.
    while (buffer.hasRemaining()) {
        final int written = socketChannel.write(buffer);
        if (written > 0) {
            totalWritten += written;
            lastProgress = System.currentTimeMillis();
            continue;
        }
        if (System.currentTimeMillis() - lastProgress > idleLimit) {
            String strError = "send data dest is time out:" + socketChannel;
            IOException exp = new IOException(strError);
            Logger.getLogger(this.getClass()).warn(strError, exp);
            throw exp;
        }
        // The channel accepted nothing; back off briefly before retrying.
        Thread.sleep(10);
    }
    final long cost = System.currentTimeMillis() - startTime;
    int[] result = new int[2];
    result[0] = totalWritten;
    // Clamp so that an extremely long send cannot wrap around to a negative int.
    result[1] = (int) Math.min(cost, (long) Integer.MAX_VALUE);
    // NOTE(review): channel writes are presumably not buffered in user space, so this flush is
    // likely a no-op; it is kept so that the call sequence on the socket matches the original.
    socketChannel.socket().getOutputStream().flush();
    return result;
}
/**
 * Writes the complete byte array to the output stream of {@code socket}.
 *
 * @param socket
 *            socket whose output stream receives the data
 * @param buffer
 *            bytes to send; an empty array results in no write at all
 * @return int[] where int[0] is the number of bytes written and int[1] is the time in ms the
 *         write took; both are 0 when {@code buffer} is empty
 * @throws IOException
 *             if the output stream cannot be obtained or the write fails
 */
public final int[] sendData(Socket socket, byte[] buffer)
        throws IOException
{
    final int[] lengthAndCost = new int[2];
    final int byteCount = buffer.length;
    if (byteCount != 0) {
        final long begunAt = System.currentTimeMillis();
        socket.getOutputStream().write(buffer);
        lengthAndCost[0] = byteCount;
        lengthAndCost[1] = (int) (System.currentTimeMillis() - begunAt);
    }
    return lengthAndCost;
}
// /**
// * send data out
// *
// * @param socketChannel
// * @param buffer
// * @param timeout
// * send timeout; if less than or equal to 0, the send never times out
// * @param pkgSpeedKbps
// * rate limit in Kbps; if <= 0, sending is not rate-limited
// * @return int[] int[0] data length that sended out int[1] ms that send out cost
// */
// public final int[] sendData(ClientSocketConnectedIn socketChannel, ByteBuffer buffer, long timeout,
// int pkgSpeedKbps) throws IOException, InterruptedException
// {
// if (buffer.remaining() <= 0) {
// // nothing to to
// return new int[2];
// }
// if (timeout <= 0)
// timeout = Long.MAX_VALUE;
// int position = buffer.position();
// int limit = buffer.limit();
// int totalLength = limit - position;
// int writed = 0, totalWrited = 0;
// long currTime = System.currentTimeMillis();
// totalWrited = socketChannel.getChannel().write(buffer);
// socketChannel.markActive();
// boolean isTimeOut = false;
// while (totalWrited < totalLength) {
// buffer.position(totalWrited + position);
// int currLimit = totalWrited + position + 20480;
// if (currLimit > limit)
// currLimit = limit;
// buffer.limit(currLimit);
// writed = socketChannel.getChannel().write(buffer);
// if (writed == 0) {
// if (System.currentTimeMillis() - currTime > timeout) {
// isTimeOut = true;
// break;
// }
// Thread.sleep(10);
// continue;
// }
// socketChannel.markActive();
// totalWrited += writed;
// currTime = System.currentTimeMillis();
// }
// if (isTimeOut) {
// String strError = "send data dest is time out:" + socketChannel;
// throw new IOException(strError);
// }
// long cost = System.currentTimeMillis() - currTime;
// int[] result = new int[2];
// result[0] = totalWrited;
// result[1] = (int) cost;
// return result;
// }
/**
*
* @param socketChannel
* @param dataLength
* if <=0 will read all data back max data length = 1MB(1048576)
* @param timeOut
* time in ms
* @return ByteBuffer's position=0 ByteBuffer's limit=data that want readed
*/
public final ByteBuffer readFixPackage(final SocketChannel socketChannel, final int totalDataLength,
final long timeOut, ByteBuffer buffer) throws IOException, InterruptedException
{
final int maxLength = ((totalDataLength <= 0) ? 1048576 : totalDataLength);
final int bufferLength = (maxLength > 16384) ? 16384 : maxLength;
long currTime = System.currentTimeMillis();
if (buffer == null || buffer.capacity() < bufferLength)
buffer = ByteBuffer.allocate(bufferLength);
else
buffer.clear();
// contain byte[]
List listData = new ArrayList(maxLength / bufferLength + 1);
int currReaded = 0;
int byteRemain = maxLength, totalReaded = 0;
buffer.limit(bufferLength);
boolean isTimeOut = false;
while (true) {
int readed = socketChannel.read(buffer);
if (readed < 0) {
if (currReaded > 0) {
buffer.position(0);
buffer.limit(currReaded);
byte[] datas = new byte[currReaded];
buffer.get(datas);
listData.add(datas);
buffer.clear();
}
break;
}
else if (readed > 0) {
currReaded += readed;
totalReaded += readed;
if (currReaded == bufferLength) {
// one buffer readed
buffer.position(0);
buffer.limit(bufferLength);
byte[] datas = new byte[bufferLength];
buffer.get(datas);
listData.add(datas);
buffer.clear();
byteRemain -= bufferLength;
if (byteRemain == 0)
break;
if (byteRemain >= bufferLength)
buffer.limit(bufferLength);
else
buffer.limit(byteRemain);
currReaded = 0;
}
currTime = System.currentTimeMillis();
}
else {
if (totalReaded >= maxLength) {
if (currReaded > 0) {
buffer.position(0);
buffer.limit(currReaded);
byte[] datas = new byte[currReaded];
buffer.get(datas);
listData.add(datas);
buffer.clear();
}
break;
}
Thread.sleep(3);
if (System.currentTimeMillis() - currTime > timeOut) {
isTimeOut = true;
break;
}
}
}
if (isTimeOut) {
String strErr = "read data from destination is time out,remote:" + socketChannel;
throw new IOException(strErr);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -