📄 databaseserver.java
字号:
while(true) { char c = (char)buf.get(i++); if(c == ' ') break; portadd = portadd + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; TID = TID + c; } contor=i; sChannel.close(); selKey.cancel(); synchronized(Cache) { if(Cache.get(n) == null) { logger.info("Added to cache: "+n); Cache.put(n, new Server(ipadd, Integer.parseInt(portadd))); } } sChannel = connectToOther(ipadd, Integer.parseInt(portadd)); String message = "DG"+n+" "+TID+"\0"; DBWrite(message, sChannel); ByteBuffer buf2 = ByteBuffer.allocateDirect(1024); logger.info("DB wrote request to other DB "); try { synchronized(sinc) { this.selector.wakeup(); sChannel.register(this.selector, SelectionKey.OP_READ); //I want to wait for an answer } } catch (IOException e) {} } else if((char)buf.get(contor+1) == 'B') { logger.info("DB->Got lookup answer-sending modify"); String n = new String(""); String portadd = new String(""); String ipadd = new String(""); String val = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; n = n + c; } while(true) { char c = (char)buf.get(i++); if(c == ' ') break; ipadd = ipadd + c; } while(true) { char c = (char)buf.get(i++); if(c == ' ') break; portadd = portadd + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; val = val + c; } contor=i; sChannel.close(); selKey.cancel(); synchronized(Cache) { if(Cache.get(n) == null) { logger.info("Added to cache: "+n); Cache.put(n, new Server(ipadd, Integer.parseInt(portadd))); } } sChannel = connectToOther(ipadd, Integer.parseInt(portadd)); String message = "DM"+n+" "+val+"\0"; DBWrite(message, sChannel); logger.info("DB wrote request to modify to other DB "); } } else if((char)buf.get(contor) == 'D') { if((char)buf.get(contor+1) == 'G') { logger.info("DB->Got other DB request"); String n = new String(""); String TID = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; n = n + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; TID = TID + c; } contor=i; ByteBuffer buf2 = ByteBuffer.allocateDirect(1024); if(DB.get(n) == null) logger.info("N-am obiectul cerut!!!"); else { String message = "DA"+n+" "+DB.get(n)+" "+TID+"\0"; DBWrite(message, sChannel); logger.info("DB wrote answer to other DB "); } } else if((char)buf.get(contor+1) == 'A') { logger.info("DB->Got other DB answer"); String n = new String(""); String val = new String(""); String TID = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; n = n + c; } while(true) { char c = (char)buf.get(i++); if(c == ' ') break; val = val + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; TID = TID + c; } contor=i; //send an answer to the client with tid=TID; the client socket can be found in Data logger.info("Primit de la altDBserver info despre: "+n+" cu valoarea "+val); sChannel.close(); selKey.cancel(); if(CP.get(TID) == null) { logger.info("Eroare de trasmisie(DB), TID-ul nu exista!"); } else { String message = "A"+val+"\0"; DBWrite(message, CP.get(TID).sChannel); } } else if((char)buf.get(contor+1) == 'M') { logger.info("DB->Got DB request to modify"); String n = new String(""); String val = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; n = n + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; val = val + c; } contor=i; synchronized(DB) { DB.put(n, Integer.parseInt(val)); } sChannel.close(); selKey.cancel(); } } else if((char)buf.get(contor) == 'C') { //requests from clients if((char)buf.get(contor+1) == 'C') { String TID_s = ip + port + (new Integer((int)(Math.random()*2000)).toString()) + ip + port; synchronized(CP) { CP.put(TID_s, new Data(sChannel, new LinkedList<DatabaseObject>())); } logger.info("Sending TID: "+TID_s); String message ="C"+TID_s+"\0"; DBWrite(message, sChannel); contor+=3; } else if((char)buf.get(contor+1) == 'G') { String TID_s = new String(""); String n = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; TID_s = TID_s + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; n = n + c; } Data dbo; if((dbo=CP.get(TID_s)) == null) { logger.info("Transmission error(CL-G), TID-ul "+TID_s+" does not exist!"); } else { if(DB.get(n) != null) { String message ="A"+DB.get(n)+"\0"; DBWrite(message, sChannel); } else { //send request to lookup requestInfoLookup(n, TID_s); } } contor = i; } else if((char)buf.get(contor+1) == 'S') { String TID_s = new String(""); String n = new String(""); String val = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == ' ') break; TID_s = TID_s + c; } while(true) { char c = (char)buf.get(i++); if(c == ' ') break; n = n + c; } while(true) { char c = (char)buf.get(i++); if(c == '\0') break; val = val + c; } Data dbo; if((dbo=CP.get(TID_s)) == null) { logger.info("Transmission error(CL-S), the TID required does not exist!"); } else { dbo.add(new DatabaseObject(Integer.parseInt(val), n, true)); } contor = i; } else if((char)buf.get(contor+1) == 'M') { logger.info("DB-> commit"); String TID_s = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == '\0') break; TID_s = TID_s + c; } Data dbo; if((dbo = CP.get(TID_s)) == null) { logger.info("Transmission error(CL-M), the TID required does not exist!"); } else { for(DatabaseObject o : dbo.list) { if(DB.get(o.getName()) != null) { logger.info("DB Modifying my own object"); synchronized(DB) { DB.put(o.getName(), o.getValue()); } } else { Server sv = Cache.get(o.getName()); if(sv == null) { logger.info("Modifying->Can't find the server "+o.getName()+" in cache"); requestInfoLookupModify(o.getName(), new Integer(o.getValue()).toString()); } else { sChannel = connectToOther(sv.ip, sv.port); String message = "DM"+o.getName()+" "+o.getValue()+"\0"; DBWrite(message, sChannel); logger.info("DB wrote request to MODIFY to other DB "); } } } String message = "ok"; DBWrite(message, dbo.sChannel); dbo.sChannel.close(); synchronized(CP) { CP.remove(dbo); } } contor = i; } else if((char)buf.get(contor+1) == 'R') { String TID_s = new String(""); int i = contor+2; while(true) { char c = (char)buf.get(i++); if(c == '\0') break; TID_s = TID_s + c; } Data dbo; if((dbo=CP.get(TID_s)) == null) { logger.info("Transmission error(CL-R), the TID required does not exist!"); } else { String message = "ok"; DBWrite(message, dbo.sChannel); dbo.sChannel.close(); synchronized(CP) { CP.remove(dbo); } } contor = i; } else logger.info("OOPS! Read something else from client"); } else logger.info("OOPS! Read something else"); } } } } if (selKey.isValid() && selKey.isWritable()) { // Get channel that's ready for more bytes SocketChannel sChannel = (SocketChannel)selKey.channel(); } if (selKey.isValid() && selKey.isAcceptable()) { // Get channel that's ready for more bytes ServerSocketChannel sssChannel = (ServerSocketChannel)selKey.channel(); SocketChannel sChannel = sssChannel.accept(); ByteBuffer buf = ByteBuffer.allocateDirect(1024); if(sChannel != null) { logger.info("DB->Acceptable"); Socket socket = sChannel.socket(); sChannel.configureBlocking(false); // Register the new SocketChannel with our Selector, indicating // we'd like to be notified when there's data waiting to be read synchronized(sinc) { this.selector.wakeup(); sChannel.register(this.selector, SelectionKey.OP_READ); } logger.info("DB: Done accepting"); } } } catch(Exception ex) { } } public void DBWrite(String message, SocketChannel sChannel) { ByteBuffer buf2 = ByteBuffer.allocateDirect(1024); buf2.clear(); try { buf2.put(message.getBytes()); // Prepare the buffer for reading by the socket buf2.flip(); int numToWrite = message.getBytes().length, wr = 0; while(wr<numToWrite) { int numBytesWritten = sChannel.write(buf2); wr+=numBytesWritten; } } catch (IOException e) { logger.info("DB->Client closed connection"); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -