📄 memberimpl.java
字号:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.membership;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.SenderState;
/**
* A <b>membership</b> implementation using simple multicast.
* This is the representation of a multicast member.
* Carries the host, and port of the this or other cluster nodes.
*
* @author Filip Hanik
* @version $Revision: 467222 $, $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
*/
public class MemberImpl implements Member, java.io.Externalizable {
/**
* Public properties specific to this implementation
*/
public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
public static final transient String MEMBER_NAME = "memberName";
public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
/**
* The listen host for this member
*/
protected byte[] host;
protected transient String hostname;
/**
* The tcp listen port for this member
*/
protected int port;
/**
* The tcp/SSL listen port for this member
*/
protected int securePort = -1;
/**
* Counter for how many broadcast messages have been sent from this member
*/
protected int msgCount = 0;
/**
* The number of milliseconds since this members was
* created, is kept track of using the start time
*/
protected long memberAliveTime = 0;
/**
* For the local member only
*/
protected transient long serviceStartTime;
/**
* To avoid serialization over and over again, once the local dataPkg
* has been set, we use that to transmit data
*/
protected transient byte[] dataPkg = null;
/**
* Unique session Id for this member
*/
protected byte[] uniqueId = new byte[16];
/**
* Custom payload that an app framework can broadcast
* Also used to transport stop command.
*/
protected byte[] payload = new byte[0];
/**
* Command, so that the custom payload doesn't have to be used
* This is for internal tribes use, such as SHUTDOWN_COMMAND
*/
protected byte[] command = new byte[0];
/**
* Domain if we want to filter based on domain.
*/
protected byte[] domain = new byte[0];
/**
* Empty constructor for serialization
*/
public MemberImpl() {
}
/**
* Construct a new member object
* @param name - the name of this member, cluster unique
* @param domain - the cluster domain name of this member
* @param host - the tcp listen host
* @param port - the tcp listen port
*/
public MemberImpl(String host,
int port,
long aliveTime) throws IOException {
setHostname(host);
this.port = port;
this.memberAliveTime=aliveTime;
}
public MemberImpl(String host,
int port,
long aliveTime,
byte[] payload) throws IOException {
this(host,port,aliveTime);
setPayload(payload);
}
public boolean isReady() {
return SenderState.getSenderState(this).isReady();
}
public boolean isSuspect() {
return SenderState.getSenderState(this).isSuspect();
}
public boolean isFailing() {
return SenderState.getSenderState(this).isFailing();
}
/**
* Increment the message count.
*/
protected void inc() {
msgCount++;
}
/**
* Create a data package to send over the wire representing this member.
* This is faster than serialization.
* @return - the bytes for this member deserialized
* @throws Exception
*/
public byte[] getData() {
return getData(true);
}
/**
* Highly optimized version of serializing a member into a byte array
* Returns a cached byte[] reference, do not modify this data
* @param getalive boolean
* @return byte[]
*/
public byte[] getData(boolean getalive) {
return getData(getalive,false);
}
public int getDataLength() {
return TRIBES_MBR_BEGIN.length+ //start pkg
4+ //data length
8+ //alive time
4+ //port
4+ //secure port
1+ //host length
host.length+ //host
4+ //command length
command.length+ //command
4+ //domain length
domain.length+ //domain
16+ //unique id
4+ //payload length
payload.length+ //payload
TRIBES_MBR_END.length; //end pkg
}
/**
*
* @param getalive boolean - calculate memberAlive time
* @param reset boolean - reset the cached data package, and create a new one
* @return byte[]
*/
public byte[] getData(boolean getalive, boolean reset) {
if ( reset ) dataPkg = null;
//look in cache first
if ( dataPkg!=null ) {
if ( getalive ) {
//you'd be surprised, but System.currentTimeMillis
//shows up on the profiler
long alive=System.currentTimeMillis()-getServiceStartTime();
XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
}
return dataPkg;
}
//package looks like
//start package TRIBES_MBR_BEGIN.length
//package length - 4 bytes
//alive - 8 bytes
//port - 4 bytes
//secure port - 4 bytes
//host length - 1 byte
//host - hl bytes
//clen - 4 bytes
//command - clen bytes
//dlen - 4 bytes
//domain - dlen bytes
//uniqueId - 16 bytes
//payload length - 4 bytes
//payload plen bytes
//end package TRIBES_MBR_END.length
byte[] addr = host;
long alive=System.currentTimeMillis()-getServiceStartTime();
byte hl = (byte)addr.length;
byte[] data = new byte[getDataLength()];
int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
int pos = 0;
//TRIBES_MBR_BEGIN
System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
pos += TRIBES_MBR_BEGIN.length;
//body length
XByteBuffer.toBytes(bodylength,data,pos);
pos += 4;
//alive data
XByteBuffer.toBytes((long)alive,data,pos);
pos += 8;
//port
XByteBuffer.toBytes(port,data,pos);
pos += 4;
//secure port
XByteBuffer.toBytes(securePort,data,pos);
pos += 4;
//host length
data[pos++] = hl;
//host
System.arraycopy(addr,0,data,pos,addr.length);
pos+=addr.length;
//clen - 4 bytes
XByteBuffer.toBytes(command.length,data,pos);
pos+=4;
//command - clen bytes
System.arraycopy(command,0,data,pos,command.length);
pos+=command.length;
//dlen - 4 bytes
XByteBuffer.toBytes(domain.length,data,pos);
pos+=4;
//domain - dlen bytes
System.arraycopy(domain,0,data,pos,domain.length);
pos+=domain.length;
//unique Id
System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
pos+=uniqueId.length;
//payload
XByteBuffer.toBytes(payload.length,data,pos);
pos+=4;
System.arraycopy(payload,0,data,pos,payload.length);
pos+=payload.length;
//TRIBES_MBR_END
System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
pos += TRIBES_MBR_END.length;
//create local data
dataPkg = data;
return data;
}
/**
* Deserializes a member from data sent over the wire
* @param data - the bytes received
* @return a member object.
*/
public static MemberImpl getMember(byte[] data, MemberImpl member) {
return getMember(data,0,data.length,member);
}
public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
//package looks like
//start package TRIBES_MBR_BEGIN.length
//package length - 4 bytes
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -