Reactor模式

Reactor是一个或多个输入事件的处理模式,用于处理并发传递给服务处理程序的服务请求。服务处理程序判断传入请求发生的事件,并将它们同步的分派给关联的请求处理程序。

Reactor之所以高效是因为采用了分而治之事件驱动设计。分而治之:Reactor模式将处理过程分为多个小任务,每个任务执行一个非阻塞的操作,通常由一个IO事件触发执行。事件驱动:事件驱动设计通常比其他模型更高效,因为他不用针对每个请求启用一条线程,减少了上下文的切换,缺点是必须手动将事件和处理动作绑定。

NIO中总共设计了四种事件:

  • OP_ACCEPT:服务端监听到一个连接,准备接收
  • OP_CONNECT:客户端与服务端连接建立成功
  • OP_READ:读事件就绪,通道有数据可读
  • OP_WRITE:写事件就绪,可以向通道写入数据

Reactor模式按照职责不同,通常可以把线程分为Reactor线程、IO线程和业务线程:

  • Reactor线程:轮询通知发生IO的通道,并分派合适的Handler处理
  • IO线程:执行实际读写操作
  • 业务线程:执行应用程序的业务逻辑

三种Reactor模型

单线程Reactor模式

一个线程完成事件的通知、实际的IO操作和业务处理。

单线程Reactor模式的优点是模型简单,没有多线程的并发问题。缺点是无法发挥多核CPU的性能且存在可靠性问题。当线程意外终止或死循环会导致整个服务不可用。

单Reactor多线程

单个Reactor线程将请求分发给Handler线程,Handler是多线程。

单Reactor多线程模式优点是可以充分利用多核CPU的处理能力,缺点是多线程存在并发问题并且由于Reactor单线程处理所有的事件监听和相应在高并发应用场景下容易出现性能瓶颈。

主从Reactor多线程

主Reactor用于处理连接接收事件,多个从Reactor负责Handler的分发处理。

主从Reactor多线程例子

  • MultiReactor:
    • Reactor:负责分发请求
    • Acceptor:负责建立SocketChannel链接
  • BasicHandler:负责处理用户请求

BasicHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class BasicHandler implements Runnable {
private static final int WRITE_SIZE = 1024;
private static final int READ_SIZE = 1024;
//定义服务状态
private static final int READING=0, SENDING = 1, CLOSED = 2;

public SocketChannel socketChannel;
public SelectionKey selectionKey;
private ByteBuffer readBuffer = ByteBuffer.allocate(READ_SIZE);
private ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_SIZE);
private int state = READING;

public BasicHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

public BasicHandler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
//设置非阻塞
socketChannel.configureBlocking(false);
selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(this);
//唤醒select方法
selector.wakeup();
}

@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException e) {
try {
selectionKey.channel().close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

protected void read() throws IOException {}

protected void send() throws IOException {}
}

MultiReactor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class MultiReactor {
private static final int POOL_SIZE=3;
static Executor selectorPool = Executors.newFixedThreadPool(POOL_SIZE);
private Reactor mainReactor;
private Reactor[] subReactors = new Reactor[POOL_SIZE - 1];
int next = 0;
private final int port;

public MultiReactor(int port){
this.port = port;
try {
mainReactor = new Reactor();
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new Reactor();
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void start() throws IOException {
Thread mainReactorThread = new Thread(mainReactor, "mainReactor");
new Acceptor(mainReactor.getSelector(),port);
selectorPool.execute(mainReactorThread);

for (int i = 0; i < subReactors.length; i++) {
Thread sub = new Thread(subReactors[i], "subReactor-" + i);
selectorPool.execute(sub);
}
}

private static class Reactor implements Runnable {

private ConcurrentLinkedQueue<BasicHandler> events = new ConcurrentLinkedQueue<>();
final Selector selector;

public Reactor() throws IOException {
this.selector = Selector.open();
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
BasicHandler handler = null;
while ((handler = events.poll()) != null) {
handler.socketChannel.configureBlocking(false);
handler.selectionKey = handler
.socketChannel
.register(selector, SelectionKey.OP_READ);
handler.selectionKey.attach(handler);
}
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
for (SelectionKey key : selected) {
dispatch(key);
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}

void dispatch(SelectionKey key) {
Runnable runnable = (Runnable) key.attachment();
if (runnable != null) {
runnable.run();
}
}

void register(BasicHandler basicHandler) {
events.offer(basicHandler);
selector.wakeup();
}

public Selector getSelector() {
return selector;
}

}

private class Acceptor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;

public Acceptor(Selector selector, int port) throws IOException{
this.selector = selector;
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(this);
System.out.println("mainReactor Acceptor: Listening on port:" + port);
}

@Override
public void run() {
try {
SocketChannel client = serverSocket.accept();
if (client != null) {
client.write(ByteBuffer.wrap("你已成功连接服务器!".getBytes()));
System.out.println("mainReactor Acceptor: " + client.getRemoteAddress());
Reactor subReactor = subReactors[next];
subReactor.register(new BasicHandler(client));
next++;
next %= subReactors.length;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

参考