📄 process5.java
字号:
package cn.group;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
public class Process5{
Map idip;
private int id=6;
private int IP=9011;
TimeOut timeout=null;
boolean isOccupied=true;
private Thread tRecv=new Thread(new RecvThread());
private Thread tRecv2=new Thread(new RecvThread2());
private int incomingInt;
Process5()
{
idip=new HashMap();
idip.put(1, 9001);
idip.put(2, 9003);
idip.put(3, 9005);
idip.put(4, 9007);
idip.put(5, 9009);
idip.put(6, 9011);
idip.put(7, 9013);
tRecv.start();
tRecv2.start();
}
private void holdElection(){
timeout=new TimeOut(10);
timeout.start();
Iterator itit=idip.keySet().iterator();
while(itit.hasNext()){
Integer itNex=(Integer)itit.next();
if(itNex>this.id){
try {
Socket s=new Socket("",(Integer)idip.get(itNex));
System.out.println("Send election message to process "+itNex);
OutputStream os=s.getOutputStream();
DataOutputStream dos=new DataOutputStream(os);
dos.write(id);
dos.flush();
dos.close();
s.close();
} catch (IOException e)
{
System.out.println("Unavailable process ");
}
}
}
}
private void becomeCoordinator(){
Iterator itit=idip.keySet().iterator();
while(itit.hasNext()){
Integer itNex=(Integer)itit.next();
try {
Socket s=new Socket("",(Integer)idip.get(itNex)+1);
System.out.println("State occpuying the coordinator for"+itNex);
OutputStream os=s.getOutputStream();
DataOutputStream dos=new DataOutputStream(os);
dos.write(id+50);
dos.flush();
dos.close();
s.close();
} catch (IOException e)
{
System.out.println("Unavailable process ");
}
}
isOccupied=true;
}
//Acknowledge to what number is small relatively
private void Reply() {
try {
Socket s=new Socket("",(Integer)idip.get(incomingInt)+1);
System.out.println("Send OK to process "+incomingInt);
OutputStream os=s.getOutputStream();
DataOutputStream dos=new DataOutputStream(os);
dos.write(id);
dos.flush();
dos.close();
s.close();
} catch (IOException e)
{
System.out.println("Already dead");
}
if(isOccupied) {
holdElection();
isOccupied=false;
}
}
private class RecvThread implements Runnable{
public void run() {
// TODO Auto-generated method stub
ServerSocket ss=null;
try
{
ss = new ServerSocket(IP);
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true){
try
{
Socket s=ss.accept();
DataInputStream dis=new DataInputStream(s.getInputStream());
incomingInt=dis.read();
if(incomingInt==0) {holdElection();}
else if(incomingInt<id) Reply();
dis.close();
s.close();
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("A process connect!");
}
}
}
private class RecvThread2 implements Runnable{
public void run() {
// TODO Auto-generated method stub
ServerSocket ss=null;
try
{
ss = new ServerSocket(IP+1);
} catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true){
try {
Socket s=ss.accept();
DataInputStream dis=new DataInputStream(s.getInputStream());
int judgeValue=dis.read();
if(judgeValue>50) System.out.println(judgeValue-50+"is the new coordinator");
else {
System.out.println("Receive from process "+judgeValue);
if(timeout==null)
System.out.println("Counter does not work yet");
else{
timeout.cancel();
timeout=null;
isOccupied=true;
}
}
dis.close();
s.close();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception
{
Process5 pro=new Process5();
}
private class TimeOut {
private Timer timer = new Timer();
private int seconds;
public TimeOut(int sec)
{
this.seconds = sec;
}
public void start() {
timer.schedule(new TimerTask() {
public void run()
{
judge();
timer.cancel();
}
private void judge()
{
becomeCoordinator();
}
}, seconds* 1000);
}
public void cancel()
{
timer.cancel();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -