找回密码
 立即注册
首页 业界区 业界 Netty基础—2.网络编程基础二

Netty基础—2.网络编程基础二

后沛若 2025-6-4 22:40:55
大纲
1.网络编程简介
2.BIO网络编程
3.AIO网络编程
4.NIO网络编程之Buffer
5.NIO网络编程之实战
6.NIO网络编程之Reactor模式
 
1.网络编程简介
既然是通信,那么肯定会有两个对端。在网络编程里提供服务的一方叫服务端,连接服务端使用服务的另一方叫客户端。
 
如果类的名字有Server或ServerSocket,则表示这个类是给服务端用的。如果类的名字只有Socket的,则表示这个类是负责具体的网络读写的。
 
对于服务端来说ServerSocket只是个场所,具体和客户端沟通的是Socket。所以在网络编程里,ServerSocket并不负责具体的网络读写,ServerSocket只负责接收客户端连接后分配一个Socket处理具体网络读写。这一点对所有模式(BIO、AIO、NIO)的网络编程都适用。
 
在网络编程里,开发者关注的其实就三个事情:
一.建立网络连接
客户端连接服务端,服务端等待和接收连接。
二.读网络数据
三.写网络数据
 
所有模式的通信编程(BIO、AIO、NIO)都是围绕上面这三件事情进行的。
 
2.BIO网络编程
(1)BIO即阻塞IO模型
(2)BIO编程介绍
(3)BIO模型的问题与改进
(4)BIO编程的标准模式
(5)BIO编程的例子
 
(1)BIO即阻塞IO模型
1.png
(2)BIO编程介绍
服务端提供IP和端口,客户端通过连接操作向服务端发起连接请求。如果通过三次握手成功建立TCP连接,双方就可以通过套接字进行通信。
 
在传统的同步阻塞模型(BIO)开发中:服务端的ServerSocket负责绑定IP地址 + 启动监听端口,客户端的Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
 
BIO模型的服务端,通常由一个Acceptor线程负责监听客户端的连接。Acceptor线程接收到每个客户端连接请求后都会创建一个新的线程进行处理,请求处理完毕再通过输出流返回响应给客户端,然后再销毁线程。这就是典型的:一个请求一个响应模型。
 
(3)BIO模型的问题与改进
BIO模型的最大问题是缺乏弹性伸缩能力。当客户端并发访问量增加后,服务端线程个数和客户端并发访问数呈1:1关系。线程是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降。随着访问量的继续增大,系统最终可能会挂掉。
 
为了改进这种一个连接一个线程的模型,可以使用线程池来管理这些线程,从而实现1个或多个线程处理N个客户端的模型。但是这种改进后的模型的底层还是使用同步阻塞IO,通常这种通过线程池进行改进的模型被称为伪异步IO模型。
 
如果线程池使用的是CachedThreadPool线程池,那么除了能自动管理线程外,客户端 : 线程数看起来也像是1 : 1。如果线程池使用的是FixedThreadPool线程池,那么就能有效控制线程数量,实现客户端 : 线程数 = N : M的伪异步IO模型。
 
但是正因为限制了线程数量,如果发生读取数据慢(比如数据量大、网络传输慢等)且存在大量并发的情况,那么对其他连接请求的处理就只能一直等待,这就是最大的弊端。
2.png
(4)BIO编程的标准模式
说明一:客户端指定服务端的IP地址和端口,对服务端发起连接请求。
 
说明二:服务端需要有一个ServerSocket,来处理客户端发起的连接请求。服务端收到连接请求后,ServerSocket的accept()方法会创建一个Socket。然后将这个Socket打包成一个Runable型的任务并放到一个线程池里执行。
 
说明三:在这个Runable任务里便会根据这个Socket进行具体的读写和业务处理。
 
以上就是BIO编程的标准模式。BIO受限于并发下的线程数量,一般用于客户端较多。尽管早期的Tomcat也使用了线程池的BIO,但毕竟性能不高。
 
(5)BIO编程的例子
  1. //类说明: 客户端
  2. //允许客户端在控制台输入数据然后送往服务器, 然后读取服务端输出的数据
  3. //这里的示例是一个长连接, 如果要实现短连接可以在每次客户端发起完请求后将socket关闭即可
  4. public class BioClient {
  5.     //服务器端口号
  6.     public static int DEFAULT_PORT = 12345;
  7.     public static String DEFAULT_SERVER_IP = "127.0.0.1";
  8.     public static void main(String[] args) throws InterruptedException, IOException {
  9.         //通过构造函数创建Socket, 并且连接指定地址和端口的服务端
  10.         Socket socket = new Socket(DEFAULT_SERVER_IP, DEFAULT_PORT);
  11.         System.out.println("请输入请求消息: ");
  12.         //启动读取服务端输出数据的线程
  13.         new ReadMsg(socket).start();
  14.         //允许客户端在控制台输入数据, 然后送往服务器
  15.         PrintWriter pw = null;
  16.         while(true) {
  17.             pw = new PrintWriter(socket.getOutputStream());
  18.             pw.println(new Scanner(System.in).next());
  19.             pw.flush();//把数据通过socket刷到网络上去
  20.         }
  21.     }
  22.     //读取服务端输出数据的线程: 进行连接的发起和读写
  23.     private static class ReadMsg extends Thread {
  24.         Socket socket;
  25.         public ReadMsg(Socket socket) {
  26.             this.socket = socket;
  27.         }
  28.         @Override
  29.         public void run() {
  30.             //负责处理socket读写的输入流, 使用try-resources语法
  31.             try (BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
  32.                 String line = null;
  33.                 //通过输入流读取服务端传输的数据
  34.                 //如果已经读到输入流尾部, 返回null, 退出循环; 如果得到非空值, 就将结果进行业务处理
  35.                 while ((line = br.readLine()) != null) {
  36.                     System.out.printf("客户端业务处理: %s\n", line);
  37.                 }
  38.             } catch (SocketException e) {
  39.                 System.out.printf("%s\n", "服务器断开了你的连接");
  40.             } catch (Exception e) {
  41.                 e.printStackTrace();
  42.             } finally {
  43.                 //使用try-resources语法, 可以不用下面这段判断对资源进行关闭
  44.                 clear();
  45.             }
  46.         }
  47.         //必要的资源清理工作
  48.         private void clear() {
  49.             if (socket != null) {
  50.                 try {
  51.                     socket.close();
  52.                 } catch (IOException e) {
  53.                     e.printStackTrace();
  54.                 }
  55.             }
  56.         }
  57.     }
  58. }
  59. //类说明: BIO的服务端主程序
  60. //接收客户的连接, 通过ServerSocket来进行处理socket
  61. //把客户发送的连接打包成任务交给线程池去执行处理
  62. public class BioServer {
  63.     //服务器端口号
  64.     public static int DEFAULT_PORT = 12345;
  65.     public static String DEFAULT_SERVER_IP = "127.0.0.1";
  66.     //服务器端必须: 用来接待用户请求, 其中的accept()方法就会产生一个socket
  67.     private static ServerSocket server;
  68.     //线程池: 处理每个客户端的请求
  69.     private static ExecutorService executorService = Executors.newFixedThreadPool(5);
  70.     public static void main(String[] args) throws IOException {
  71.         start();
  72.     }
  73.    
  74.     //启动服务器端
  75.     private static void start() throws IOException {
  76.         try {
  77.             //通过构造函数创建ServerSocket
  78.             //如果端口合法且空闲, 服务端就监听成功
  79.             server = new ServerSocket(DEFAULT_PORT);
  80.             System.out.println("服务器已启动, 端口号: " + DEFAULT_PORT);
  81.             while(true) {
  82.                 //通过accept()方法产生一个socket和新的客户端进行沟通, 这是一个长连接
  83.                 Socket socket= server.accept();
  84.                 System.out.println("有新的客户端连接----" );
  85.                 //当有新的客户端接入时, 将socket打包成一个任务, 投入到线程池
  86.                 executorService.execute(new BioServerHandler(socket));
  87.             }
  88.         } finally {
  89.             if (server != null) {
  90.                 server.close();
  91.             }
  92.         }
  93.     }
  94. }
  95. //类说明: 线程池中用来处理客户端具体socket的输入输出流的线程
  96. public class BioServerHandler implements Runnable {
  97.     private Socket socket;
  98.     public BioServerHandler(Socket socket) {
  99.         this.socket = socket;
  100.     }
  101.     public void run() {
  102.         //负责socket读写的输出、输入流, 使用try-resources语法
  103.         try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  104.             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
  105.             String message;
  106.             String result;
  107.             //通过输入流读取客户端传输的数据
  108.             //如果已经读到输入流尾部, 返回null, 退出循环; 如果得到非空值, 就将结果进行业务处理
  109.             while((message = in.readLine()) != null) {
  110.                 System.out.println("服务端接收到信息: " + message);
  111.                 result = "返回客户端信息: 你好, " + message + ", 现在时间是: " + new Date(System.currentTimeMillis()).toString() ;
  112.                 //将业务结果通过输出流返回给客户端
  113.                 out.println(result);
  114.             }
  115.         } catch (Exception e) {
  116.             e.printStackTrace();
  117.         } finally {
  118.             //使用try-resources语法, 可以不用下面这段判断对资源进行关闭
  119.             if (socket != null) {
  120.                 try {
  121.                     socket.close();
  122.                 } catch (IOException e) {
  123.                     e.printStackTrace();
  124.                 }
  125.                 socket = null;
  126.             }
  127.         }
  128.     }
  129. }
复制代码
 
3.AIO网络编程
(1)AIO即异步IO模型
(2)AIO编程介绍
(3)客户端AsynchronousSocketChannel
(4)服务端AsynchronousServerSocketChannel
(5)AIO、BIO和NIO对比
 
(1)AIO即异步IO模型
3.png
(2)AIO编程介绍
AIO核心类有两个:
AsynchronousSocketChannel(客户端)
AsynchronousServerSocketChannel(服务端)
 
AIO为TCP通信提供了异步Channel。AsynchronousServerSocketChannel创建成功后,类似于ServerSocket,也是调用accept()方法来接受来自客户端的连接。
 
由于异步IO实际的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接受操作系统IO完成的通知。所以调用异步的ServerSocketChannel的accept()方法,当前线程不会阻塞。但程序不知道accept()方法何时能接收到客户端请求且操作系统完成网络IO。为解决这个问题,AIO中ServerSocketChannel的accept()方法是这样的。
  1. public abstract  void accept(
  2.     A attachment,
  3.     CompletionHandler handler
  4. );
  5. public interface CompletionHandler<V,A> {
  6.     //当IO完成时触发该方法
  7.     //该方法的第一个参数代表IO操作返回的对象,第二个参数代表发起IO操作时传入的附加参数
  8.     void completed(V result, A attachment);
  9.     //当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误
  10.     void failed(Throwable exc, A attachment);
  11. }
复制代码
AIO中ServerSocketChannel的accept()方法会接受来自客户端请求,连接成功或失败都会触发CompletionHandler对象的相应方法。
 
(3)客户端AsynchronousSocketChannel
AsynchronousSocketChannel的的用法与Socket类似,有三个方法:
一.connect()连接指定端口 + IP地址的服务器
二.read()处理读
三.write()处理写
4.png
下面是AIO的客户端的代码示例:
  1. //类说明: AIO的客户端主程序
  2. public class AioClient {
  3.     //IO通信处理器
  4.     private static AioClientHandler clientHandle;
  5.     public static void start() {
  6.         if (clientHandle != null) {
  7.             return;
  8.         }
  9.         clientHandle = new AioClientHandler(DEFAULT_SERVER_IP, DEFAULT_PORT);
  10.         //负责网络通讯的线程
  11.         new Thread(clientHandle,"Client").start();
  12.     }
  13.     //向服务器发送消息
  14.     public static boolean sendMsg(String msg) throws Exception {
  15.         if (msg.equals("q")) {
  16.             return false;
  17.         }
  18.         clientHandle.sendMessage(msg);
  19.         return true;
  20.     }
  21.     public static void main(String[] args) throws Exception {
  22.         AioClient.start();
  23.         System.out.println("请输入发送给服务端的请求消息: ");
  24.         Scanner scanner = new Scanner(System.in);
  25.         while(AioClient.sendMsg(scanner.nextLine()));
  26.     }
  27. }
  28. //1.clientChannel.connect向服务端发起连接,是异步的
  29. //连接成功/失败系统会调用completed/failed方法进行通知
  30. //类说明: IO通信处理器, 1.负责连接服务器, 2.提供对外暴露的可以向服务端发送数据的API
  31. //AioClientHandler也会被打包成一个任务放到线程池里面去执行, 所以要实现Runnable
  32. //由于客户端发起的连接是基于AIO的, 需要提示出连接成功或者失败的返回说明, 因此也要实现CompletionHandler
  33. public class AioClientHandler implements CompletionHandler<Void, AioClientHandler>, Runnable {
  34.     private AsynchronousSocketChannel clientChannel;//客户端进行异步操作的socket
  35.     private String host;
  36.     private int port;
  37.     //防止线程退出不好演示效果, 因为任务是一个异步IO, 方法执行完后会迅速返回,
  38.     //为了不让客户端程序过早退出, 所以这里用CountDownLatch阻塞一下
  39.     private CountDownLatch latch;
  40.    
  41.     public AioClientHandler(String host, int port) {
  42.         this.host = host;
  43.         this.port = port;
  44.         try {
  45.             //创建一个实际异步的客户端通道, 下面会基于这个通道发起连接
  46.             //连接成功会执行completed方法, 连接失败会执行failed方法
  47.             clientChannel = AsynchronousSocketChannel.open();
  48.         } catch (IOException e) {
  49.             e.printStackTrace();
  50.         }
  51.     }
  52.     @Override
  53.     public void run() {
  54.         //创建CountDownLatch, 因为是异步调用, 下面的connect不会阻塞, 那么整个run方法会迅速结束, 那么负责网络通讯的线程也会迅速结束
  55.         //所以使用这个CountDownLatch就是为了进行阻塞
  56.         latch = new CountDownLatch(1);
  57.         //发起异步连接操作, 回调参数就是这个实例本身, 如果连接成功会回调这个实例的completed方法
  58.         clientChannel.connect(new InetSocketAddress(host, port), null,this);
  59.         try {
  60.             latch.await();
  61.             clientChannel.close();
  62.         } catch (InterruptedException e) {
  63.             e.printStackTrace();
  64.         } catch (IOException e) {
  65.             e.printStackTrace();
  66.         }
  67.     }
  68.     //连接成功, 这个方法会被系统调用
  69.     @Override
  70.     public void completed(Void result, AioClientHandler attachment) {
  71.         System.out.println("已经连接到服务端");
  72.     }
  73.     //连接失败, 这个方法会被系统调用
  74.     @Override
  75.     public void failed(Throwable exc, AioClientHandler attachment) {
  76.         System.err.println("连接到服务端失败");
  77.         exc.printStackTrace();
  78.         latch.countDown();
  79.         try {
  80.             clientChannel.close();
  81.         } catch (IOException e) {
  82.             e.printStackTrace();
  83.         }
  84.     }
  85.     //提供对外暴露的可以向服务端发送数据的API
  86.     public void sendMessage(String msg) {
  87.         //为了把msg变成可以在网络传输的格式: 把字符串变成字节数组, 再把字节数组放入缓冲区, 然后对缓冲区进行读
  88.         byte[] bytes = msg.getBytes();
  89.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  90.         writeBuffer.put(bytes);
  91.         writeBuffer.flip();
  92.         //进行异步写, 向服务端发送消息, 同样的这个方法会迅速返回
  93.         //而且也需要提供一个接口让系统在一次网络写操作完成后通知应用程序
  94.         //所以传入一个实现了CompletionHandler的AioClientWriteHandler
  95.         //第1个writeBuffer, 表示客户端要发送给服务器的数据
  96.         //第2个writeBuffer, 考虑到网络写有可能无法一次性将数据写完,需要进行多次网络写
  97.         //所以将writeBuffer作为附件传递给AioClientWriteHandler
  98.         clientChannel.write(writeBuffer, writeBuffer, new AioClientWriteHandler(clientChannel, latch));
  99.     }
  100. }
  101. //2.clientChannel.write向服务端写数据,也是异步的
  102. //类说明: 网络写的处理器, 负责异步写完之后的处理
  103. //在CompletionHandler<Integer, ByteBuffer>中
  104. //Integer: 表示本次网络写操作完成实际写入的字节数
  105. //ByteBuffer: 表示写操作的附件, 存储了写操作需要写入的数据
  106. public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
  107.     private AsynchronousSocketChannel clientChannel;
  108.     private CountDownLatch latch;
  109.     public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
  110.         this.clientChannel = clientChannel;
  111.         this.latch = latch;
  112.     }
  113.     @Override
  114.     public void completed(Integer result, ByteBuffer buffer) {
  115.         //有可能无法一次性将数据写完, 需要检查缓冲区中是否还有数据需要继续进行网络写
  116.         if (buffer.hasRemaining()) {
  117.             clientChannel.write(buffer, buffer, this);
  118.         } else {
  119.             //写操作已经完成, 为读取服务端传回的数据建立缓冲区
  120.             ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  121.             //进行异步读, 这个方法会迅速返回, 而且也需要提供一个接口让系统在读操作完成后通知我们的应用程序
  122.             //所以要传入一个实现了CompletionHandler的AioClientReadHandler
  123.             clientChannel.read(readBuffer, readBuffer, new AioClientReadHandler(clientChannel, latch));
  124.         }
  125.     }
  126.     @Override
  127.     public void failed(Throwable exc, ByteBuffer attachment) {
  128.         System.err.println("数据发送失败...");
  129.         try {
  130.             clientChannel.close();
  131.             latch.countDown();
  132.         } catch (IOException e) {
  133.         }
  134.     }
  135. }
  136. //3.clientChannel.read读取服务端的数据,也是异步的
  137. //类说明: 网络读的处理器, 负责异步读完之后的处理
  138. //在CompletionHandler<Integer, ByteBuffer>中
  139. //Integer: 表示本次网络写操作完成实际写入的字节数
  140. //ByteBuffer: 表示写操作的附件, 存储了写操作需要写入的数据
  141. public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
  142.     private AsynchronousSocketChannel clientChannel;
  143.     private CountDownLatch latch;
  144.     public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
  145.         this.clientChannel = clientChannel;
  146.         this.latch = latch;
  147.     }
  148.     @Override
  149.     public void completed(Integer result, ByteBuffer buffer) {
  150.         buffer.flip();
  151.         byte[] bytes = new byte[buffer.remaining()];
  152.         buffer.get(bytes);
  153.         String msg;
  154.         try {
  155.             msg = new String(bytes,"UTF-8");
  156.             System.out.println("读取到服务端的数据: " + msg);
  157.         } catch (UnsupportedEncodingException e) {
  158.             e.printStackTrace();
  159.         }
  160.     }
  161.     @Override
  162.     public void failed(Throwable exc, ByteBuffer attachment) {
  163.         System.err.println("数据读取失败...");
  164.         try {
  165.             clientChannel.close();
  166.             latch.countDown();
  167.         } catch (IOException e) {
  168.         }
  169.     }
  170. }
复制代码
(4)服务端AsynchronousServerSocketChannel
下面是服务端程序的代码示例:
  1. //类说明: 服务端主程序
  2. public class AioServer {
  3.     private static AioServerHandler serverHandle;
  4.     //统计客户端个数
  5.     public volatile static long clientCount = 0;
  6.     public static void start() {
  7.         if (serverHandle != null) {
  8.             return;
  9.         }
  10.         serverHandle = new AioServerHandler(DEFAULT_PORT);
  11.         new Thread(serverHandle, "Server").start();
  12.     }
  13.     public static void main(String[] args) {
  14.         AioServer.start();
  15.     }
  16. }
  17. //1.serverChannel.accept接收客户端的连接,是异步的
  18. //所以需要AioAcceptHandler实现CompletionHandler来让操作系统进行调用和通知
  19. //类说明: 响应网络操作的处理器, 接收客户端的连接
  20. public class AioServerHandler implements Runnable {
  21.     //使用latch进行阻塞, 防止任务跑完run方法后就退出了
  22.     public CountDownLatch latch;
  23.     //建立ServerSocket, 进行异步通信的通道
  24.     public AsynchronousServerSocketChannel serverChannel;
  25.     public AioServerHandler(int port) {
  26.         try {
  27.             //创建服务端通道
  28.             serverChannel = AsynchronousServerSocketChannel.open();
  29.             //绑定端口
  30.             serverChannel.bind(new InetSocketAddress(port));
  31.             System.out.println("服务端已经启动, 端口是: " + port);
  32.         } catch (IOException e) {
  33.             e.printStackTrace();
  34.         }
  35.     }
  36.     @Override
  37.     public void run() {
  38.         latch = new CountDownLatch(1);
  39.         //用于接收客户端的连接, 异步操作, 需要实现了CompletionHandler接口的处理器处理和客户端的连接操作
  40.         serverChannel.accept(this, new AioAcceptHandler());
  41.         try {
  42.             //使用latch进行阻塞, 防止任务跑完run方法后就退出了
  43.             latch.await();
  44.         } catch (InterruptedException e) {
  45.             e.printStackTrace();
  46.         }
  47.     }
  48. }
  49. //2.AioAcceptHandler的channel就是真正和客户端进行通信用的socket,区别于serverChannel仅作为中介来分配
  50. //不过要注意的是:每完成一次serverChannel.accept连接之后,需要重新再一次注册监听,让别的客户端也可以连接
  51. //类说明: 服务端用来处理接收客户端连接的处理器
  52. public class AioAcceptHandler implements CompletionHandler {
  53.     //这里的channel就是真正和客户端进行通信用的socket, 区别于serverChannel仅作为中介来分配
  54.     @Override
  55.     public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) {
  56.         AioServer.clientCount++;
  57.         System.out.println("连接的客户端数: " + AioServer.clientCount);
  58.         //每完成一次serverChannel.accept连接之后, 需要重新再一次注册监听, 让别的客户端也可以连接
  59.         serverHandler.serverChannel.accept(serverHandler,this);
  60.         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  61.       
  62.         //异步读, read方法的参数如下:
  63.         //ByteBuffer dst: 接收缓冲区, 用于从异步Channel中读取数据包;
  64.         //A attachment: 异步Channel携带的附件, 通知回调的时候作为入参使用;
  65.         //CompletionHandler<Integer,? super A>: 系统回调的业务handler, 进行读操作;
  66.         channel.read(readBuffer, readBuffer, new AioReadHandler(channel));
  67.     }
  68.     @Override
  69.     public void failed(Throwable exc, AioServerHandler serverHandler) {
  70.         exc.printStackTrace();
  71.         serverHandler.latch.countDown();
  72.     }
  73. }
  74. //3.channel.read读取客户端具体的数据,也是异步的
  75. //下面代码使用一个内部类实现CompletionHandler来向客户端写数据
  76. //类说明: 读数据的处理器
  77. public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> {
  78.     private AsynchronousSocketChannel channel;
  79.     public AioReadHandler(AsynchronousSocketChannel channel) {
  80.         this.channel = channel;
  81.     }
  82.     //读取到消息后的处理
  83.     @Override
  84.     public void completed(Integer result, ByteBuffer attachment) {
  85.         //如果条件成立, 说明客户端主动终止了TCP套接字, 这时服务端终止就可以了
  86.         if (result == -1) {
  87.             try {
  88.                 channel.close();
  89.             } catch (IOException e) {
  90.                 e.printStackTrace();
  91.             }
  92.             return;
  93.         }
  94.         //flip操作
  95.         attachment.flip();
  96.         byte[] message = new byte[attachment.remaining()];
  97.         attachment.get(message);
  98.         try {
  99.             System.out.println(result);
  100.             String msg = new String(message,"UTF-8");
  101.             System.out.println("服务端读取到客户端的消息是: " + msg);
  102.             String responseStr = response(msg);
  103.             //向客户端发送消息
  104.             doWrite(responseStr);
  105.         } catch (Exception e) {
  106.             e.printStackTrace();
  107.         }
  108.     }
  109.     //向客户端发送消息
  110.     private void doWrite(String result) {
  111.         byte[] bytes = result.getBytes();
  112.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  113.         writeBuffer.put(bytes);
  114.         writeBuffer.flip();
  115.         //异步写数据
  116.         channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
  117.             @Override
  118.             public void completed(Integer result, ByteBuffer attachment) {
  119.                 if (attachment.hasRemaining()) {
  120.                     channel.write(attachment, attachment,this);
  121.                 } else {
  122.                     //读取客户端传回的数据
  123.                     ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  124.                     //异步读数据
  125.                     channel.read(readBuffer, readBuffer, new AioReadHandler(channel));
  126.                 }
  127.             }
  128.             @Override
  129.             public void failed(Throwable exc, ByteBuffer attachment) {
  130.                 try {
  131.                     channel.close();
  132.                 } catch (IOException e) {
  133.                     e.printStackTrace();
  134.                 }
  135.             }
  136.         });
  137.     }
  138.     @Override
  139.     public void failed(Throwable exc, ByteBuffer attachment) {
  140.         try {
  141.             this.channel.close();
  142.         } catch (IOException e) {
  143.             e.printStackTrace();
  144.         }
  145.     }
  146. }
复制代码
(5)AIO、BIO和NIO对比
说明一:客户端可以使用BIO,BIO编程最简单。但是BIO性能不行,无法应对高并发的情况,所以BIO适合客户端。
 
说明二:服务端通常不使用BIO,通常使用AIO或者NIO。但是这个AIO是个假的异步,因为Linux内核还不支持真正的异步IO,只有Windows下的IOCP才实现了异步IO。Linux下可以做到的异步IO效果也只是通过使用IO复用下的epoll模型,而IO复用实际上使用的还是同步IO,所以在Linux下进行网络编程常用的是NIO。
 
说明三:AIO从理论上讲性能最强,但编码最复杂,且Linux本身的实现并不优秀。所以一般在实际的Linux生产上,也很少使用AIO,主要是使用NIO。Netty就是基于NIO开发的。
 
5.NIO网络编程之Buffer
(1)Buffer的作用
Buffer的作用是方便读写通道(Channel)中的数据。首先数据是从通道(Channel)读入缓冲区,从缓冲区写入通道(Channel)的。应用程序发送数据时,会先将数据写入缓冲区,然后再通过通道发送缓冲区的数据。应用数据读取数据时,会先将数从通道中读到缓冲区,然后再读取缓冲区的数据。
 
缓冲区本质上是一块可以写入数据、可以读取数据的内存。这块内存被包装成NIO的Buffer对象,并提供了一组方法用来方便访问该块内存。所以Buffer的本质是一块可以写入数据、可以读取数据的内存。
 
(2)Buffer的重要属性
一.capacity
Buffer作为一个内存块有一个固定的大小值,叫capacity。我们只能往Buffer中写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(读取数据或清除数据)才能继续写数据。
 
二.position
当往Buffer中写数据时,position表示当前的位置,position的初始值为0。当一个数据写到Buffer后,position会移动到下一个可插入数据的位置。所以position的最大值为capacity – 1。
 
当从Buffer中读取数据时,需要从某个特定的position位置读数据。如果将Buffer从写模式切换到读模式,那么position会被重置为0。当从Buffer的position处读到一个数据时,position会移动到下一个可读位置。
 
三.limit
在写模式下,Buffer的limit表示最多能往Buffer里写多少数据。在写模式下,Buffer的limit等于Buffer的capacity。
 
当Buffer从写模式切换到读模式时, limit表示最多能读到多少数据。因此,当Buffer切换到读模式时,limit会被设置成写模式下的position值。
5.png
(3)Buffer的分配
要想获得一个Buffer对象首先要进行分配,每一个Buffer类都有allocate()方法。可以在堆上分配,也可以在直接内存上分配。
  1. //分配一个capacity为48字节的ByteBuffer的例子
  2. ByteBuffer buf = ByteBuffer.allocate(48);
  3. //分配一个capacity为1024个字符的CharBuffer的例子
  4. CharBuffer buf = CharBuffer.allocate(1024);
复制代码
一般建议使用在堆上分配。如果应用偏计算,就用堆上分配。如果应用偏网络通讯频繁,就用直接内存。
 
wrap()方法可以把一个byte数组或byte数组的一部分包装成ByteBuffer对象。
  1. ByteBuffer wrap(byte [] array);
  2. ByteBuffer wrap(byte [] array, int offset, int length);
复制代码
Buffer分配的例子:
  1. //类说明: Buffer的分配
  2. public class AllocateBuffer {
  3.     //输出结果如下:
  4.     //----------Test allocate--------
  5.     //before allocate, 虚拟机可用的内存大小: 253386384
  6.     //buffer = java.nio.HeapByteBuffer[pos=0 lim=102400000 cap=102400000]
  7.     //after allocate, 虚拟机可用的内存大小: 150986368
  8.     //directBuffer = java.nio.DirectByteBuffer[pos=0 lim=102400000 cap=102400000]
  9.     //after direct allocate, 虚拟机可用的内存大小: 150986368
  10.     //----------Test wrap--------
  11.     //java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]
  12.     //java.nio.HeapByteBuffer[pos=10 lim=20 cap=32]
  13.     public static void main(String[] args) {
  14.         System.out.println("----------Test allocate--------");
  15.         System.out.println("before allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory());
  16.         //堆上分配
  17.         ByteBuffer buffer = ByteBuffer.allocate(102400000);
  18.         System.out.println("buffer = " + buffer);
  19.         System.out.println("after allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory());
  20.         //这部分用的直接内存
  21.         ByteBuffer directBuffer = ByteBuffer.allocateDirect(102400000);
  22.         System.out.println("directBuffer = " + directBuffer);
  23.         System.out.println("after direct allocate, 虚拟机可用的内存大小: " + Runtime.getRuntime().freeMemory());
  24.         System.out.println("----------Test wrap--------");
  25.         byte[] bytes = new byte[32];
  26.         buffer = ByteBuffer.wrap(bytes);
  27.         System.out.println(buffer);
  28.         buffer = ByteBuffer.wrap(bytes, 10, 10);
  29.         System.out.println(buffer);
  30.     }
  31. }
复制代码
(4)Buffer的读写
一.向Buffer中写数据
将数据写到Buffer有两种方式:
方式一:从Channel读出数据写到Buffer
方式二:通过Buffer的put()方法将数据写到Buffer
  1. //从Channel写到Buffer的例子
  2. int bytesRead = inChannel.read(buf);//从channel读出数据写到buffer
  3. //通过put()方法将数据写到Buffer的例子
  4. buf.put(127);
复制代码
二.从Buffer中读取数据
从Buffer中读取数据有两种方式:
方式一:从Buffer中读取数据写入到Channel
方式二:使用get()方法从Buffer中读取数据
  1. //从Buffer读取数据到Channel的例子
  2. int bytesWritten = inChannel.write(buf);
  3. //使用get()方法从Buffer中读取数据的例子
  4. byte aByte = buf.get();
复制代码
三.使用Buffer读写数据常见步骤
步骤一:写入数据到Buffer
步骤二:调用flip()方法
步骤三:从Buffer中读取数据
步骤四:调用clear()方法或compact()方法
 
flip()方法会将Buffer从写模式切换到读模式,调用flip()方法会将position设回0,并将limit设置成之前的position值。
 
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。
 
有两种方式能清空缓冲区:调用clear()方法或compact()方法。clear()方法会清空整个缓冲区,compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
 
四.其他常用操作
操作一:rewind()方法
Buffer.rewind()将position设回0,所以可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。
 
操作二:clear()与compact()方法
一旦读取完Buffer中的数据,需要让Buffer准备好再次被写入,这时候可以通过clear()方法或compact()方法来完成。
 
如果调用的是clear()方法,position将被设为0,limit被设为capacity的值。此时Buffer被认为是清空了,但是Buffer中的数据并未清除,只是这些标记能告诉我们可以从哪里开始往Buffer里写数据。
 
如果Buffer中有一些未读的数据,调用clear()方法,数据将被遗忘。意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。
 
如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先写些数据,那么可以使用compact()方法。compact()方法会将所有未读的数据拷贝到Buffer起始处,然后将position设到最后一个未读元素正后面,limit属性依然像clear()方法一样设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。
 
操作三:mark()与reset()方法
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position,之后可以通过调用Buffer.reset()方法恢复到这个position。
  1. //类说明: Buffer方法演示
  2. public class BufferMethod {
  3.     public static void main(String[] args) {
  4.         System.out.println("------Test get-------------");
  5.         ByteBuffer buffer = ByteBuffer.allocate(32);
  6.         buffer
  7.             .put((byte) 'a')//0
  8.             .put((byte) 'b')//1
  9.             .put((byte) 'c')//2
  10.             .put((byte) 'd')//3
  11.             .put((byte) 'e')//4
  12.             .put((byte) 'f');//5
  13.         //before flip()java.nio.HeapByteBuffer[pos=6 lim=32 cap=32]
  14.         System.out.println("before flip()" + buffer);
  15.         //转换为读取模式: pos置为0, lim置为转换前pos的值
  16.         buffer.flip();
  17.         //before get():java.nio.HeapByteBuffer[pos=0 lim=6 cap=32]
  18.         System.out.println("before get():" + buffer);
  19.         //get()会影响position的位置, 这是相对取;
  20.         System.out.println((char) buffer.get());
  21.         //after get():java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]
  22.         System.out.println("after get():" + buffer);
  23.         //get(index)不影响position的值, 这是绝对取;
  24.         System.out.println((char) buffer.get(2));
  25.         //after get(index):java.nio.HeapByteBuffer[pos=1 lim=6 cap=32]
  26.         System.out.println("after get(index):" + buffer);
  27.         byte[] dst = new byte[10];
  28.         //position移动两位
  29.         buffer.get(dst, 0, 2);
  30.         //after get(dst, 0, 2):java.nio.HeapByteBuffer[pos=3 lim=6 cap=32]
  31.         System.out.println("after get(dst, 0, 2):" + buffer);
  32.         System.out.println("dst:" + new String(dst));//dst:bc
  33.         System.out.println("--------Test put-------");
  34.         ByteBuffer bb = ByteBuffer.allocate(32);
  35.         //before put(byte):java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]
  36.         System.out.println("before put(byte):" + bb);
  37.         //put()不带索引会改变pos, after put(byte):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]
  38.         System.out.println("after put(byte):" + bb.put((byte) 'z'));
  39.         //put(2,(byte) 'c')不改变position的位置
  40.         bb.put(2, (byte) 'c');
  41.         //after put(2,(byte) 'c'):java.nio.HeapByteBuffer[pos=1 lim=32 cap=32]
  42.         System.out.println("after put(2,(byte) 'c'):" + bb);
  43.         System.out.println(new String(bb.array()));
  44.         //这里的buffer是abcdef[pos=3 lim=6 cap=32]
  45.         bb.put(buffer);
  46.         //after put(buffer):java.nio.HeapByteBuffer[pos=4 lim=32 cap=32]
  47.         System.out.println("after put(buffer):" + bb);
  48.         System.out.println(new String(bb.array()));
  49.         System.out.println("--------Test reset----------");
  50.         buffer = ByteBuffer.allocate(20);
  51.         System.out.println("buffer = " + buffer);
  52.         buffer.clear();
  53.         buffer.position(5);//移动position到5
  54.         buffer.mark();//记录当前position的位置
  55.         buffer.position(10);//移动position到10
  56.         System.out.println("before reset:" + buffer);
  57.         buffer.reset();//复位position到记录的地址
  58.         System.out.println("after reset:" + buffer);
  59.         System.out.println("--------Test rewind--------");
  60.         buffer.clear();
  61.         buffer.position(10);//移动position到10
  62.         buffer.limit(15);//限定最大可写入的位置为15
  63.         System.out.println("before rewind:" + buffer);
  64.         buffer.rewind();//将position设回0
  65.         System.out.println("before rewind:" + buffer);
  66.         System.out.println("--------Test compact--------");
  67.         buffer.clear();
  68.         //放入4个字节,position移动到下个可写入的位置,也就是4
  69.         buffer.put("abcd".getBytes());
  70.         System.out.println("before compact:" + buffer);
  71.         System.out.println(new String(buffer.array()));
  72.         buffer.flip();//将position设回0,并将limit设置成之前position的值
  73.         System.out.println("after flip:" + buffer);
  74.         
  75.         //从Buffer中读取数据的例子,每读一次,position移动一次
  76.         System.out.println((char) buffer.get());
  77.         System.out.println((char) buffer.get());
  78.         System.out.println((char) buffer.get());
  79.         System.out.println("after three gets:" + buffer);
  80.         System.out.println(new String(buffer.array()));
  81.         
  82.         //compact()方法将所有未读的数据拷贝到Buffer起始处
  83.         //然后将position设到最后一个未读元素正后面
  84.         buffer.compact();
  85.         System.out.println("after compact:" + buffer);
  86.         System.out.println(new String(buffer.array()));
  87.     }
  88. }
复制代码
(5)Buffer常用方法总结
6.png
 
6.NIO网络编程之实战
(1)Selector
(2)Channels
(3)SelectionKey
(4)NIO的开发流程
(5)NIO的开发例子
 
(1)Selector
Selector的含义是选择器,也可以称为轮询代理器、事件订阅器。Selector的作用是注册事件和对Channel进行管理。
 
应用程序可以向Selector对象注册它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣,Selector中会维护一个已经注册的Channel的容器。
 
(2)Channels
Channel可以和操作系统进行内容传递,应用程序可以通过Channel读数据,也可以通过Channel向操作系统写数据,当然写数据和读数据都要通过Buffer来实现。
 
所有被Selector注册的Channel都是继承SelectableChannel的子类,通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。ScoketChannel和ServerSocketChannel都是SelectableChannel类的子类。
 
(3)SelectionKey
NIO中的SelectionKey共定义了四种事件类型:OP_ACCEPT、OP_READ、OP_WRITE、OP_CONNECT,分别对应接受连接、读、写、请求连接的网络Socket操作。
 
ServerSocketChannel和SocketChannel可以注册自己感兴趣的操作类型,当对应操作类型的就绪条件满足时操作系统就会通知这些Channel。
 
每个操作类型(事件类型)的就绪条件:
7.png
(4)NIO的开发流程
步骤一:服务端启动ServerSocketChannel,关注OP_ACCEPT事件。
 
步骤二:客户端启动SocketChannel,连接服务端,关注OP_CONNECT事件。
 
步骤三:服务端接受连接,然后启动一个SocketChannel。该SocketChannel可以关注OP_READ、OP_WRITE事件,一般连接建立后会直接关注OP_READ事件。
 
步骤四:客户端的SocketChannel发现连接建立后,关注OP_READ、OP_WRITE事件,一般客户端需要发送数据了才能关注OP_READ事件。
 
步骤五:连接建立后,客户端与服务器端开始相互发送消息(读写),然后根据实际情况来关注OP_READ、OP_WRITE事件。
8.png
(5)NIO的开发例子
一.客户端的代码
  1. //类说明: NIO通信的客户端
  2. public class NioClient {
  3.     private static NioClientHandle nioClientHandle;
  4.     public static void start() {
  5.         if (nioClientHandle != null) {
  6.             nioClientHandle.stop();
  7.         }
  8.         nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP, DEFAULT_PORT);
  9.         new Thread(nioClientHandle, "Client").start();
  10.     }
  11.     //向服务器发送消息
  12.     public static boolean sendMsg(String msg) throws Exception {
  13.         nioClientHandle.sendMsg(msg);
  14.         return true;
  15.     }
  16.     public static void main(String[] args) throws Exception {
  17.         start();
  18.         Scanner scanner = new Scanner(System.in);
  19.         while(NioClient.sendMsg(scanner.next()));
  20.     }
  21. }
  22. //类说明: NIO通信的客户端处理器
  23. public class NioClientHandle implements Runnable {
  24.     private String host;
  25.     private int port;
  26.     private Selector selector;
  27.     private SocketChannel socketChannel;
  28.     private volatile boolean started;
  29.     public NioClientHandle(String ip, int port) {
  30.         this.host = ip;
  31.         this.port = port;
  32.         try {
  33.             //创建选择器
  34.             selector = Selector.open();
  35.             //打开通道
  36.             socketChannel = SocketChannel.open();
  37.             //如果为true, 则此通道将被置于阻塞模式; 如果为false, 则此通道将被置于非阻塞模式;
  38.             //另外, IO复用本身就是非阻塞模式, 所以设置false;
  39.             socketChannel.configureBlocking(false);
  40.             //表示连接已经打开
  41.             started = true;
  42.         } catch (IOException e) {
  43.             e.printStackTrace();
  44.         }
  45.     }
  46.     public void stop() {
  47.         started = false;
  48.     }
  49.     private void doConnect() throws IOException {
  50.         //关于socketChannel.connect方法的说明:
  51.         //如果此通道处于非阻塞模式, 则调用此方法将启动一个非阻塞连接的操作;
  52.         //如果该连接建立得非常快, 就像本地连接可能发生的那样, 则此方法返回true;
  53.         //否则, 此方法返回false, 稍后必须通过调用finishConnect方法完成连接操作
  54.         
  55.         //如果成功连接则什么都不做
  56.         if (socketChannel.connect(new InetSocketAddress(host, port))) {
  57.         } else {
  58.             //连接还未完成, 所以注册连接就绪事件, 向selector表示关注这个事件
  59.             socketChannel.register(selector, SelectionKey.OP_CONNECT);
  60.         }
  61.     }
  62.     @Override
  63.     public void run() {
  64.         try {
  65.             //发起连接
  66.             doConnect();
  67.         } catch (IOException e) {
  68.             e.printStackTrace();
  69.             System.exit(1);
  70.         }
  71.         //循环遍历selector
  72.         while(started) {
  73.             try {
  74.                 //selector.select()会阻塞, 只有当至少一个注册的事件发生的时候才会继续
  75.                 selector.select();
  76.                 //获取当前有哪些事件可以使用
  77.                 Set<SelectionKey> keys = selector.selectedKeys();
  78.                 //转换为迭代器
  79.                 Iterator<SelectionKey> it = keys.iterator();
  80.                 SelectionKey key = null;
  81.                 while(it.hasNext()) {
  82.                     key = it.next();
  83.                     it.remove();//拿到key后从迭代器移除
  84.                     try {
  85.                         handleInput(key);
  86.                     } catch (IOException e) {
  87.                         e.printStackTrace();
  88.                         if (key != null) {
  89.                             key.cancel();
  90.                             if (key.channel() != null) {
  91.                                 key.channel().close();
  92.                             }
  93.                         }
  94.                     }
  95.                 }
  96.             } catch (IOException e) {
  97.                 e.printStackTrace();
  98.             }
  99.         }
  100.         //selector关闭后会自动释放里面管理的资源
  101.         if (selector != null) {
  102.             try {
  103.                 selector.close();
  104.             } catch (IOException e) {
  105.                 e.printStackTrace();
  106.             }
  107.         }
  108.     }
  109.     //具体的事件处理方法
  110.     private void handleInput(SelectionKey key) throws IOException {
  111.         if (key.isValid()) {
  112.             //根据SelectionKey获得关心当前事件的channel
  113.             SocketChannel sc = (SocketChannel)key.channel();
  114.             //处理连接事件
  115.             if (key.isConnectable()) {
  116.                 if (sc.finishConnect()) {
  117.                     socketChannel.register(selector, SelectionKey.OP_READ);
  118.                 } else{
  119.                     System.exit(1);
  120.                 }
  121.             }
  122.             //有数据可读事件
  123.             if (key.isReadable()) {
  124.                 //创建ByteBuffer, 并开辟一个1M的缓冲区
  125.                 ByteBuffer buffer = ByteBuffer.allocate(1024);
  126.                 //读取请求码流, 返回读取到的字节数, 从channel读出来并写到buffer里去
  127.                 int readBytes = sc.read(buffer);
  128.                 //读取到字节,对字节进行编解码
  129.                 if (readBytes > 0) {
  130.                     //将缓冲区当前的limit设置为position,position=0,
  131.                     //用于后续对缓冲区的读取操作
  132.                     buffer.flip();
  133.                     //根据缓冲区可读字节数创建字节数组
  134.                     byte[] bytes = new byte[buffer.remaining()];
  135.                     //将缓冲区可读字节数组复制到新建的数组中
  136.                     buffer.get(bytes);
  137.                     String result = new String(bytes,"UTF-8");
  138.                     System.out.println("accept message: " + result);
  139.                 } else if (readBytes < 0) {
  140.                     key.cancel();
  141.                     sc.close();
  142.                 }
  143.             }
  144.         }
  145.     }
  146.     //发送消息
  147.     private void doWrite(SocketChannel channel, String request) throws IOException {
  148.         //将消息编码为字节数组
  149.         byte[] bytes = request.getBytes();
  150.         //根据数组容量创建ByteBuffer
  151.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  152.         //将字节数组复制到缓冲区
  153.         writeBuffer.put(bytes);
  154.         //flip操作
  155.         writeBuffer.flip();
  156.         //发送缓冲区的字节数组
  157.         channel.write(writeBuffer);
  158.     }
  159.     //写数据对外暴露的API
  160.     public void sendMsg(String msg) throws Exception {
  161.         socketChannel.register(selector, SelectionKey.OP_READ);
  162.         doWrite(socketChannel, msg);
  163.     }
  164. }
复制代码
二.服务端的代码
  1. //类说明: NIO通信的服务端
  2. public class NioServer {
  3.     private static NioServerHandle nioServerHandle;
  4.     public static void start() {
  5.         if (nioServerHandle != null) {
  6.             nioServerHandle.stop();
  7.         }
  8.         nioServerHandle = new NioServerHandle(DEFAULT_PORT);
  9.         new Thread(nioServerHandle,"Server").start();
  10.     }
  11.     public static void main(String[] args) {
  12.         start();
  13.     }
  14. }
  15. //类说明: NIO通信的服务端处理器
  16. public class NioServerHandle implements Runnable {
  17.     private Selector selector;
  18.     private ServerSocketChannel serverChannel;
  19.     private volatile boolean started;
  20.     //构造方法
  21.     public NioServerHandle(int port) {
  22.         try {
  23.             //创建选择器
  24.             selector = Selector.open();
  25.             //打开通道
  26.             serverChannel = ServerSocketChannel.open();
  27.             //如果为true, 则此通道将被置于阻塞模式; 如果为false, 则此通道将被置于非阻塞模式;
  28.             //另外, IO复用本身就是非阻塞模式, 所以设置false;
  29.             serverChannel.configureBlocking(false);
  30.             //指定监听的端口
  31.             serverChannel.socket().bind(new InetSocketAddress(port));
  32.             //注册服务端关心的事件: 连接事件
  33.             serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  34.             //表示连接已经打开
  35.             started = true;
  36.             System.out.println("服务器已启动, 端口号: " + port);
  37.         } catch (IOException e) {
  38.             e.printStackTrace();
  39.         }
  40.     }
  41.     public void stop() {
  42.         started = false;
  43.     }
  44.     @Override
  45.     public void run() {
  46.         //循环遍历selector
  47.         while(started) {
  48.             try {
  49.                 //阻塞, 只有当至少一个注册的事件发生的时候才会继续
  50.                 selector.select();
  51.                 Set<SelectionKey> keys = selector.selectedKeys();
  52.                 Iterator<SelectionKey> it = keys.iterator();
  53.                 SelectionKey key = null;
  54.                 while(it.hasNext()) {
  55.                     key = it.next();
  56.                     it.remove();
  57.                     try {
  58.                         handleInput(key);
  59.                     } catch(Exception e) {
  60.                         if (key != null) {
  61.                             key.cancel();
  62.                             if (key.channel() != null) {
  63.                                 key.channel().close();
  64.                             }
  65.                         }
  66.                     }
  67.                 }
  68.             } catch(Throwable t) {
  69.                 t.printStackTrace();
  70.             }
  71.         }
  72.         //selector关闭后会自动释放里面管理的资源
  73.         if (selector != null) {
  74.             try {
  75.                 selector.close();
  76.             } catch (Exception e) {
  77.                 e.printStackTrace();
  78.             }
  79.         }
  80.     }
  81.     private void handleInput(SelectionKey key) throws IOException {
  82.         if (key.isValid()) {
  83.             //处理新接入的请求消息
  84.             if (key.isAcceptable()) {
  85.                 //获得关心当前事件的channel
  86.                 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
  87.                 //通过ServerSocketChannel的accept创建SocketChannel实例
  88.                 //完成该操作意味着完成TCP三次握手, TCP物理链路正式建立
  89.                 SocketChannel sc = ssc.accept();
  90.                 System.out.println("======socket channel 建立连接");
  91.                 //设置为非阻塞的
  92.                 sc.configureBlocking(false);
  93.                 //连接已经完成了, 可以开始关心读事件了
  94.                 sc.register(selector, SelectionKey.OP_READ);
  95.             }
  96.             //读消息
  97.             if (key.isReadable()) {
  98.                 System.out.println("======socket channel 数据准备完成, " + "可以去读==读取=======");
  99.                 SocketChannel sc = (SocketChannel) key.channel();
  100.                 //创建ByteBuffer, 并开辟一个1M的缓冲区
  101.                 ByteBuffer buffer = ByteBuffer.allocate(1024);
  102.                 //读取请求码流, 返回读取到的字节数
  103.                 int readBytes = sc.read(buffer);
  104.                 //读取到字节, 对字节进行编解码
  105.                 if (readBytes > 0) {
  106.                     //将缓冲区当前的limit设置为position=0, 用于后续对缓冲区的读取操作
  107.                     buffer.flip();
  108.                     //根据缓冲区可读字节数创建字节数组
  109.                     byte[] bytes = new byte[buffer.remaining()];
  110.                     //将缓冲区可读字节数组复制到新建的数组中
  111.                     buffer.get(bytes);
  112.                     String message = new String(bytes,"UTF-8");
  113.                     System.out.println("服务器收到消息: " + message);
  114.                     //处理数据
  115.                     String result = response(message) ;
  116.                     //发送应答消息
  117.                     doWrite(sc, result);
  118.                 } else if (readBytes < 0) {
  119.                     //链路已经关闭, 释放资源
  120.                     key.cancel();
  121.                     sc.close();
  122.                 }
  123.             }
  124.         }
  125.     }
  126.     //发送应答消息
  127.     private void doWrite(SocketChannel channel,String response) throws IOException {
  128.         //将消息编码为字节数组
  129.         byte[] bytes = response.getBytes();
  130.         //根据数组容量创建ByteBuffer
  131.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  132.         //将字节数组复制到缓冲区
  133.         writeBuffer.put(bytes);
  134.         //flip操作
  135.         writeBuffer.flip();
  136.         //发送缓冲区的字节数组
  137.         channel.write(writeBuffer);
  138.     }
  139. }
复制代码
 
6.NIO网络编程之Reactor模式
(1)单线程的Reactor模式
(2)单线程Reactor模式的改进
(3)多线程的Reactor模式
 
(1)单线程的Reactor模式
9.png
一.单线程Reactor模式的流程
首先服务端的Reactor其实是一个线程对象。Reactor会启动事件循环,并使用Selector(选择器)来实现IO多路复用。
 
然后服务端启动时会注册一个Acceptor事件处理器到Reactor中。这个Acceptor事件处理器会关注accept事件,这样Reactor监听到accept事件就会交给Acceptor事件处理器进行处理了。
 
当客户端向服务器端发起一个连接请求时,Reactor就会监听到一个accept事件,于是会将该accept事件派发给Acceptor处理器进行处理。
 
接着Acceptor处理器通过accept()方法便能得到这个客户端对应的连接(SocketChannel),然后将该连接(SocketChannel)所关注的read事件及对应的read事件处理器注册到Reactor中,这样Reactor监听到该连接的read事件就会交给对应的read事件处理器进行处理。
 
当Reactor监听到客户端的连接(SocketChannel)有读写事件发生时,就会将读写事件派发给对应的读写事件处理器进行处理。比如读事件处理器会通过SocketChannel的read()方法读取数据,此时的read()方法可以直接读取到数据,不需要阻塞等待可读数据的到来。
 
每当Acceptor处理器和读写事件处理器处理完所有就绪的感兴趣的IO事件后,Reactor线程会再次执行select()方法阻塞等待新的事件就绪并将其分派给对应处理器进行处理。
 
二.单线程Reactor模式的问题
注意:单线程的Reactor模式中的单线程主要是针对IO操作而言的,也就是所有的IO的accept、read、write、connect操作都在一个线程上完成。
 
由于在单线程Reactor模式中,不仅IO操作在Reactor线程上,而且非IO的业务操作也在Reactor线程上进行处理,这会大大降低IO请求的响应。所以应将非IO的业务操作从Reactor线程上剥离,以提高Reactor线程对IO请求的响应。
 
(2)单线程Reactor模式的改进
10.png
一.增加工作线程池来进行改进
为了改善单线程Reactor模式中Reactor线程还要处理业务逻辑,可以添加一个工作线程池。将非IO操作(解码、计算、编码)从Reactor线程中移出,然后转交给这个工作线程池来执行。所有IO操作依旧由单个Reactor线程来完成,如IO的accept、read、write以及connect操作。这样就能提高Reactor线程的IO响应,避免因耗时的业务逻辑而降低对后续IO请求的处理。
 
二.使用线程池的好处
合理使用线程池,可以带来很多好处:
好处一:减少频繁创建和销毁线程的性能开销
好处二:重复利用线程,避免对每个任务都创建线程,可以提高响应速度
好处三:合理设置线程池的大小,可以避免因为线程池过大影响性能
 
三.单线程Reactor不适合高并发场景
对于一些小容量的应用场景,可以使用单线程Reactor模型,但是对于一些高负载、大并发或大数据量的应用场景却不合适。
 
原因一:一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送。
 
原因二:当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时。超时之后往往会进行重发,这更加重了NIO线程的负载。最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。
 
(3)多线程的Reactor模式
11.png
一.多线程的Reactor模式介绍
在多线程的Reactor模式下,存在一个Reactor线程池,Reactor线程池里的每一个Reactor线程都有自己的Selector和事件分发逻辑。
 
Reactor线程池里的主反应器线程mainReactor可以只有一个,但子反应器线程subReactor一般会有多个,通常subReactor也是一个线程池。
 
主反应器线程mainReactor主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给子反应器线程subReactor,由subReactor来完成和客户端的通信。
 
二.多线程的Reactor模式流程
首先服务端的Reactor变成了多个线程对象,分为mainReactor和subReactor。这些Reactor对象也会启动事件循环,并使用Selector(选择器)来实现IO多路复用。
 
然后服务端在启动时会注册一个Acceptor事件处理器到mainReactor中。这个Acceptor事件处理器会关注accept事件,这样mainReactor监听到accept事件就会交给Acceptor事件处理器进行处理。
 
当客户端向服务端发起一个连接请求时,mainReactor就会监听到一个accept事件,于是就会将这个accept事件派发给Acceptor处理器来进行处理。
 
接着Acceptor处理器通过accept()方法便能得到这个客户端对应的连接(SocketChannel),然后将这个连接SocketChannel传递给subReactor线程池进行处理。
 
subReactor线程池会分配一个subReactor线程给这个SocketChannel,并将SocketChannel关注的read事件及对应的read事件处理器注册到这个subReactor线程中。当然也会注册关注的write事件及write事件处理器到subReactor线程中以完成IO写操作。
 
总之,Reactor线程池中的每一Reactor线程都会有自己的Selector和事件分发逻辑。当有IO事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。
 
注意,这里subReactor线程只负责完成IO的read()操作,在读取到数据后将业务逻辑的处理放入到工作线程池中完成。若完成业务逻辑后需要返回数据给客户端,则IO的write()操作还是会被提交回subReactor线程来完成。这样所有的IO操作依旧在Reactor线程(mainReactor或subReactor)中完成,而工作线程池仅用来处理非IO操作的逻辑。
 
多Reactor线程模式将"接收客户端的连接请求"和"与该客户端进行读写通信"分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样就不会因为read()操作的数据量太大而导致后面的客户端连接请求得不到及时处理。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过subReactor线程池将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。
 
Netty服务端就是使用了多线程的Reactor模式。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册