To understand the Reactor pattern, we have to start with event-driven programming.

In server development, the CPU processes data far faster than IO can deliver it. To keep the CPU from blocking on IO, one improvement is to handle requests with multiple processes or threads, but that brings context-switching overhead.

The pioneers therefore turned to event-driven programming, also known as the callback approach. The application registers a callback (an event handler) with an intermediary; when the IO becomes ready, the intermediary produces an event and notifies that handler to process it. This callback style embodies the "Hollywood Principle": "Don't call us, we'll call you."

So who plays the intermediary for the "IO ready" event? The Reactor pattern's answer: a single dedicated process (or thread) that waits and loops endlessly. It accepts registrations from all handlers, asks the operating system whether IO is ready, and, once it is, dispatches to the registered handler. This role is called the Reactor.
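Before getting to NIO, a toy sketch may make this division of labor concrete. Everything here is illustrative (there is no real OS interaction); a blocking queue stands in for the operating system's readiness notification:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** A toy reactor: handlers register for event types, one loop waits and dispatches. */
class ToyReactor implements Runnable {
    private final Map<String, Runnable> handlers = new ConcurrentHashMap<>();
    private final BlockingQueue<String> readyEvents = new LinkedBlockingQueue<>();

    // "Don't call us, we'll call you": the application only registers callbacks
    void register(String eventType, Runnable handler) {
        handlers.put(eventType, handler);
    }

    // In a real reactor the OS reports readiness; here another thread simulates it
    void eventReady(String eventType) {
        readyEvents.add(eventType);
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                String event = readyEvents.take(); // wait for an "IO ready" event
                Runnable handler = handlers.get(event);
                if (handler != null) {
                    handler.run(); // the reactor calls the application back
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A real Reactor replaces the queue with an OS readiness facility such as select/epoll, which is exactly what Java NIO's Selector wraps.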
Reactor and NIO
In Java NIO, the heart of a Reactor is the Selector. A simple Reactor boils down to one central loop; this loop structure is also called an EventLoop.

Let's walk through how an NIO server is created, step by step through actual code:
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

    public final Selector selector;
    public final ServerSocketChannel server;

    /**
     * Creates a ServerSocketChannel and calls configureBlocking(false) to put it in
     * non-blocking mode, binds the channel to the given port, and registers it with
     * the Selector with OP_ACCEPT, i.e. listening for accept events.
     */
    public Reactor(int port) throws IOException {
        // Create the Selector
        selector = Selector.open();
        // Create the selectable channel and configure it as non-blocking
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        // Bind the channel to the given port
        ServerSocket socket = server.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        socket.bind(address);
        /*
         * To use a Channel together with a Selector, the channel must be
         * registered with the selector, via SelectableChannel.register().
         */
        SelectionKey selectionKey = server.register(selector, SelectionKey.OP_ACCEPT);
        /*
         * selectionKey.attach(theObject) attaches an object (or any extra state)
         * to a SelectionKey; selectionKey.attachment() retrieves it again.
         */
        // Attach an Acceptor via the key; when an event fires, the Acceptor is triggered
        selectionKey.attach(new Acceptor(this));
    }

    /**
     * The Selector starts listening and enters the inner loop; in non-blocking IO the
     * inner loop always follows this shape. First call select(), which blocks until at
     * least one event occurs, then fetch the ready keys with selectedKeys() and iterate.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until at least one event has occurred
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    dispatch(key);
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    /**
     * Runs the handler attached to the key (the Acceptor for the server channel,
     * a read handler for client channels).
     */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }
}
```
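For completeness, a minimal sketch of how this Reactor might be started; the port number and thread name here are illustrative assumptions, not part of the original:

```java
public class ReactorMain {
    public static void main(String[] args) throws java.io.IOException {
        // Run the event loop on its own dedicated thread; 9999 is an arbitrary example port
        new Thread(new Reactor(9999), "reactor-loop").start();
    }
}
```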
```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {

    private final Reactor reactor;

    public Acceptor(Reactor reactor) {
        this.reactor = reactor;
    }

    /**
     * Accepts the incoming connection request.
     */
    @Override
    public void run() {
        try {
            ServerSocketChannel server = reactor.server;
            SocketChannel channel = server.accept();
            if (channel != null) {
                // Hand the new channel to a handler
                new SocketReadHandler(reactor.selector, channel);
            }
        } catch (IOException e) { /* ... */ }
    }
}
```
```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

public class SocketReadHandler implements Runnable {

    private final Selector selector;
    private final SocketChannel channel;
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

    public SocketReadHandler(Selector selector, SocketChannel channel) throws IOException {
        this.selector = selector;
        this.channel = channel;
        channel.configureBlocking(false);
        /*
         * Register the newly accepted client connection with the Reactor thread's
         * multiplexer, with interest in the read operation, so that we can read
         * the messages the client sends.
         */
        SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_READ);
        // Attach this handler to the key; when an event fires, run() below is invoked.
        selectionKey.attach(this);
    }

    /**
     * Handles the "data readable from the client" event.
     */
    @Override
    public void run() {
        // Allocate the read buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            int count = channel.read(buffer);
            if (count > 0) {
                buffer.flip();
                CharBuffer charBuffer = decoder.decode(buffer);
                String msg = charBuffer.toString();
                // ... process msg, then register interest in writing the response
                SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_WRITE);
                selectionKey.attach(this); // placeholder; a real server would attach a write handler
            }
        } catch (IOException e) { /* ... */ }
        buffer.clear();
    }
}
```
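The read handler above attaches itself to the OP_WRITE key only as a placeholder; the original post elides the write side. A hedged sketch of what a dedicated write handler might look like (the class name and echo-style behavior are assumptions):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SocketWriteHandler implements Runnable {

    private final Selector selector;
    private final SocketChannel channel;
    private final String response;

    public SocketWriteHandler(Selector selector, SocketChannel channel, String response)
            throws IOException {
        this.selector = selector;
        this.channel = channel;
        this.response = response;
        // Express interest in the "writable" event and attach this handler to the key
        SelectionKey key = channel.register(selector, SelectionKey.OP_WRITE);
        key.attach(this);
    }

    @Override
    public void run() {
        try {
            // Write the response, then go back to waiting for reads on this channel
            channel.write(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)));
            new SocketReadHandler(selector, channel);
        } catch (IOException e) { /* ... */ }
    }
}
```

Because dispatch() runs handlers on the Reactor thread itself, re-registering the channel from inside run() is safe in this single-threaded design.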
Reactor and Netty
The Reactor pattern has several variants. Netty builds on the Multiple Reactors variant with some modifications. Multiple Reactors has more than one reactor: a mainReactor and subReactors. There is exactly one mainReactor; it responds to client connection requests and establishes the connections, using an NIO Selector. There can be one or more subReactors; each subReactor runs in its own thread and maintains its own NIO Selector.

The reason is that subReactors perform relatively time-consuming IO operations, such as reading and writing messages; spreading them over multiple threads makes better use of the CPU and reduces time spent waiting on IO.

Netty's threading model is based on Multiple Reactors and borrows the mainReactor/subReactor structure, but judging from the code there is no separate handler thread pool: a Netty subReactor and its worker thread are one and the same. Thanks to IO multiplexing, a single subReactor can monitor and handle the IO requests of many channels.

parentGroup and childGroup are the two objects passed to the Bootstrap when it is built. Both are thread pools: the childGroup pool is fully used by the subReactors, while the parentGroup pool contributes just one thread, which becomes the mainReactor after a port is bound.

In Netty the mainReactor role is called the "Boss" and the subReactor role is called the "Worker": the Boss distributes requests, the Workers execute them. Since Netty 4.0, NioEventLoop has been the core of Netty's NIO layer.
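To make the wiring concrete, here is a minimal, hedged sketch of a Netty 4 server bootstrap; the port and the empty pipeline are assumptions, and only the group/channel setup reflects the structure described above:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyReactorSketch {
    public static void main(String[] args) throws InterruptedException {
        // parentGroup: the mainReactor ("Boss"); one thread is enough for accepting
        EventLoopGroup parentGroup = new NioEventLoopGroup(1);
        // childGroup: the subReactors ("Workers"); each EventLoop is one thread
        // with its own Selector, multiplexing many channels
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // application handlers would be added to ch.pipeline() here
                 }
             });
            // bind() takes one thread from parentGroup, which becomes the mainReactor
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
```

Passing 1 to the parentGroup constructor just makes the single-mainReactor role explicit; even with a larger group, only one of its threads serves a given bound port.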
Reactor and Kafka
Kafka's network layer (kafka.network.SocketServer) applies the same idea: one Acceptor thread accepts connections and hands them to N Processor threads, each running its own Selector. The following is simplified from the Kafka source:

```scala
/**
 * An NIO socket server. The threading model is:
 * 1 Acceptor thread that handles new connections.
 * The Acceptor has N Processor threads that each have their own selector and
 * read requests from sockets.
 * M Handler threads that handle requests and produce responses back to the
 * processor threads for writing.
 */
class SocketServer(val host: String,
                   val port: Int,
                   val processorBeginIndex: Int,
                   val numProcessorThreads: Int,
                   val totalProcessorThreads: Int,
                   val time: Time,
                   val metrics: Metrics) extends Logging {

  private val processors = new Array[Processor](totalProcessorThreads)

  /**
   * Start the socket server
   */
  def startup() {
    this.synchronized {
      new Acceptor(host, port, processorBeginIndex, numProcessorThreads, processors, time, metrics)
    }
  }
}

/**
 * Thread that accepts and configures new connections. There only needs to be one of these.
 */
private class Acceptor(val host: String,
                       private val port: Int,
                       val processorBeginIndex: Int,
                       numProcessorThreads: Int,
                       processors: Array[Processor],
                       val time: Time,
                       val metrics: Metrics) extends Runnable {

  val nioSelector = java.nio.channels.Selector.open()
  val serverChannel = openServerSocket(host, port)
  val processorEndIndex = processorBeginIndex + numProcessorThreads

  this.synchronized {
    for (i <- processorBeginIndex until processorEndIndex) {
      processors(i) = new Processor(time, metrics)
    }
  }

  /*
   * Create a server socket to listen for connections on.
   */
  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    val socketAddress =
      if (host == null || host.trim.isEmpty) new InetSocketAddress(port)
      else new InetSocketAddress(host, port)
    try {
      serverChannel.socket.bind(socketAddress)
    } catch {
      case e: SocketException =>
        throw new Exception("Socket server failed to bind.")
    }
    serverChannel
  }

  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    val currentProcessor = processorBeginIndex
    val ready = nioSelector.select()
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iterator = keys.iterator()
      while (iterator.hasNext) {
        var key: SelectionKey = null
        try {
          key = iterator.next()
          iterator.remove()
          if (key.isAcceptable)
            accept(key, processors(currentProcessor))
          else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection")
        }
      }
    }
  }

  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor): Unit = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    processor.accept(socketChannel)
  }
}

/**
 * Thread that processes all requests from a single connection. There are N of
 * these running in parallel, each of which has its own selector.
 */
private class Processor(val time: Time, val metrics: Metrics) extends Runnable {

  private val metricTags = new util.HashMap[String, String]()

  private val selector = new org.apache.kafka.common.network.Selector(
    metrics, time, "socket-server", metricTags)

  def run() {
    while (!Thread.interrupted()) {
      try {
        selector.poll(300)
      } catch {
        case e @ (_: IllegalStateException | _: IOException) => throw e
      }
    }
  }

  /**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    selector.wakeup()
  }
}
```
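Two details are simplified in this excerpt: the real Acceptor loops continuously and round-robins new connections across its Processor threads (here currentProcessor stays fixed at processorBeginIndex), and Processor.accept() in the actual source enqueues the new channel before waking its selector so the poll loop can register it for reads. Either way, the shape matches Netty's: the Acceptor plays the mainReactor, and the Processors play the subReactors, each with a private Selector.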
When reposting, please credit: "Reposted from linkedkeeper.com (by 张松然)".