378629846 发表于 2013-2-3 10:38:39

JavaNIO处理长连接

之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文中的代码只适合短连接的情况,长连接时就不适用了。
最近恰好要写一个处理长连接的服务,接收日志包,然后打包成syslog形式再转发,所以在它的基础上改了一下。
主要改了两个类,一个是Server,因为我们只关注read事件,所以write事件我们暂不处理。另外,在处理完ON_READ事件后,不能执行key.cancel()。
package nioserver;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Set;/** * <p>Title: 主控服务线程,采用NIO实现的长连接服务器</p> * @author sxl * @version 1.0 */public class Server implements Runnable {    private static Selector selector;    private ServerSocketChannel sschannel;    private InetSocketAddress address;    protected Notifier notifier;    private int port;    /**   * 创建主控服务线程   * @param port 服务端口   * @throws java.lang.Exception   */    public static int MAX_THREADS = 4;    public Server(int port) throws Exception {      this.port = port;      // 获取事件触发器      notifier = Notifier.getNotifier();      // 创建读写线程池      for (int i = 0; i < MAX_THREADS; i++) {            Thread r = new Reader();            Thread w = new Writer();            Thread sys = new Syslog();            r.start();            w.start();            sys.start();      }      // 创建无阻塞网络套接      selector = Selector.open();      sschannel = ServerSocketChannel.open();      sschannel.configureBlocking(false);      address = new InetSocketAddress(port);      ServerSocket ss = sschannel.socket();      ss.bind(address);      sschannel.register(selector, SelectionKey.OP_ACCEPT);    }    public void run() {      System.out.println("Server started ...");      System.out.println("Server listening on port: " + port);      // 监听      while (true) {            try {                int num = 0;                num = selector.select();                if (num > 0) {                  Set selectedKeys = selector.selectedKeys();                  Iterator it = selectedKeys.iterator();                  while (it.hasNext()) {                        SelectionKey key = (SelectionKey) it.next();                        it.remove();                        // 处理IO事件                        if (key.isAcceptable()) {                           // Accept the new connection                           ServerSocketChannel ssc = (ServerSocketChannel) key.channel();                           notifier.fireOnAccept();                           SocketChannel sc = ssc.accept();                           sc.configureBlocking(false);                           // 触发接受连接事件                           Request request = new Request(sc);                           notifier.fireOnAccepted(request);                           // 注册读操作,以进行下一步的读操作                           sc.register(selector,SelectionKey.OP_READ, request);                     }                     else if (key.isReadable()) {                           Reader.processRequest(key);// 提交读服务线程读取客户端数据                     }                  }                }            }            catch (Exception e) {                continue;            }      }    }} 另一个改动的类是Reader,改变对-1的处理,这里不是break,而是抛出异常。在读取完buffer的数据后,将数据包传递给另外两个线程进行syslog的发送和入库操作。
 
package nioserver;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import java.util.List;/** * <p>Title: 读线程</p> * <p>Description: 该线程用于读取客户端数据</p> * @author sxl * @version 1.0 */public class Reader extends Thread {    private static List pool = new LinkedList();    private static Notifier notifier = Notifier.getNotifier();    public void run() {      while (true) {            try {                SelectionKey key;                synchronized (pool) {                  while (pool.isEmpty()) {                        pool.wait();                  }                  key = (SelectionKey) pool.remove(0);                }                // 读取数据                read(key);            }            catch (Exception e) {                continue;            }      }    }    /**   * 读取客户端发出请求数据   * @param sc 套接通道   */    private static int BUFFER_SIZE = 1024;    public static byte[] readRequest(SocketChannel sc) throws IOException {    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);      int off = 0;      int r = 0;      byte[] data = new byte;      while ( true ) {            buffer.clear();            r = sc.read(buffer);            if(r == 0) break;            if(r == -1)//如果是流的末尾,则抛出异常            throw new IOException();            if ((off + r) > data.length) {//数组扩容                data = grow(data, BUFFER_SIZE * 10);            }            byte[] buf = buffer.array();            System.arraycopy(buf, 0, data, off, r);            off += r;      }      byte[] req = new byte;      System.arraycopy(data, 0, req, 0, off);      return req;    }    /**   * 处理连接数据读取   * @param key SelectionKey   */    public void read(SelectionKey key) {    SocketChannel sc = null;      try {            // 读取客户端数据            sc = (SocketChannel) key.channel();            byte[] clientData =readRequest(sc);            if(clientData.length > 0){//有数据才处理                Request request = (Request)key.attachment();                request.setDataInput(clientData);                // 提交到数据库写入线程                Writer.processRequest(request);                // 提交到Syslog线程,发送syslog                Syslog.processRequest(request);            }      }      catch (Exception e) {      if(sc != null)try {sc.socket().close();sc.close();} catch (IOException e1) {e1.printStackTrace();}      }    }    /**   * 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理   */    public static void processRequest(SelectionKey key) {      synchronized (pool) {            pool.add(pool.size(), key);            pool.notifyAll();      }    }    /**   * 数组扩容   * @param src byte[] 源数组数据   * @param size int 扩容的增加量   * @return byte[] 扩容后的数组   */    public static byte[] grow(byte[] src, int size) {      byte[] tmp = new byte;      System.arraycopy(src, 0, tmp, 0, src.length);      return tmp;    }}  
 
页: [1]
查看完整版本: JavaNIO处理长连接