📄 main2.java
字号:
package com.dog3.prettyBoy;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Properties;
/**
* message type define: 0 ligin, 1 mo/mt, 2 status report , 4 active test , 16
* active test response, echoes back whatever it reads. A single Selector object
* is used to listen to the server socket (to accept new connections) and all
* the active socket channels.
*
* @author Ron Hitchens (ron@ronsoft.com)
* @version $Id: SelectSockets.java,v 1.5 2002/05/20 07:24:29 ron Exp $
*/
public class Main2 {
private String remoteMachine;
private ByteBuffer downloadBuffer;
// private ByteBuffer uploadBuffer;
// private ByteBuffer uploadBuffer;
private ByteBuffer MtSmsBuffer;
private MtPacket MtSmsPacket;
private ByteBuffer MtSoTestBuffer;
private SoTest activeTestPacket;
private ByteBuffer MtRespBuffer;
private SocketChannel channel;
private Selector selector;
private Statement stmt;
private PreparedStatement pstmt;
private Connection conn;
private long startMills = 0;
// private SoTest activeTestPacket = null;
/**
*
* @param argv
* @throws Exception
*/
public static void main(String[] argv) throws Exception {
Properties config = new Properties();
if (argv.length < 1) {
System.out.println(" please input config file path");
return;
}
try {
config.load(new FileInputStream(argv[0]));
} catch (Exception e) {
e.printStackTrace();
System.out.println("错误,配置文件不存在. filePath:" + argv[0]);
}
for (int i = 0; i < 100; i++) {
Main2 process = null;
try {
process = new Main2();
process.initDB(config.getProperty("jdbcUrl"), config
.getProperty("databaseUser"), config
.getProperty("databasePassword"));
process.initRemoteMachine(config.getProperty("remoteMachine"),
config.getProperty("remotePort"));
process.loop();
process.close();
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(2000);
if (process != null) {
process.close();
process = null;
}
}
}
}
public Main2() {
// downloadBuffer = ByteBuffer.allocate(Define2.Download_Packet_length);
// uploadBuffer = ByteBuffer.allocate(Define2.Upload_Packet_length);
// downloadPacket2 = new DownloadPacket2();
// uploadPacket2 = new UploadPacket2();
MtSmsBuffer = ByteBuffer.allocate(Define2.MT_Packet_length);
MtSmsPacket = new MtPacket();
}
/**
* @param string
* @param string2
* @param string3
* @throws Exception
*/
private void initDB(String jdbcUrl, String user, String password)
throws Exception {
Class.forName("com.microsoft.jdbc.sqlserver.SQLServerDriver");
conn = DriverManager.getConnection(jdbcUrl, user, password);
stmt = conn.createStatement();
pstmt = conn
.prepareStatement("{call request_proc2(?,?,?,?,?,?,?,?,?,?,?)}");
}
/**
*
* @throws Exception
*/
// 建立SOKET连接
public void initRemoteMachine(String remoteMachine1, String port1)
throws Exception {
remoteMachine = remoteMachine1;
System.out.println("Connecting to remote " + remoteMachine + " port "
+ port1);
channel = SocketChannel.open(new InetSocketAddress(remoteMachine,
Integer.parseInt(port1)));
channel.socket().setKeepAlive(true);
System.out.println("Connected success !!! ");
channel.configureBlocking(false);
selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
}
/**
*
* @param argv
* @throws Exception
*/
public void loop() throws Exception {
MtSmsBuffer.clear();
startMills = System.currentTimeMillis();
while (true) {
processActiveTest();
processMtSms();
if (selector.select(250) == 0) {
continue;
}
/*
* Iterator it = selector.selectedKeys().iterator(); while
* (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); if
* (key.isReadable()) { if (readDataFromSocket(key) == -1) { return; } }
* it.remove(); }
*/
}
}
/**
* @throws Exception
*
*/
private void processActiveTest() throws Exception {
if (System.currentTimeMillis() - startMills > 3000) {
startMills = System.currentTimeMillis();
sendActiveTest();
}
}
/**
* @throws Exception
*
*/
private void processMtSms() throws Exception {
ResultSet rs = stmt.executeQuery("{call response_proc2}");
MtPacket myPacket = new MtPacket();
if (rs.next()) {
myPacket.reuse();
System.out.println("\n=begin process download packet");
System.out.println("ID:" + rs.getString("id"));
System.out.println("Type :" + rs.getString("Type"));
System.out.println("ThirdPartyID:" + rs.getString("ThirdPartyID"));
System.out.println("Sender :" + rs.getString("Sender"));
System.out.println("Sendto:" + rs.getString("Sendto"));
System.out.println("Msg :" + rs.getString("Msg"));
System.out.println("Worker :" + rs.getString("Worker"));
myPacket.setID(rs.getInt("id"));
myPacket.setType(rs.getBytes("Type"));
myPacket.setThirdPartyID(rs.getBytes("ThirdPartyID"));
myPacket.setFromTel(rs.getBytes("Sender"));
myPacket.setToTel(rs.getBytes("Sendto"));
myPacket.setSMSContent(rs.getBytes("Msg"));
myPacket.setWorker(rs.getBytes("Worker"));
String tableName = rs.getString("table_name");
// 将相关的消息写入到数据包
int result = channel.write((ByteBuffer) myPacket.getByteBuffer()
.position(Define2.MT_Packet_length).flip());
System.out.println("write buffer :" + result);
int id = rs.getInt("Id");
rs.close();
int updateRow = stmt.executeUpdate("{call response_update_proc2("
+ id + ",'"+tableName+"')}");
System.out.println("=end update count:" + updateRow + " by id:"
+ id);
} else {
rs.close();
}
Thread.sleep(50);
}
/**
* @throws Exception
*
*/
private void close() throws Exception {
if (channel != null) {
channel.close();
channel = null;
}
if (stmt != null) {
stmt.close();
stmt = null;
}
if (conn != null) {
conn.close();
conn = null;
}
}
/**
*
* @param key
* @return
* @throws Exception
*/
protected int readDataFromSocket(SelectionKey key) throws Exception {
int readCount = 0;
while (true) {
readCount = channel.read(MtRespBuffer);
System.out.println("read length is :" + readCount);
if (readCount < 0) {
channel.close();
System.out.println("remote machine socket is closed!");
readCount = -1;
break;
}
if (readCount == 0) {
break;
}
while (MtRespBuffer.position() >= 11) {
MtRespBuffer.flip();
ByteBuffer onePacketBuffer = ByteBuffer.allocate(11);
MtRespBuffer.get(onePacketBuffer.array());
processRequest(onePacketBuffer);
MtRespBuffer.compact();
}
}
return readCount;
}
/**
*
* @param buffer2
* @throws Exception
*/
private void processRequest(ByteBuffer ByteBuffer1) throws Exception {
MtRespPacket Packet1 = new MtRespPacket(ByteBuffer1);
System.out.println("\n-begin process MtRespPacket packet");
pstmt.clearParameters();
System.out.println("ID:" + Packet1.getID());
System.out.println("Type:" + new String(Packet1.getType()));
System.out.println("ThirdParty:" + Packet1.getThirdPaty());
System.out.println("Result:" + new String(Packet1.getResult()));
pstmt.setInt(1, Packet1.getID());
pstmt.setBytes(2, Packet1.getType());
pstmt.setBytes(3, Packet1.getThirdPaty());
pstmt.setBytes(4, Packet1.getResult());
int insertRow = pstmt.executeUpdate();
System.out.println("-end process upload packet:" + insertRow);
}
/**
*
* @param channel
* @throws Exception
*/
private void sendActiveTest() throws Exception {
SoTest activeTestPacket = new SoTest();
activeTestPacket.setID("10001".getBytes());
activeTestPacket.setType("1".getBytes());
activeTestPacket.setResult("2".getBytes());
int loginWrite = channel.write((ByteBuffer) activeTestPacket
.getByteBuffer().position(6).flip());
System.out.println("心跳消息大小:" + loginWrite);
}
private void sendPacket() {
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -