⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 protocol.java

📁 用java实现的一个socket服务器。采用非阻塞模式
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package com.huawei.comm.smap;

// $Id: Protocol.java,v 1.38 2006/04/05 05:33:09 belaban Exp $

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;
import java.util.Properties;
import java.util.Vector;

/**
 * Daemon thread that drains the up-queue of a protocol layer.
 * <p>
 * Every event taken from the queue is first shown to the optional
 * {@link ProtocolObserver}. Unless the observer vetoes it, the event is run through
 * <code>prot.handleUpEvent()</code> (START and STOP events bypass that step) and, if an
 * event is still left afterwards, delivered to <code>handler.up()</code>.
 */
class UpHandler
    extends Thread
{
  private final Queue mq;
  private final Protocol handler;
  private final Protocol prot;
  private ProtocolObserver observer;
  protected final Log log = LogFactory.getLog(this.getClass());

  /**
   * Creates the handler thread; the thread is marked as daemon but not started.
   *
   * @param prot     protocol whose <code>handleUpEvent()</code> pre-processes each event
   * @param mq       queue the events are removed from
   * @param handler  protocol whose <code>up()</code> receives the events; its name is used
   *                 in the thread name, a <code>null</code> handler yields the plain name
   *                 "UpHandler"
   * @param observer optional debugger hook, may be <code>null</code>
   */
  public UpHandler(Protocol prot, Queue mq, Protocol handler,
                   ProtocolObserver observer)
  {
    super();
    this.mq = mq;
    this.handler = handler;
    this.observer = observer;
    this.prot = prot;
    if (handler != null)
    {
      setName("UpHandler (" + handler.getName() + ')');
    }
    else
    {
      setName("UpHandler");
    }
    setDaemon(true);
  }

  /**
   * Installs, replaces or (with <code>null</code>) removes the observer hook.
   */
  public void setObserver(ProtocolObserver observer)
  {
    this.observer = observer;
  }

  /**
   * Removes events from mq and calls handler.up(evt) until the queue reports that it
   * is closed or a QueueClosedException is caught. Any other Throwable raised while
   * processing an event is logged and the loop carries on with the next event.
   */
  public void run()
  {
    System.out.println("UpHandler-----线程启动");
    while (!mq.closed())
    {

      try
      {
        Event evt = (Event) mq.remove();
        if (evt == null)
        {
          if (log.isWarnEnabled())
          {
            log.warn("removed null event");
          }
          continue;
        }

        // Debugger hook (if installed): false means "discard this event". Only the
        // event is dropped - the thread must keep serving the queue (DownHandler does
        // the same), so this is 'continue'; a 'return' here would end the thread and
        // leave every later event stuck in the queue.
        if (observer != null && !observer.up(evt, mq.size()))
        {
          continue;
        }

        int type = evt.getType();
        if (type != Event.START && type != Event.STOP)
        {
          // handleUpEvent() may hand back a different event, or null to swallow it
          evt = prot.handleUpEvent(evt);
        }
        if (null != evt)
        {
          handler.up(evt);
          System.out.println("smap UpHandler-队列数:"+mq.size());
        }

      }
      catch (QueueClosedException queue_closed)
      {
        break;
      }
      catch (Throwable e)
      {
        if (log.isErrorEnabled())
        {
          log.error(getName() + " caught exception", e);
        }
      }
    }
    System.out.println("UpHandler-----线程exit");
  }

}

/**
 * Daemon thread that drains the down-queue of a protocol layer.
 * <p>
 * START and STOP events are routed through <code>handler.handleSpecialDownEvent()</code>,
 * all other events through <code>prot.handleDownEvent()</code>; whatever event remains is
 * then delivered to <code>handler.down()</code>.
 */
class DownHandler
    extends Thread
{
  protected final Log log = LogFactory.getLog(this.getClass());
  private Queue mq;
  private Protocol prot;
  private Protocol handler;
  private ProtocolObserver observer;

  /**
   * Creates the handler thread; the thread is marked as daemon but not started.
   *
   * @param prot     protocol whose <code>handleDownEvent()</code> pre-processes ordinary events
   * @param mq       queue the events are removed from
   * @param handler  protocol that receives the events; a <code>null</code> handler only
   *                 changes the thread name to the plain "DownHandler"
   * @param observer optional debugger hook, may be <code>null</code>
   */
  public DownHandler(Protocol prot, Queue mq, Protocol handler,
                     ProtocolObserver observer)
  {
    super();
    this.mq = mq;
    this.handler = handler;
    this.observer = observer;
    this.prot = prot;
    setName(handler != null
            ? "DownHandler (" + handler.getName() + ')'
            : "DownHandler");
    setDaemon(true);
  }

  /** Installs, replaces or (with <code>null</code>) removes the observer hook. */
  public void setObserver(ProtocolObserver observer)
  {
    this.observer = observer;
  }

  /**
   * Takes events off mq and passes them to handler.down(event) until the queue is
   * closed. Throwables other than QueueClosedException are logged and do not end
   * the loop.
   */
  public void run()
  {
    System.out.println("DownHandler-----线程启动");
    while (!mq.closed())
    {
      try
      {
        Event event = (Event) mq.remove();
        if (event == null)
        {
          if (log.isWarnEnabled())
          {
            log.warn("removed null event");
          }
          continue;
        }

        // debugger hook (if installed): a false answer discards the event
        if (observer != null && !observer.down(event, mq.size()))
        {
          continue;
        }

        final int kind = event.getType();
        final boolean special = (kind == Event.START || kind == Event.STOP);
        if (special)
        {
          if (!handler.handleSpecialDownEvent(event))
          {
            continue;
          }
        }
        else
        {
          event = prot.handleDownEvent(event);
          if (event == null)
          {
            // nothing left to pass on
            continue;
          }
        }
        handler.down(event);
      }
      catch (QueueClosedException queue_closed)
      {
        break;
      }
      catch (Throwable e)
      {
        if (log.isErrorEnabled())
        {
          log.error(getName() + " caught exception", e);
        }
      }
    }
    System.out.println("DownHandler-----线程退出");
  }

}

/**
 * The Protocol class provides a set of common services for protocol layers. Each layer has to
 * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
 * <code>down()</code> and <code>getName()</code>). Layers are stacked in a certain order to form
 * a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower
 * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
 * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
 * its layer and so on, until a layer handles the Message and sends a response or discards it,
 * the former resulting in another Event being passed down the stack.<p>
 * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
 * received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed
 * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
 * <code>up()</code> of the layer. The same applies for Events traveling down the stack. Handling
 * of the up-handler and down-handler threads and the 2 FIFO queues is done by the Protocol
 * class, subclasses will almost never have to override this behavior.<p>
 * The important thing to bear in mind is that Events have to be passed on between layers in FIFO
 * order which is guaranteed by the Protocol implementation and must be guaranteed by subclasses
 * implementing their own Event queuing.<p>
 * <b>Note that each class implementing interface Protocol MUST provide an empty, public
 * constructor !</b>
 */
public abstract class Protocol
{
  // Property set of this protocol: filled by setProperties()/setPropertiesInternal()
  // and returned as-is by getProperties().
  protected final Properties props = new Properties();
  // NOTE(review): presumably the neighbouring layers above and below in the stack;
  // they are not assigned in the visible part of this file - confirm.
  protected Protocol up_prot = null, down_prot = null;
  // NOTE(review): presumably the stack this protocol belongs to - not assigned in the visible code.
  protected ProtocolStack stack = null;
  // One queue per direction for events travelling up and down (see class comment).
  protected final Queue up_queue = new Queue();
  protected final Queue down_queue = new Queue();
  // Thread serving the up queue; null until created (creation is outside the visible code).
  protected UpHandler up_handler = null;
  // Overwritten by the "up_thread_prio" property; -1 is the initial value
  // (NOTE(review): presumably meaning "not configured" - confirm where it is consumed).
  protected int up_thread_prio = -1;
  // Thread serving the down queue; null until created (creation is outside the visible code).
  protected DownHandler down_handler = null;
  // Overwritten by the "down_thread_prio" property; -1 is the initial value (see up_thread_prio).
  protected int down_thread_prio = -1;
  protected ProtocolObserver observer = null; // hook for debugger
  // NOTE(review): presumably milliseconds to wait when joining a handler thread - usage not visible here.
  private final static long THREAD_JOIN_TIMEOUT = 1000;
  protected boolean down_thread = true; // determines whether the down_handler thread should be started
  protected boolean up_thread = true; // determines whether the up_handler thread should be started
  protected boolean stats = true; // determines whether to collect statistics (and expose them via JMX)
  protected final Log log = LogFactory.getLog(this.getClass());
  // Logger levels sampled once at construction; changeable afterwards via setTrace()/setWarn().
  protected boolean trace = log.isTraceEnabled();
  protected boolean warn = log.isWarnEnabled();

  /**
   * Configures the protocol initially by merging the given properties into this
   * protocol's own property set. The configuration such properties come from
   * consists of name=value items, separated by a ';' (semicolon), e.g.:<pre>
   * "loopback=false;unicast_inport=4444"
   * </pre>
   *
   * @param props the properties to add; <code>null</code> is accepted and adds nothing
   * @return <code>true</code> in every case in this base implementation
   */
  public boolean setProperties(Properties props)
  {
    if (props == null)
    {
      return true;
    }
    this.props.putAll(props);
    return true;
  }

  /**
   * Called by Configurator. Copies the given properties into this protocol's property
   * set, then extracts the properties that the Protocol base class handles itself
   * (<code>down_thread</code>, <code>down_thread_prio</code>, <code>up_thread</code>,
   * <code>up_thread_prio</code> and <code>stats</code>), removing each of them from
   * <code>props</code>. Finally calls setProperties() with what is left, which might
   * invoke the setProperties() method of the actual protocol instance.
   *
   * @param props the configuration; the handled keys are removed from this object.
   *              <code>null</code> is passed straight on to setProperties()
   * @return the result of setProperties()
   * @exception NumberFormatException if a thread priority value is not a valid integer
   */
  public boolean setPropertiesInternal(Properties props)
  {
    if (props == null)
    {
      // Nothing to extract. setProperties() accepts null, so apply the same tolerance
      // here instead of failing with a NullPointerException.
      return setProperties(props);
    }

    // Note: the copy is taken before the removals below, so this.props keeps the
    // thread/stats settings even though they are stripped from the argument.
    this.props.putAll(props);

    String str = props.getProperty("down_thread");
    if (str != null)
    {
      down_thread = Boolean.valueOf(str).booleanValue();
      props.remove("down_thread");
    }

    str = props.getProperty("down_thread_prio");
    if (str != null)
    {
      down_thread_prio = Integer.parseInt(str);
      props.remove("down_thread_prio");
    }

    str = props.getProperty("up_thread");
    if (str != null)
    {
      up_thread = Boolean.valueOf(str).booleanValue();
      props.remove("up_thread");
    }

    str = props.getProperty("up_thread_prio");
    if (str != null)
    {
      up_thread_prio = Integer.parseInt(str);
      props.remove("up_thread_prio");
    }

    str = props.getProperty("stats");
    if (str != null)
    {
      stats = Boolean.valueOf(str).booleanValue();
      props.remove("stats");
    }

    return setProperties(props);
  }

  /**
   * Returns the properties held by this protocol.
   *
   * @return the protocol's own <code>Properties</code> object itself, not a copy
   */
  public Properties getProperties()
  {
    return props;
  }

  /**
   * Returns the trace flag. It starts out as the logger's trace level at construction
   * time and can be changed with setTrace().
   */
  public boolean isTrace()
  {
    return trace;
  }

  /**
   * Sets the trace flag returned by isTrace(). Only the flag of this protocol is
   * changed; the logger itself is not touched.
   */
  public void setTrace(boolean trace)
  {
    this.trace = trace;
  }

  /**
   * Returns the warn flag. It starts out as the logger's warn level at construction
   * time and can be changed with setWarn().
   */
  public boolean isWarn()
  {
    return warn;
  }

  /**
   * Sets the warn flag returned by isWarn(). Only the flag of this protocol is
   * changed; the logger itself is not touched.
   */
  public void setWarn(boolean warn)
  {
    this.warn = warn;
  }

  /**
   * Returns the <code>up_thread</code> setting, i.e. whether the up_handler thread
   * should be started.
   */
  public boolean upThreadEnabled()
  {
    return up_thread;
  }

  /**
   * Sets whether the up_handler thread should be started.
   *
   * @param flag the new <code>up_thread</code> setting
   * @return the value that was just set
   */
  public boolean enableUpThread(boolean flag)
  {
    up_thread = flag;
    return flag;
  }

  /**
   * Sets whether the down_handler thread should be started.
   *
   * @param flag the new <code>down_thread</code> setting
   * @return the value that was just set
   */
  public boolean enableDownThread(boolean flag)
  {
    down_thread = flag;
    return flag;
  }

  /**
   * Returns the <code>down_thread</code> setting, i.e. whether the down_handler thread
   * should be started.
   */
  public boolean downThreadEnabled()
  {
    return down_thread;
  }

  /**
   * Returns the <code>stats</code> setting, i.e. whether statistics are to be collected.
   */
  public boolean statsEnabled()
  {
    return stats;
  }

  /**
   * Sets the <code>stats</code> setting, i.e. whether statistics are to be collected.
   *
   * @param flag the new setting
   */
  public void enableStats(boolean flag)
  {
    stats = flag;
  }

  /**
   * Resets the statistics. The base implementation does nothing.
   */
  public void resetStats()
  {
    // nothing to reset in the base class
  }

  /**
   * Returns the statistics as a string.
   *
   * @return <code>null</code> in this base implementation
   */
  public String printStats()
  {
    return null;
  }

  /**
   * Returns the statistics as a map.
   *
   * @return <code>null</code> in this base implementation
   */
  public Map dumpStats()
  {
    return null;
  }

  /**
   * Installs the debugger hook for this protocol and forwards it to the up and down
   * handler threads, if they exist. A non-null observer is told which protocol it is
   * attached to via <code>setProtocol(this)</code>.
   *
   * @param observer the observer to install, or <code>null</code> to remove the
   *                 current one from the protocol and its handler threads
   */
  public void setObserver(ProtocolObserver observer)
  {
    this.observer = observer;
    if (observer != null)
    {
      // Guarded so that a null argument clears the hook everywhere instead of
      // failing here after the field has already been overwritten.
      observer.setProtocol(this);
    }
    if (up_handler != null)
    {
      up_handler.setObserver(observer);
    }
    if (down_handler != null)
    {
      down_handler.setObserver(observer);
    }
  }

  /**
   * Called after the instance has been created (no-argument constructor) and before the
   * protocol is started. Properties are already set. Other protocols are not yet connected
   * and events cannot yet be sent. The base implementation does nothing.
   * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
   *                      ProtocolStack to fail, so the channel constructor will throw an exception
   */
  public void init()
      throws Exception
  {
  }

  /**
   * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
   * Protocols are connected and queues are ready to receive events.
   * Will be called <em>from bottom to top</em>. This call will replace
   * the <b>START</b> and <b>START_OK</b> events. The base implementation does nothing.
   * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
   *                      to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
   */
  public void start()
      throws Exception
  {
  }

  /**
   * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
   * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
   * neighbor protocol below is still working. This method will replace the
   * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
   * when this method is called all messages in the down queue will have been flushed
   */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -