📄 mcastserviceimpl.java
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.membership;import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.MulticastSocket;import org.apache.catalina.tribes.MembershipListener;/** * A <b>membership</b> implementation using simple multicast. * This is the representation of a multicast membership service. * This class is responsible for maintaining a list of active cluster nodes in the cluster. * If a node fails to send out a heartbeat, the node will be dismissed. * This is the low level implementation that handles the multicasting sockets. * Need to fix this, could use java.nio and only need one thread to send and receive, or * just use a timeout on the receive * @author Filip Hanik * @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec 2005) $ */public class McastServiceImpl{ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( McastService.class ); /** * Internal flag used for the listen thread that listens to the multicasting socket. */ protected boolean doRun = false; /** * Socket that we intend to listen to */ protected MulticastSocket socket; /** * The local member that we intend to broad cast over and over again */ protected MemberImpl member; /** * The multicast address */ protected InetAddress address; /** * The multicast port */ protected int port; /** * The time it takes for a member to expire. */ protected long timeToExpiration; /** * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration */ protected long sendFrequency; /** * Reuse the sendPacket, no need to create a new one everytime */ protected DatagramPacket sendPacket; /** * Reuse the receivePacket, no need to create a new one everytime */ protected DatagramPacket receivePacket; /** * The membership, used so that we calculate memberships when they arrive or don't arrive */ protected McastMembership membership; /** * The actual listener, for callback when shits goes down */ protected MembershipListener service; /** * Thread to listen for pings */ protected ReceiverThread receiver; /** * Thread to send pings */ protected SenderThread sender; /** * When was the service started */ protected long serviceStartTime = System.currentTimeMillis(); protected int mcastTTL = -1; protected int mcastSoTimeout = -1; protected InetAddress mcastBindAddress = null; /** * Create a new mcast service impl * @param member - the local member * @param sendFrequency - the time (ms) in between pings sent out * @param expireTime - the time (ms) for a member to expire * @param port - the mcast port * @param bind - the bind address (not sure this is used yet) * @param mcastAddress - the mcast address * @param service - the callback service * @throws IOException */ public McastServiceImpl( MemberImpl member, long sendFrequency, long expireTime, int port, InetAddress bind, InetAddress mcastAddress, int ttl, int soTimeout, MembershipListener service) throws IOException { this.member = member; address = mcastAddress; this.port = port; this.mcastSoTimeout = soTimeout; this.mcastTTL = ttl; this.mcastBindAddress = bind; setupSocket(); sendPacket = new DatagramPacket(new byte[1000],1000); sendPacket.setAddress(address); sendPacket.setPort(port); receivePacket = new DatagramPacket(new byte[1000],1000); receivePacket.setAddress(address); receivePacket.setPort(port); membership = new McastMembership(member); timeToExpiration = expireTime; this.service = service; this.sendFrequency = sendFrequency; } protected void setupSocket() throws IOException { if (mcastBindAddress != null) socket = new MulticastSocket(new java.net. InetSocketAddress(mcastBindAddress, port)); else socket = new MulticastSocket(port); if (mcastBindAddress != null) { if(log.isInfoEnabled()) log.info("Setting multihome multicast interface to:" + mcastBindAddress); socket.setInterface(mcastBindAddress); } //end if if ( mcastSoTimeout >= 0 ) { if(log.isInfoEnabled()) log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout); socket.setSoTimeout(mcastSoTimeout); } if ( mcastTTL >= 0 ) { if(log.isInfoEnabled()) log.info("Setting cluster mcast TTL to " + mcastTTL); socket.setTimeToLive(mcastTTL); } } /** * Start the service * @param level 1 starts the receiver, level 2 starts the sender * @throws IOException if the service fails to start * @throws IllegalStateException if the service is already started */ public synchronized void start(int level) throws IOException { if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running."); if ( level == 1 ) { socket.joinGroup(address); doRun = true; receiver = new ReceiverThread(); receiver.setDaemon(true); receiver.start(); } if ( level==2 ) { serviceStartTime = System.currentTimeMillis(); sender = new SenderThread(sendFrequency); sender.setDaemon(true); sender.start(); } } /** * Stops the service * @throws IOException if the service fails to disconnect from the sockets */ public synchronized void stop() throws IOException { socket.leaveGroup(address); doRun = false; sender = null; receiver = null; serviceStartTime = Long.MAX_VALUE; } /** * Receive a datagram packet, locking wait * @throws IOException */ public void receive() throws IOException { socket.receive(receivePacket); byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); MemberImpl m = MemberImpl.getMember(data); if(log.isDebugEnabled()) log.debug("Mcast receive ping from member " + m); if ( membership.memberAlive(m) ) { if(log.isDebugEnabled()) log.debug("Mcast add member " + m); service.memberAdded(m); } MemberImpl[] expired = membership.expire(timeToExpiration); for ( int i=0; i<expired.length; i++) { if(log.isDebugEnabled()) log.debug("Mcast exipre member " + m); service.memberDisappeared(expired[i]); } } /** * Send a ping * @throws Exception */ public void send() throws Exception{ member.inc(); if(log.isDebugEnabled()) log.debug("Mcast send ping from member " + member); byte[] data = member.getData(); DatagramPacket p = new DatagramPacket(data,data.length); p.setAddress(address); p.setPort(port); socket.send(p); } public long getServiceStartTime() { return this.serviceStartTime; } public class ReceiverThread extends Thread { public ReceiverThread() { super(); setName("Cluster-MembershipReceiver"); } public void run() { while ( doRun ) { try { receive(); } catch ( Exception x ) { log.warn("Error receiving mcast package. Sleeping 500ms",x); try { Thread.sleep(500); } catch ( Exception ignore ){} } } } }//class ReceiverThread public class SenderThread extends Thread { long time; public SenderThread(long time) { this.time = time; setName("Cluster-MembershipSender"); } public void run() { while ( doRun ) { try { send(); } catch ( Exception x ) { log.warn("Unable to send mcast message.",x); } try { Thread.sleep(time); } catch ( Exception ignore ) {} } } }//class SenderThread}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -