BIO、NIO、AIO详解

Java的I/O演进之路

I/O模型基本说明

I/O模型:就是用什么样的通道或者说是通信模型和架构进行数据传输和接收,很大程度上决定了程序通信的性能,Java共支持三种网络编程的I/O模型:BIO、NIO、AIO。
实际通信需求下,要根据不同的业务场景和性能需求决定不同的I/O模型。

I/O模型

BIO

同步阻塞式,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个进程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

NIO

同步非阻塞式,服务器实现模式为一个线程处理多个请求,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

AIO

异步非阻塞,服务器实现模式一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接数时间较长的应用。

BIO、NIO、AIO使用场景分析

BIO方式适用于连接数目较小且固定的架构、这种方式对服务器资源要求较高,并发局限于应用中,JDK1.4以前的唯一选择。

NIO方式使用于连接数目多且连接比较短的架构,比如聊天服务器,弹幕系统,服务器间通讯等。JDK1.4开始

AIO方式适用于连接数目多且连接比较长的架构,比如相册服务器,充分利用OS参与并发操作。JDK7开始支持。

BIO深入理解

Java BIO就是传统的Java IO编程,相关接口和类在java.io中

BIO同步阻塞式,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个进程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

Client客户端:

通过Socket对象请求与服务端的连接;从Socket中得到字节输入或者是字节输出流进行数据的读写操作

Server服务端:

通过ServerSocket注册端口;服务端通过调用accept方法用于监听客户端的Socket请求;从Socket中获取字节输入或者字节输出流进行数据的读写操作。

传统BIO编程实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cn.xiaohupao;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/9 21:42
*/
public class Server {
public static void main(String[] args) {
try {
//定义一个ServerSock对象进行服务器的端口注册
ServerSocket ss = new ServerSocket(2576);

//监听客户端的Socket连接请求
Socket socket = ss.accept();

//从socket管道中得到一个字节输入流对象
InputStream is = socket.getInputStream();

//把字节输入流包装成缓冲字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));

String msg;
while ((msg = br.readLine()) != null){
System.out.println("服务端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.xiaohupao;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/9 21:51
*/
public class Client {
public static void main(String[] args) {
try {
//创建一个Socket对象请求服务端的连接
Socket socket = new Socket("127.0.0.1", 2576);

//从Socket对象中获取一个字节输出流
OutputStream os = socket.getOutputStream();

//把字节输出流包装成一个打印流
PrintStream ps = new PrintStream(os);

ps.println("hello World!");
ps.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}

以上通信中,服务端会一直等待客户端消息,如果客户端没有消息的发送,服务端将一直进入阻塞状态。同时服务端是按照行来获取消息的,这意味着客户端也必须按照行进行消息的发送,否则服务端将进入等待消息的阻塞状态。

多发多收的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.xiaohupao.two;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
* @Author: xiaohupao
* @Date: 2021/10/9 21:51
*/
public class Client {
public static void main(String[] args) {
try {
//创建一个Socket对象请求服务端的连接
Socket socket = new Socket("127.0.0.1", 2576);

//从Socket对象中获取一个字节输出流
OutputStream os = socket.getOutputStream();

//把字节输出流包装成一个打印流
PrintStream ps = new PrintStream(os);

Scanner sc = new Scanner(System.in);
while(true){
System.out.println("请发送消息");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.xiaohupao.two;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/9 21:42
* 客户端多发送消息,服务端多 接收消息
*/
public class Server {
public static void main(String[] args) {
try {
//定义一个ServerSock对象进行服务器的端口注册
ServerSocket ss = new ServerSocket(2576);

//监听客户端的Socket连接请求
Socket socket = ss.accept();

//从socket管道中得到一个字节输入流对象
InputStream is = socket.getInputStream();

//把字节输入流包装成缓冲字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));

String msg;
while ((msg = br.readLine()) != null){
System.out.println("服务端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

BIO模式下接收多个客户端

上面的案例中一个服务端只能接收一个客户端的通信请求,如果服务端需要处理很多个客户端的消息通信请求应该如何处理呢,此时我们就需要在服务端引入线程,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求,这样就实现了一个客户端一个线程的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package cn.xiaohupao.three;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 9:56
* 实现服务端可以同时接收多个客户端的Socket通信需求
*/
public class Server {
public static void main(String[] args) {
try {
//注册端口
ServerSocket ss = new ServerSocket(2576);

//监听客户端的Socket请求
while (true){
Socket socket = ss.accept();

//创建一个线程与客户端的Socket通信需求
new ServerThreadReader(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.xiaohupao.three;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 10:01
*/
public class ServerThreadReader extends Thread{

private Socket socket;

public ServerThreadReader(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
//从Socket管道中得到一个字节输入流
InputStream is = socket.getInputStream();
//缓冲字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package cn.xiaohupao.three;

import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 10:10
*/
public class Client {
public static void main(String[] args) {
try {
//请求与服务端的Socket连接
Socket socket = new Socket("127.0.0.1", 2576);

//得到一个打印流
PrintStream ps = new PrintStream(socket.getOutputStream());

//发送消息
Scanner sc = new Scanner(System.in);
while (true){
System.out.print("请发送消息:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • 每个Socket接收到,都会创建一个线程,线程的竞争,切换上下文影响性能;
  • 每个线程都会占用栈空间和CPU资源
  • 并不是每个Socket都进行IO操作,无意义的线程处理;
  • 客户端的并发访问增加的。服务都将呈现1:1线程开销,访问量越大,系统的将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死。

伪异步I/O编程

上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败。

我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中的Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package cn.xiaohupao.four;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 11:54
* 伪异步的服务端
*/
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(2576);
HandlerSocketServerPool pools = new HandlerSocketServerPool(6, 10);
while (true){
Socket accept = ss.accept();

//将socket对象交给线程池进行处理
Runnable target = new ServerRunnableTarget(accept);
pools.execute(target);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cn.xiaohupao.four;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 12:07
*/
public class ServerRunnableTarget implements Runnable{

private Socket socket;

public ServerRunnableTarget(Socket socket){
this.socket = socket;
}

@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.println("服务器接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.xiaohupao.four;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 11:58
*/
public class HandlerSocketServerPool {

//创建一个线程池
private ExecutorService executorService;

public HandlerSocketServerPool(int maxNum, int queueSize){
executorService = new ThreadPoolExecutor(3, maxNum, 120, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(queueSize));
}

/**
* 提供一个方法来提交任务给线程池的任务队列来暂存,等着线程池来处理
*/
public void execute(Runnable target){
executorService.execute(target);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package cn.xiaohupao.four;

import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 16:13
*/
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 2576);
PrintStream ps = new PrintStream(socket.getOutputStream());

Scanner sc = new Scanner(System.in);
while (true){
System.out.print("客户端发送:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • 伪异步io采用线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然采用的同步阻塞模型,因此无法从根上解决问题。
  • 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续socket的io消息都将在队列中排队。新的Socket请求将被拒绝,客户端会发生大量连接超时。

基于BIO形式下的文件上传

支持任意文件形式的上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.xiaohupao.five;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 18:15
*/
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(2576);

while (true){
Socket socket = ss.accept();

new ServerReaderThread(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.xiaohupao.five;

import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 18:17
*/
public class ServerReaderThread extends Thread{

private Socket socket;

public ServerReaderThread(Socket socket){
this.socket = socket;
}

@Override
public void run() {
try {
//得到一个数据输入流获取客户端发来的数据
DataInputStream dis = new DataInputStream(socket.getInputStream());

//读取客户端发送过来的文件类型
String suffix = dis.readUTF();
System.out.println("服务端成功收到了" + suffix);
//定义一个字节输出管道负责把客户端发来的文件数据写出去
OutputStream os = new FileOutputStream("C:\\Users\\Wk'sPC\\Documents\\Server\\" + UUID.randomUUID().toString()+suffix);

byte[] buffer = new byte[1024];
int len;
while ((len = dis.read(buffer)) > 0){
os.write(buffer, 0, len);
}

os.close();
System.out.println("文件保存成功!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.xiaohupao.five;

import java.io.*;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/12 17:54
* 实现客户端上传任意类型文件数据给服务端保存
*/
public class Client {
public static void main(String[] args) {
try(InputStream is = new FileInputStream("C:\\Users\\Wk'sPC\\Documents\\演示文稿20.png")) {
//请求与服务器的Socket连接
Socket socket = new Socket("127.0.0.1", 2576);

//字节输出流包装成一个数据输出流
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());

//先发送上传文件的后缀给服务器
dos.writeUTF(".png");

//把数据发送给服务器进行接收
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > 0){
dos.write(buffer, 0, len);
}
dos.flush();

socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
}
}

BIO模式下的端口转发思想

需要实现一个客户端的消息可以发送给所有的客户端去接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.xiaohupao.six;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 11:50
* BIO模式下的端口转发思想-服务端实现
*/
public class Server {

public static List<Socket> allSocketOnLine = new ArrayList<>();
/**
* 注册端口
* 接收客户端的socket连接,交给一个独立的线程来处理
* 把当前连接的客户端Socket存入到一个所谓在线Socket集合中保存
* 接收客户端的消息,然后推送给当前所有的在线的socket接收
* @param args
*/
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(2576);

while (true){
Socket accept = ss.accept();

//把当前的客户端socket放入到集合中
allSocketOnLine.add(accept);

//分配一个线程来处理当前的socket
new ServerReaderThread(accept).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.xiaohupao.six;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 12:12
*/
public class ServerReaderThread extends Thread{

private Socket socket;

public ServerReaderThread(Socket socket){
this.socket = socket;
}

@Override
public void run() {
try {
//从Socket中获取客户端的输入流
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg;
while ((msg = br.readLine()) != null){
//服务端将接收的消息推送给其他socket
sendMsgToAllClient(msg);
}
} catch (IOException e) {
//下线时维护Socket集合
Server.allSocketOnLine.remove(this.socket);
}
}

private void sendMsgToAllClient(String msg){
for (Socket socket : Server.allSocketOnLine){
try {
PrintStream ps = new PrintStream(socket.getOutputStream());
ps.println(msg);
ps.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

基于BIO模式下的即时通信

基于BIO模式下的即时通信,实现客户端与客户端的端口消息转发逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.xiaohupao.chat.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 19:16
*/
public class ServerChat {

/**
* 定义一个集合存放所有在线的Socket
*/
public static Map<Socket, String> onLineSockets = new HashMap<>();

public static void main(String[] args) {
try {
//注册端口
ServerSocket ss = new ServerSocket(2576);

//循环一直等待所有可能的客户端连接
while (true){
Socket accept = ss.accept();

//把客户端的Socket管道单独配置到一个线程来处理
new ServerReader(accept).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package cn.xiaohupao.chat.server;

import cn.xiaohupao.chat.util.Constants;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Set;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 19:30
*/
public class ServerReader extends Thread{

private Socket socket;

public ServerReader(Socket socket){
this.socket = socket;
}

@Override
public void run() {
DataInputStream dis = null;
try {
dis = new DataInputStream(socket.getInputStream());

//循环等待客户端的消息
while (true){
//读取当前的消息类型:登录;群发;私聊,@消息
int flag = dis.readInt();
if (flag == 1){

//先将当前登录的客户端Socket存到在线人数的Socket集合中
String name = dis.readUTF();

//Log
System.out.println(name + "---->" + socket.getRemoteSocketAddress());

ServerChat.onLineSockets.put(this.socket, name);
}
writeMsg(flag, dis);
}
} catch (IOException e) {
System.out.println("人有下线了");
//移除当前Socket
ServerChat.onLineSockets.remove(this.socket);

try {
writeMsg(1, dis);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

private void writeMsg(int flag, DataInputStream dis) throws IOException {
String msg = null;
if (flag == 1){
//读取所有在线人数发送给所有客户端去更新自己的在线人数列表
StringBuilder rs = new StringBuilder();
Collection<String> onlineNames = ServerChat.onLineSockets.values();

if (onlineNames.size() > 0){
for (java.lang.String name : onlineNames){
rs.append(name).append(Constants.SPILIT);
}

msg = rs.substring(0, rs.lastIndexOf(Constants.SPILIT));
sendMsgToAll(flag, msg);
}
}else if (flag == 2 || flag == 3){
//读到消息 群发或者@消息
String newMsg = dis.readUTF();

//得到发件人
String sendName = ServerChat.onLineSockets.get(socket);

//内容
StringBuilder msgFinal = new StringBuilder();

//时间
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss EEE");

if (flag == 2){
msgFinal.append(sendName).append(" ").append(sdf.format(System.currentTimeMillis())).append("\r\n");
msgFinal.append(" ").append(newMsg).append("\r\n");
sendMsgToAll(flag, msgFinal.toString());
}else {
msgFinal.append(sendName).append(" ").append(sdf.format(System.currentTimeMillis())).append("对您私发\r\n");
msgFinal.append(" ").append(newMsg).append("\r\n");
//对谁私发
String destName = dis.readUTF();
sendMsgToOne(destName, msgFinal.toString());
}
}
}

private void sendMsgToAll(int flag, String msg){
//拿到所有在线的Socket管道,给这些管道发消息
Set<Socket> onLineSockets = ServerChat.onLineSockets.keySet();
for (Socket sk : onLineSockets){
try {
DataOutputStream dos = new DataOutputStream(sk.getOutputStream());
dos.writeInt(flag);
dos.writeUTF(msg);
dos.flush();
} catch (IOException e) {
e.printStackTrace();
}

}
}

private void sendMsgToOne(String destName, String msg) throws IOException {
//拿到所有在线的Socket管道,给这些管道写消息
Set<Socket> sockets = ServerChat.onLineSockets.keySet();
for (Socket sk : sockets){
//只对这个名字对应的Socket私发消息
if (ServerChat.onLineSockets.get(sk).trim().equals(destName)){
DataOutputStream dos = new DataOutputStream(sk.getOutputStream());
dos.writeInt(2);
dos.writeUTF(msg);
dos.flush();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
package cn.xiaohupao.chat.util;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 19:57
*/
public class Constants {

public static final int PORT = 2576;

public static final String SPILIT = "003197♣♣㏘♣④④♣";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package cn.xiaohupao.chat.client;

import cn.xiaohupao.chat.util.Constants;

import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.DataOutputStream;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/15 20:48
*/
public class ClientChat implements ActionListener {
/** 1.设计界面 */
private JFrame win = new JFrame();
/** 2.消息内容框架 */
public JTextArea smsContent =new JTextArea(23 , 50);
/** 3.发送消息的框 */
private JTextArea smsSend = new JTextArea(4,40);
/** 4.在线人数的区域 */
/** 存放人的数据 */
/** 展示在线人数的窗口 */
public JList<String> onLineUsers = new JList<>();

// 是否私聊按钮
private JCheckBox isPrivateBn = new JCheckBox("私聊");
// 消息按钮
private JButton sendBn = new JButton("发送");

// 登录界面
private JFrame loginView;

private JTextField ipEt , nameEt , idEt;

private Socket socket ;

public static void main(String[] args) {
new ClientChat().initView();

}

private void initView() {
/** 初始化聊天窗口的界面 */
win.setSize(650, 600);

/** 展示登录界面 */
displayLoginView();

/** 展示聊天界面 */
//displayChatView();

}

private void displayChatView() {

JPanel bottomPanel = new JPanel(new BorderLayout());
//-----------------------------------------------
// 将消息框和按钮 添加到窗口的底端
win.add(bottomPanel, BorderLayout.SOUTH);
bottomPanel.add(smsSend);
JPanel btns = new JPanel(new FlowLayout(FlowLayout.LEFT));
btns.add(sendBn);
btns.add(isPrivateBn);
bottomPanel.add(btns, BorderLayout.EAST);
//-----------------------------------------------
// 给发送消息按钮绑定点击事件监听器
// 将展示消息区centerPanel添加到窗口的中间
smsContent.setBackground(new Color(0xdd,0xdd,0xdd));
// 让展示消息区可以滚动。
win.add(new JScrollPane(smsContent), BorderLayout.CENTER);
smsContent.setEditable(false);
//-----------------------------------------------
// 用户列表和是否私聊放到窗口的最右边
Box rightBox = new Box(BoxLayout.Y_AXIS);
onLineUsers.setFixedCellWidth(120);
onLineUsers.setVisibleRowCount(13);
rightBox.add(new JScrollPane(onLineUsers));
win.add(rightBox, BorderLayout.EAST);
//-----------------------------------------------
// 关闭窗口退出当前程序
win.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
win.pack(); // swing 加上这句 就可以拥有关闭窗口的功能
/** 设置窗口居中,显示出来 */
setWindowCenter(win,650,600,true);
// 发送按钮绑定点击事件
sendBn.addActionListener(this);
}

private void displayLoginView(){

/** 先让用户进行登录
* 服务端ip
* 用户名
* id
* */
/** 显示一个qq的登录框 */
loginView = new JFrame("登录");
loginView.setLayout(new GridLayout(3, 1));
loginView.setSize(400, 230);

JPanel ip = new JPanel();
JLabel label = new JLabel(" IP:");
ip.add(label);
ipEt = new JTextField(20);
ip.add(ipEt);
loginView.add(ip);

JPanel name = new JPanel();
JLabel label1 = new JLabel("姓名:");
name.add(label1);
nameEt = new JTextField(20);
name.add(nameEt);
loginView.add(name);

JPanel btnView = new JPanel();
JButton login = new JButton("登陆");
btnView.add(login);
JButton cancle = new JButton("取消");
btnView.add(cancle);
loginView.add(btnView);
// 关闭窗口退出当前程序
loginView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
setWindowCenter(loginView,400,260,true);

/** 给登录和取消绑定点击事件 */
login.addActionListener(this);
cancle.addActionListener(this);

}

private static void setWindowCenter(JFrame frame, int width , int height, boolean flag) {
/** 得到所在系统所在屏幕的宽高 */
Dimension ds = frame.getToolkit().getScreenSize();

/** 拿到电脑的宽 */
int width1 = ds.width;
/** 高 */
int height1 = ds.height ;

System.out.println(width1 +"*" + height1);
/** 设置窗口的左上角坐标 */
frame.setLocation(width1/2 - width/2, height1/2 -height/2);
frame.setVisible(flag);
}

@Override
public void actionPerformed(ActionEvent e) {
/** 得到点击的事件源 */
JButton btn = (JButton) e.getSource();
switch(btn.getText()){
case "登陆":
String ip = ipEt.getText().toString();
String name = nameEt.getText().toString();
// 校验参数是否为空
// 错误提示
String msg = "" ;
// 12.1.2.0
// \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\
if(ip==null || !ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")){
msg = "请输入合法的服务端ip地址";
}else if(name==null || !name.matches("\\S{1,}")){
msg = "姓名必须1个字符以上";
}

if(!msg.equals("")){
/** msg有内容说明参数有为空 */
// 参数一:弹出放到哪个窗口里面
JOptionPane.showMessageDialog(loginView, msg);
}else{
try {
// 参数都合法了
// 当前登录的用户,去服务端登陆
/** 先把当前用户的名称展示到界面 */
win.setTitle(name);
// 去服务端登陆连接一个socket管道
socket = new Socket(ip, Constants.PORT);

//为客户端的socket分配一个线程 专门负责收消息
new ClientReader(this,socket).start();

// 带上用户信息过去
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.writeInt(1); // 登录消息
dos.writeUTF(name.trim());
dos.flush();

// 关系当前窗口 弹出聊天界面
loginView.dispose(); // 登录窗口销毁
displayChatView(); // 展示了聊天窗口了


} catch (Exception e1) {
e1.printStackTrace();
}
}
break;
case "取消":
/** 退出系统 */
System.exit(0);
break;
case "发送":
// 得到发送消息的内容
String msgSend = smsSend.getText().toString();
if(!msgSend.trim().equals("")){
/** 发消息给服务端 */
try {
// 判断是否对谁发消息
String selectName = onLineUsers.getSelectedValue();
int flag = 2 ;// 群发 @消息
if(selectName!=null&&!selectName.equals("")){
msgSend =("@"+selectName+","+msgSend);
/** 判断是否选中了私法 */
if(isPrivateBn.isSelected()){
/** 私法 */
flag = 3 ;//私发消息
}

}

DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.writeInt(flag); // 群发消息 发送给所有人
dos.writeUTF(msgSend);
if(flag == 3){
// 告诉服务端我对谁私发
dos.writeUTF(selectName.trim());
}
dos.flush();

} catch (Exception e1) {
e1.printStackTrace();
}

}
smsSend.setText(null);
break;

}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.xiaohupao.chat.client;

import cn.xiaohupao.chat.util.Constants;

import java.io.DataInputStream;
import java.net.Socket;

/**
* @Author: xiaohupao
* @Date: 2021/10/17 20:22
*/
class ClientReader extends Thread {

private Socket socket;
private ClientChat clientChat ;

public ClientReader(ClientChat clientChat, Socket socket) {
this.clientChat = clientChat;
this.socket = socket;
}

@Override
public void run() {
try {
DataInputStream dis = new DataInputStream(socket.getInputStream());
/** 循环一直等待客户端的消息 */
while(true){
/** 读取当前的消息类型 :登录,群发,私聊 , @消息 */
int flag = dis.readInt();
if(flag == 1){
// 在线人数消息回来了
String nameDatas = dis.readUTF();
// 展示到在线人数的界面
String[] names = nameDatas.split(Constants.SPILIT);

clientChat.onLineUsers.setListData(names);
}else if(flag == 2){
//群发,私聊 , @消息 都是直接显示的。
String msg = dis.readUTF() ;
clientChat.smsContent.append(msg);
// 让消息界面滾動到底端
clientChat.smsContent.setCaretPosition(clientChat.smsContent.getText().length());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

NIO深入理解

Java NIO也有人称之为java non-blocking IO是从Java1.4开始引入的一个新的IO API,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件读写操作。NIO可以理解为非阻塞IO,传统的IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事情,比如调用Socket.read()时,如果服务器一直没有数据传输过来,线程一直阻塞,而NIO中可以配置Socket为非阻塞模式。

NIO相关的类都被放在java.nio包及其子包下,并且对原java.io包中的很多类进行改写。

NIO有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

Java NIO的非阻塞模式,是一个线程从某通道发送请求或者读取数据,但它仅能得到目前可用的数据,如果目前没有数据可用,就什么也不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

NIO可以做到用一个线程处理多个操作。假设有1000个请求,根据实际情况,可以分配20或者80个线程来处理。不像BIO,非得分配1000个。

NIO与BIO的比较

BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率要比流I/O高很多。

BIO是阻塞的,NIO是非阻塞的。

BIO基于字节流和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区,或者从缓冲区写道通道中。Selector用于监听多个通道的事件,因此使用单个线程就可以监听多个客户端通道。

NIO三大核心原理

NIO有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

Buffer缓冲区

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数组的操作,Buffer API更加容易操作和管理。

Buffer类及其子类

Buffer就像一个数组,可以保存多个相同类型的数据。根据数据类型不同,有以下Buffer常用的子类:

ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。

1
static XxxBuffer allocate(int capacity);//创建一个容量为Capacity的XxxBuffer对象

缓冲区的基本属性

Buffer中的重要概念

  • 容量capacity:作为一个内存块,Buffer具有一定的固定大小,缓冲区容量不能为负,并且创建后不能更改。
  • 限制limit:表示缓冲区中可以操作数据的大小(limit后数据不能进行读写)。缓冲区的限制不能为负,并且不能大于其容量。写入模式,限制等于buffer的容量。读取模式下,limit等于写入的数据量。
  • 位置position:下一个要读取或写入的数据索引。缓冲区的位置不能为负,并且不能大于其限制
  • 标记mark与重置reset:标记是一个索引,通过Buffer中的Mark方法指定Buffer中一个特定的position,之后可以通过调用reset()方法恢复到这个position。

标记、位置、限制、容量遵循$0<=mark<=position<=limit<=capacity$

Buffer常见方法

1
2
3
4
5
6
7
8
9
10
11
Buffer clear();//清空缓冲区并返回对缓冲区的引用
Buffer flip();//将缓冲区的界限设置为当前位置,并将当前位置重置为0 0
int capacity();//返回Buffer的capacity大小
boolean hasRemaining();//判断缓冲区是否还有元素
int limit();//返回Buffer的界限limit的位置
Buffer mark();//对缓冲区设置标记
int position();//返回缓冲区的当前位置position
Buffer position(int n);//将设置缓冲区的当前位置为n,并返回修改后的Buffer对象
int remaining();//返回position和limit之间的元素个数
Buffer reset();//将位置position转到以前设置的mark所在的位置
Buffer rewind();//将位置设置为0,取消色设置的mark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package cn.xiaohupao.niotest;

import org.junit.jupiter.api.Test;

import java.nio.Buffer;
import java.nio.ByteBuffer;

/**
* @Author: xiaohupao
* @Date: 2021/10/18 10:26
*/
public class BufferTest {
@Test
public void test01(){

//分配一个缓冲区,容量设置为10
ByteBuffer allocate = ByteBuffer.allocate(10);
System.out.println(allocate.position());//0
System.out.println(allocate.limit());//10
System.out.println(allocate.capacity());//10
System.out.println(allocate);

System.out.println("----------------------");

//put往缓冲区中添加数据
String name = "wkwkwk";
allocate.put(name.getBytes());
System.out.println(allocate.position());//6
System.out.println(allocate.limit());//10
System.out.println(allocate.capacity());//10

System.out.println("----------------------");

//Buffer flip() 为将缓冲区的界限设置为当前位置,并将当前位置设置为0
Buffer flip = allocate.flip();
System.out.println(allocate.position());//0
System.out.println(allocate.limit());//6
System.out.println(allocate.capacity());//10

System.out.println("----------------------");

//get数据的读取
char ch = (char) allocate.get();
System.out.println(ch);//w
System.out.println(allocate.position());//1
System.out.println(allocate.limit());//6
System.out.println(allocate.capacity());//10
}

@Test
public void test02(){
//分配一个缓冲区,容量设置为10
ByteBuffer allocate = ByteBuffer.allocate(10);
System.out.println(allocate.position());//0
System.out.println(allocate.limit());//10
System.out.println(allocate.capacity());//10
System.out.println(allocate);

System.out.println("----------------------");

//put往缓冲区中添加数据
String name = "wkwkwk";
allocate.put(name.getBytes());
System.out.println(allocate.position());//6
System.out.println(allocate.limit());//10
System.out.println(allocate.capacity());//10

System.out.println("----------------------");

//clear清除缓冲区中的数据
allocate.clear();
System.out.println(allocate.position());//0
System.out.println(allocate.limit());//10
System.out.println(allocate.capacity());//10
System.out.println((char)allocate.get());//w

//定义一个缓冲区
ByteBuffer allocate1 = ByteBuffer.allocate(10);
String name1 = "lsnlsn";
allocate1.put(name1.getBytes());

allocate1.flip();

byte[] b = new byte[2];
allocate1.get(b);
String res = new String(b);
System.out.println(res);//ls

System.out.println(allocate1.position());//2
System.out.println(allocate1.limit());//6
System.out.println(allocate1.capacity());//10

System.out.println("----------------------");

allocate1.mark();//标记2这个位置

byte[] b2 = new byte[3];

allocate1.get(b2);
String res1 = new String(b2);
System.out.println(res1);//nls

System.out.println(allocate1.position());//5
System.out.println(allocate1.limit());//6
System.out.println(allocate1.capacity());//10

System.out.println("----------------------");

allocate1.reset();//回到标记为2的地方,目前已经读取2个字符
if (allocate1.hasRemaining()){
System.out.println(allocate1.remaining());//4
}

}
}

直接与非直接缓冲区

什么是直接内存与非直接内存

byte buffer可以是两种类型,一种是基于直接内存;另一种是非直接内存。对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。而非直接内存,也就是堆内存中的数据,如果作IO操作,会先从本进程内存复制到直接内存,再利用本地IO处理。

非直接内存的作用链:本地IO - - > 直接内存 - - > 非直接内存 - - > 直接内存 - - > 本地IO

直接内存的作用链:本地IO - - > 直接内存 - - > 本地IO

在做IO处理时,网络 发送大量数据时,直接内存会具有更高的效率。直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,这部分的数据是在JVM之外的,因此它不会占用应用的内存。所以呢,当有很大的数据要缓存,并且它的生命周期又很长,那么就比较适合使用直接内存。只是一般来说,如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。字节缓冲区是直接缓冲区还是非直接缓冲区可以通过调用其isDirect方法来确定。

使用场景

  • 有很大的数据需要存储,它的生命周期又很长
  • 适合频繁的IO操作,比如网络并发场景
1
2
3
4
5
@Test
public void test03(){
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
System.out.println(buffer.isDirect());//true
}

Channel通道

Java NIO的通道类似于流,但又有些不同:即可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步地读写。

通道:由java.nio.channel表示IO源与目标打开的连接。Channel类似于传统的“流”。只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。

NIO的通带类似于流,但有些区别:

  • 通道可以同时进行读写,而流只能读或只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲区读数据,也可以写数据到缓冲区

BIO中的流是单向的,例如FileInputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读操作,也可以写操作。

Channel在NIO中是一个接口。

1
public interface channel extends closeable{}

常用的Channel实现类

  • FileChannel:用于读取、写入、映射和操作文件的通道
  • DatagramChannel:通过UDP读写网络中的数据通道
  • SocketChannel:通过TCP读写网络中的数据
  • ServerSocketChannel:可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。【ServerSocketChannel类似于ServerSocket;SocketChannel类似于Socket】

FileChannel类

获取通道的一种方式是支持通道的对象调用getChannel()方法。支持通道的类如下:

  • FileInputStream
  • FileOutputStream
  • RandomAccessFile
  • DatagramSocket
  • Socket
  • ServerSocket

获取到通道的其他方式使用Files类的静态方法newByteChannel()获取字节通道。或者通过通道的静态方法open()打开并返回指定通道。

1
2
3
4
5
6
7
8
9
int read(ByteBuffer dst);
long read(ByteBuffer[] dsts);
int write(ByteBuffer src);
long write(ByteBuffer[] srcs);
long position();
FileChannel position(long p);
long size();
FileChannel truncate(long s);
void force(boolean metaData);

使用ByteBuffer和FileChannel,将“Hello!576!”写入到data01.txt中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void write(){
try {
//字节输出流通向目标文件
FileOutputStream fos = new FileOutputStream("data01.txt");
//得到字节输出流对应的通道
FileChannel channel = fos.getChannel();
//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("Hello!576!".getBytes());
//把缓冲区切换成写出模式
buffer.flip();
channel.write(buffer);
channel.close();
System.out.println("写数据到文件中!");
} catch (IOException e) {
e.printStackTrace();
}
}

使用ByteBuffer和FileChannel,将data01.txt中的数据读入到程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void read(){
try {
//字节输入流与源文件接通
FileInputStream fis = new FileInputStream("data01.txt");
//得到一个文件字节输入流的文件通道
FileChannel channel = fis.getChannel();
//定义一个缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();

//读取缓冲区中的数据并输出
String res = new String(buffer.array(), 0, buffer.remaining());
System.out.println(res);
} catch (IOException e) {
e.printStackTrace();
}
}

使用FileChannel,完成文件拷贝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Test
public void copy(){
//定义两个文件路径
File src = new File("C:\\Users\\Wk'sPC\\Documents\\演示文稿2.png");
File dest = new File("F:\\dest\\newPicture.png");

try {
FileInputStream fis = new FileInputStream(src);
FileOutputStream fos = new FileOutputStream(dest);

//得到文件通道
FileChannel isChannel = fis.getChannel();
FileChannel osChannel = fos.getChannel();

//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

while (true){
//清空缓冲区再写入数据到缓冲区
buffer.clear();
int flag = isChannel.read(buffer);
if (flag == -1){
break;
}
//将缓冲区的模式切换成可读模式
buffer.flip();
osChannel.write(buffer);
}
isChannel.close();
osChannel.close();
System.out.println("复制完成!");

} catch (IOException e) {
e.printStackTrace();
}
}

分散(Scatter)和聚集(Gather)

分散:是指把Channel通道的数据读入到多个缓冲区中。

聚集写入:是指把多个Buffer中的数据聚散到Channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Test
public void scatter(){
try {
//字节输入流
FileInputStream fis = new FileInputStream("data01.txt");

//字节输出管道
FileChannel fisChannel = fis.getChannel();

//字节输出流
FileOutputStream fos = new FileOutputStream("data02.txt");
FileChannel fosChannel = fos.getChannel();


//定义多个缓冲区
ByteBuffer buffer1 = ByteBuffer.allocate(4);
ByteBuffer buffer2 = ByteBuffer.allocate(1024);

ByteBuffer[] buffers = {buffer1, buffer2};

//从缓冲区中读取数据分散到各个缓冲区
fisChannel.read(buffers);

for (ByteBuffer buffer : buffers){
//切换到读模式
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining()));

}

//聚集写入到通道
fosChannel.write(buffers);

fisChannel.close();
fosChannel.close();

} catch (IOException e) {
e.printStackTrace();
}
}

transferFrom()从通道中去复制原通道数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void transferFrom(){
try {
FileInputStream fis = new FileInputStream("data01.txt");
FileChannel fisChannel = fis.getChannel();

FileOutputStream fos = new FileOutputStream("data03.txt");
FileChannel fosChannel = fos.getChannel();

//复制数据
//fosChannel.transferFrom(fisChannel, fisChannel.position(), fisChannel.size());
//or
fisChannel.transferTo(fisChannel.position(), fisChannel.size(), fosChannel);
fisChannel.close();
fosChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}

Selector选择器

Selector是一个Java NIO组件,可以能够检查一个或多个NIO通道,并确定哪些通道已经准备好进行读取或写入。这样,一个单独的线程可以管理多个channel,从而管理多个网络的连接,提高效率。

选择器是SelectableChannle对象的多路复用器,Selector可以同时监控多个SelectableChannel的IO状况,也就是说,利用Selector可使一个单独的线程管理多个Channel。Selector是非阻塞IO的核心。

  • Java的NIO,用非阻塞的IO方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector选择器
  • Selector能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只使用一个单线程去管理多个通道,也就是管理多个连接和请求。
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
  • 避免了多个线程之间的上下文切换导致的开销。

选择器的应用

创建一个Selector:调用Selector.open()方法去创建一个Selector。

1
Selector selector = Selector.open();

向选择器注册通道:SelectableChannel.register(Selector sel, int ops)

1
2
3
4
5
6
7
8
9
10
//获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//切换非阻塞模式
ssChannel.configureBlocking(false);
//绑定连接
ssChannel.bind(new InetSocketAddress(2576));
//获取选择器
Selector selector = Selector.open();
//将通道注册到选择器上,并指定监听接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

当调用register(Selector sel, int ops)将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数ops指定。可以监听的事件类型:

  • 读:SelectionKey.OP_READ
  • 写:SelectionKey.OP_WRITE
  • 连接:SelectionKey.OP_CONNECT
  • 接收:SelectionKey.OP_ACCEPT
  • 若注册时不知监听一个事件,可以使用“位或”操作符连接。
1
int interestSET = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
Donate comment here