消息传递有很多种方式,请求/响应(Request/Reply)是最常用的。在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端。HTTP协议也是基于请求/响应的方式。
但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应。
发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式。订阅者Subscriber连接到服务器客户端后,相当于开始订阅发布者Publisher发布的消息,当发布者发布了一条消息后,所有订阅者都会接收到这条消息。
网络聊天室一般就是基于发布/订阅模式来实现。例如加入一个QQ群,就相当于订阅了这个群的所有消息,当有新的消息,服务器会主动将消息发送给所有的客户端。只不过聊天室里的所有人既是发布者又是订阅者。
下面分别用MINA、Netty、Twisted分别实现简单的发布/订阅模式的服务器程序,连接到服务器的所有客户端都是订阅者,当发布者发布一条消息后,服务器会将消息转发给所有客户端。
MINA 在MINA中,通过IoService的getManagedSessions()方法可以获取这个IoService当前管理的所有IoSession,即所有连接到服务器的客户端集合。当服务器接收到发布者发布的消息后,可以通过IoService的getManagedSessions()方法获取到所有客户端对应的IoSession并将消息发送到这些客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class TcpServer { public static void main (String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("codec" , new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8" ), "\r\n" , "\r\n" ))); acceptor.setHandler(new TcpServerHandle()); acceptor.bind(new InetSocketAddress(8080 )); } } class TcpServerHandle extends IoHandlerAdapter { @Override public void exceptionCaught (IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); } @Override public void messageReceived (IoSession session, Object message) throws Exception { Collection<IoSession> sessions = session.getService().getManagedSessions().values(); IoUtil.broadcast(message, sessions); } }
Netty Netty提供了ChannelGroup来用于保存Channel组,ChannelGroup是一个线程安全的Channel集合,它提供了一些列Channel批量操作。当一个TCP连接关闭后,对应的Channel会自动从ChannelGroup移除,所以不用手动去移除关闭的Channel。
Netty文档关于ChannelGroup的解释:
A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don’t need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.
当有新的客户端连接到服务器,将对应的Channel加入到一个ChannelGroup中,当发布者发布消息时,服务器可以将消息通过ChannelGroup写入到所有客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class TcpServer { public static void main (String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(80 )); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler()); } }); ChannelFuture f = b.bind(8080 ).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } class TcpServerHandler extends ChannelInboundHandlerAdapter { private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelActive (ChannelHandlerContext ctx) { channels.add(ctx.channel()); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { channels.writeAndFlush(msg + "\r\n" ); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
Twisted 在Twisted中,全局的数据一般会放在Factory,而每个连接相关的数据会放在Protocol中。所以这里可以在Factory中加入一个属性,来存放Protocol集合,表示所有连接服务器的客户端。当有新的客户端连接到服务器时,将对应的Protocol实例放入集合,当连接断开,将对应的Protocol从集合中移除。当服务器接收到发布者发布的消息后,遍历所有客户端并发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from twisted.protocols.basic import LineOnlyReceiverfrom twisted.internet.protocol import Factoryfrom twisted.internet import reactorclass TcpServerHandle (LineOnlyReceiver) : def __init__ (self, factory) : self.factory = factory def connectionMade (self) : self.factory.clients.add(self) def connectionLost (self, reason) : self.factory.clients.remove(self) def lineReceived (self, line) : for c in self.factory.clients: c.sendLine(line) class TcpServerFactory (Factory) : def __init__ (self) : self.clients = set() def buildProtocol (self, addr) : return TcpServerHandle(self) reactor.listenTCP(8080 , TcpServerFactory()) reactor.run()
下面分别是两个客户端程序,一个是用于发布消息的客户端,一个是订阅消息的客户端。
发布消息的客户端很简单,就是向服务器write一条消息即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class PublishClient { public static void main (String[] args) throws IOException { Socket socket = null ; OutputStream out = null ; try { socket = new Socket("localhost" , 8080 ); out = socket.getOutputStream(); out.write("Hello\r\n" .getBytes()); out.flush(); } finally { out.close(); socket.close(); } } }
订阅消息的客户端连接到服务器后,会阻塞等待接收服务器发送的发布消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class SubscribeClient { public static void main (String[] args) throws IOException { Socket socket = null ; BufferedReader in = null ; try { socket = new Socket("localhost" , 8080 ); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); while (true ) { String line = in.readLine(); if (line == null ) { break ; } System.out.println(line); } } finally { in.close(); socket.close(); } } }
分别针对MINA、Netty、Twisted服务器进行测试:
测试时首先开启服务器;
然后再运行订阅消息的客户端SubscribeClient,SubscribeClient可以开启多个;
最后运行发布消息的客户端PublishClient,可以多次运行查看所有SubscribeClient的输出结果。
运行结果可以发现,当运行发布消息的客户端PublishClient发布一条消息到服务器时,服务器会主动将这条消息转发给所有的TCP连接,所有的订阅消息的客户端SubscribeClient都会接收到这条消息并打印出来。
MINA、Netty、Twisted一起学系列 MINA、Netty、Twisted一起学(一):实现简单的TCP服务器
MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息
MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)
MINA、Netty、Twisted一起学(四):定制自己的协议
MINA、Netty、Twisted一起学(五):整合protobuf
MINA、Netty、Twisted一起学(六):session
MINA、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)
MINA、Netty、Twisted一起学(八):HTTP服务器
MINA、Netty、Twisted一起学(九):异步IO和回调函数
MINA、Netty、Twisted一起学(十):线程模型
MINA、Netty、Twisted一起学(十一):SSL/TLS
MINA、Netty、Twisted一起学(十二):HTTPS
源码 https://github.com/wucao/mina-netty-twisted