// mtprofile.java (listing captured from a web code viewer)
import java.io.IOException;
import java.net.UnknownHostException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
 * Command-line supervisor for a pool of {@code ProfileDL} worker threads.
 *
 * <p>Each cycle it refreshes the database connection, loads a batch of URLs
 * through {@code SearchResultData.getUrls}, deals them round-robin to
 * {@link #tc} workers and starts one thread per worker. The supervising loop
 * then polls once per second and replaces the whole pool (after
 * {@code NetHelper.switchIP()}) when most workers have finished or the cycle
 * has run longer than {@link #timeToRestart} seconds. The loop ends when a
 * batch comes back empty.
 */
public class MTProfile {
    /** Number of positional command-line arguments read by {@link #main}. */
    private static final int REQUIRED_ARGS = 5;

    private static final String USAGE =
            "Usage: MTProfile <threadCount> <maxPerBatch> <timeToRestartSeconds>"
                    + " <minInterval> <ipMode>";

    /** Number of worker threads; {@code args[0]}. */
    static int tc;
    /** Batch size passed to {@code SearchResultData.getUrls}; {@code args[1]}. */
    static int maxPerBatch;
    /** Not read or written anywhere in this class. */
    static boolean dir;
    /** Maximum length of one cycle, in seconds; {@code args[2]}. */
    static int timeToRestart;
    /** Start of the current cycle; replaced at the end of every {@link #reset()}. */
    static Date starting = new Date();
    /** Time this class was loaded; used by {@link #show()} for overall statistics. */
    static Date ProgramStarting = new Date();
    static final boolean generateID = true;
    /** Worker threads, indexed by worker number. */
    static List<Thread> threads;
    /** Worker runnables, parallel to {@link #threads}. */
    static List<ProfileDL> instances;
    /** Per-worker URL assignments (id to URL), indexed by worker number. */
    static List<Map<Integer, String>> urls;
    /** Loop condition of the supervisor; cleared when a batch contains no rows. */
    static boolean recurring = true;

    /**
     * Parses the configuration and runs the supervisor loop on the calling thread.
     *
     * @param args {@code threadCount maxPerBatch timeToRestartSeconds minInterval ipMode}
     * @throws IllegalArgumentException if fewer than five arguments are given or
     *         {@code threadCount} is smaller than 1
     * @throws NumberFormatException if a numeric argument cannot be parsed
     */
    public static void main(String[] args) {
        if (args == null || args.length < REQUIRED_ARGS) {
            throw new IllegalArgumentException(USAGE);
        }
        tc = Integer.parseInt(args[0]);
        if (tc < 1) {
            // URLs are distributed with "count % tc", so zero workers cannot work.
            throw new IllegalArgumentException(
                    "threadCount must be at least 1, got " + tc + ". " + USAGE);
        }
        maxPerBatch = Integer.parseInt(args[1]);
        timeToRestart = Integer.parseInt(args[2]);
        Moderator.minInterval = Float.parseFloat(args[3]);
        NetHelper.ipMode = Boolean.parseBoolean(args[4]);
        ProfileDL.maxCount = maxPerBatch;
        threads = new ArrayList<Thread>(tc);
        instances = new ArrayList<ProfileDL>(tc);
        urls = new ArrayList<Map<Integer, String>>(tc);
        new MTProfile().deamon();
    }

    /**
     * Supervisor loop: starts the pool, then once per second decides whether the
     * workers should be asked to stop and whether the pool should be replaced.
     *
     * <p>Returns when {@link #recurring} becomes false, or when the calling
     * thread is interrupted (the interrupt status is restored before returning).
     */
    private void deamon() {
        try {
            reset();
            startThreads();
        } catch (SQLException e) {
            // Not fatal: with no live workers the loop below retries the whole
            // reset/start sequence on its first pass.
            e.printStackTrace();
        }
        do {
            System.out.println("-----------Total active thread: "
                    + Thread.activeCount() + "-----------");
            // Long arithmetic: "timeToRestart * 1000" overflows int for large values.
            long restartMillis = timeToRestart * 1000L;
            long drainMillis = restartMillis / 10;
            long timeElapsed = System.currentTimeMillis() - starting.getTime();
            int workingThreads = getWorkingThreads();

            boolean tooManyAlerts = ProfileDL.alert > tc / 5;
            boolean fewWorkersLeft = workingThreads > 0 && workingThreads <= tc / 5;
            boolean nearlyTimedOut = timeElapsed > restartMillis * 0.9;
            if ((tooManyAlerts || fewWorkersLeft || nearlyTimedOut)
                    && timeElapsed < restartMillis) {
                try {
                    if (!ProfileDL.stop) {
                        ProfileDL.stop = true;
                        // Give running workers time to notice the stop flag ...
                        Thread.sleep(drainMillis);
                        // ... then hand the stragglers to ThreadHelper.
                        exitThreads();
                    }
                    Thread.sleep(drainMillis / 10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }

            // Replace the pool once (almost) all workers are done or the cycle
            // timed out. timeElapsed is deliberately the value sampled above.
            workingThreads = getWorkingThreads();
            if (workingThreads <= tc / 10 || timeElapsed > restartMillis) {
                show();
                try {
                    NetHelper.switchIP();
                    reset();
                    restartThreads();
                } catch (Exception e) {
                    // Includes SQLException from reset(); retried on a later pass.
                    e.printStackTrace();
                }
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        } while (recurring);
    }

    /** Starts a {@code ThreadHelper} on its own thread for the current pool. */
    private void exitThreads() {
        new Thread(new ThreadHelper(tc, threads, instances)).start();
    }

    /**
     * Counts the workers whose thread is still alive.
     *
     * @return number of live threads among the first {@link #tc} slots; 0 when
     *         no worker has been started yet
     */
    private int getWorkingThreads() {
        // Bounded by the list size so a failed startup cannot cause an
        // IndexOutOfBoundsException here.
        int slots = Math.min(tc, threads.size());
        int alive = 0;
        for (int i = 0; i < slots; i++) {
            Thread worker = threads.get(i);
            if (worker != null && worker.isAlive()) {
                alive++;
            }
        }
        return alive;
    }

    /** Prints total pages, total run time in hours and overall pages per second. */
    private void show() {
        long elapsedMillis = System.currentTimeMillis() - ProgramStarting.getTime();
        float totalProgramTime = (float) elapsedMillis / 3600000;
        // Avoid printing NaN/Infinity when no measurable time has passed.
        float speed = elapsedMillis > 0
                ? (float) ProfileDL.totalofTotal / (totalProgramTime * 3600)
                : 0f;
        System.out.println();
        System.out.println();
        System.out.println(">>>>>>>>>>>>>>>>>>Total pages:"
                + ProfileDL.totalofTotal + "<<<<<<<<<<<<<<<<<<<<<<<<");
        System.out.println(">>>>>>>>>>>>>>>>>>Total time elapsed:"
                + totalProgramTime + " hours.<<<<<<<<<<<<<<<<<<");
        System.out.println(">>>>>>>>>>>>>>>>>>Overall speed:" + speed
                + " per second.<<<<<<<<<<<<<<<<<<");
        System.out.println();
        System.out.println();
    }

    /**
     * Prepares a new cycle: refreshes the database connection, clears the
     * per-cycle {@code ProfileDL} counters and flags, loads the next URL batch
     * and restarts the cycle clock.
     *
     * @throws SQLException if the connection refresh or the URL query fails; in
     *         that case {@link #starting} is left unchanged
     */
    private static void reset() throws SQLException {
        DataAccess.refreshConnection();
        ProfileDL.alert = 0;
        ProfileDL.stop = false;
        ProfileDL.totalPageCount = 0;
        ProfileDL.totalLinkCount = 0;
        ProfileDL.taskStart = new Date();
        getUrls();
        starting = new Date();
    }

    /**
     * Loads the next batch of URLs and deals them round-robin into {@link #tc}
     * sorted maps, one per worker. At most {@code maxPerBatch * 1.3} rows are
     * taken. {@link #recurring} is set to whether at least one row was read.
     *
     * <p>{@link #urls} and {@link #recurring} are only updated after the whole
     * batch has been read, so a failing read is not mistaken for an empty queue.
     *
     * @throws SQLException if the query or reading the result set fails
     */
    private static void getUrls() throws SQLException {
        List<Map<Integer, String>> batch = new ArrayList<Map<Integer, String>>(tc);
        for (int i = 0; i < tc; i++) {
            batch.add(new TreeMap<Integer, String>());
        }
        int cap = (int) (maxPerBatch * 1.3);
        int count = 0;
        ResultSet rs = SearchResultData.getUrls(maxPerBatch);
        try {
            rs.beforeFirst();
            while (rs.next() && count < cap) {
                // Column 1 is used as the map key, column 2 as the URL string.
                batch.get(count % tc).put(rs.getInt(1), rs.getString(2));
                count++;
            }
        } finally {
            try {
                rs.close();
            } catch (SQLException closeFailure) {
                // Best effort: a failed close must not hide the outcome of the read.
                closeFailure.printStackTrace();
            }
        }
        urls.clear();
        urls.addAll(batch);
        recurring = count > 0;
        for (Map<Integer, String> assignment : urls) {
            System.out.println("A thread is assigned " + assignment.size() + " Urls.");
        }
    }

    /**
     * Creates and starts all {@link #tc} workers for the first time.
     *
     * @throws SQLException if constructing a worker fails
     */
    private static void startThreads() throws SQLException {
        for (int i = 0; i < tc; i++) {
            runThread(i);
        }
    }

    /**
     * Replaces every worker slot with a fresh worker and starts it.
     *
     * <p>Workers from the previous cycle are never stopped forcibly: the
     * original author notes that {@code interrupt()} cannot unblock a thread
     * stuck in blocking IO and that {@code stop()} would leave resources such
     * as uncommitted transactions behind, locking rows and blocking other
     * updates.
     *
     * @throws SQLException if constructing a worker fails
     */
    private static void restartThreads() throws SQLException {
        for (int i = 0; i < tc; i++) {
            reRunThread(i);
        }
    }

    /** Starts a new worker in slot {@code t}, replacing the previous occupant. */
    private static void reRunThread(int t) throws SQLException {
        launchWorker(t);
    }

    /** Starts a new worker in slot {@code t}. */
    private static void runThread(int t) throws SQLException {
        launchWorker(t);
    }

    /**
     * Builds the {@code ProfileDL} for slot {@code t} from its URL assignment,
     * records it and its thread (named {@code "wt" + t}) and starts the thread.
     */
    private static void launchWorker(int t) throws SQLException {
        ProfileDL profileDL = new ProfileDL(t, tc, urls.get(t));
        Thread thread = new Thread(profileDL, "wt" + String.valueOf(t));
        putAt(instances, t, profileDL);
        putAt(threads, t, thread);
        thread.start();
    }

    /**
     * Stores {@code value} at {@code index}, appending when the slot does not
     * exist yet, so the same code path serves first start and restart.
     */
    private static <T> void putAt(List<T> list, int index, T value) {
        if (index < list.size()) {
            list.set(index, value);
        } else {
            list.add(value);
        }
    }
}
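// Code-viewer keyboard shortcuts (page residue, not part of the program):
//   Copy code: Ctrl + C
//   Search code: Ctrl + F
//   Full-screen mode: F11
//   Switch theme: Ctrl + Shift + D
//   Show shortcuts: ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -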