文章目录
  1. 1. MINA
  2. 2. Netty
  3. 3. Twisted
  • MINA、Netty、Twisted一起学系列
    1. 1. 源码
  • 上一篇博文中,有介绍到用换行符分割消息的方法。但是这种方法有个小问题,如果消息中本身就包含换行符,那将会将这条消息分割成两条,结果就不对了。

    本文介绍另外一种消息分割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。
    固定字节数的Header前缀来指定Body的字节数
    上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,例如12即为0x0000000C,这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。

    下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。

    MINA

    MINA提供了PrefixedStringCodecFactory来对这种类型的消息进行编码解码,PrefixedStringCodecFactory默认Header的大小是4字节,当然也可以指定成1或2。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public class TcpServer {

    public static void main(String[] args) throws IOException {
    IoAcceptor acceptor = new NioSocketAcceptor();

    // 4字节的Header指定Body的字节数,对这种消息的处理
    acceptor.getFilterChain().addLast("codec",
    new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));

    acceptor.setHandler(new TcpServerHandle());
    acceptor.bind(new InetSocketAddress(8080));
    }

    }

    class TcpServerHandle extends IoHandlerAdapter {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
    throws Exception {
    cause.printStackTrace();
    }

    // 接收到新的数据
    @Override
    public void messageReceived(IoSession session, Object message)
    throws Exception {

    String msg = (String) message;
    System.out.println("messageReceived:" + msg);

    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
    System.out.println("sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
    System.out.println("sessionClosed");
    }
    }

    Netty

    Netty使用LengthFieldBasedFrameDecoder来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    public class TcpServer {

    public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch)
    throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    // LengthFieldBasedFrameDecoder按行分割消息,取出body
    pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
    // 再按UTF-8编码转成字符串
    pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

    pipeline.addLast(new TcpServerHandler());
    }
    });
    ChannelFuture f = b.bind(8080).sync();
    f.channel().closeFuture().sync();
    } finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
    }
    }

    }

    class TcpServerHandler extends ChannelInboundHandlerAdapter {

    // 接收到新的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

    String message = (String) msg;
    System.out.println("channelRead:" + message);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    System.out.println("channelActive");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
    System.out.println("channelInactive");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
    }
    }

    Twisted

    在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # -*- coding:utf-8 –*-

    from twisted.protocols.basic import Int32StringReceiver
    from twisted.internet.protocol import Factory
    from twisted.internet import reactor

    class TcpServerHandle(Int32StringReceiver):

    # 新的连接建立
    def connectionMade(self):
    print 'connectionMade'

    # 连接断开
    def connectionLost(self, reason):
    print 'connectionLost'

    # 接收到新的数据
    def stringReceived(self, data):
    print 'stringReceived:' + data

    factory = Factory()
    factory.protocol = TcpServerHandle
    reactor.listenTCP(8080, factory)
    reactor.run()

    下面是Java编写的一个客户端测试程序:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class TcpClient {

    public static void main(String[] args) throws IOException {

    Socket socket = null;
    DataOutputStream out = null;

    try {

    socket = new Socket("localhost", 8080);
    out = new DataOutputStream(socket.getOutputStream());

    // 请求服务器
    String data1 = "牛顿";
    byte[] outputBytes1 = data1.getBytes("UTF-8");
    out.writeInt(outputBytes1.length); // write header
    out.write(outputBytes1); // write body

    String data2 = "爱因斯坦";
    byte[] outputBytes2 = data2.getBytes("UTF-8");
    out.writeInt(outputBytes2.length); // write header
    out.write(outputBytes2); // write body

    out.flush();

    } finally {
    // 关闭连接
    out.close();
    socket.close();
    }

    }

    }

    MINA服务器输出结果:

    sessionCreated
    messageReceived:牛顿
    messageReceived:爱因斯坦
    sessionClosed

    Netty服务器输出结果:

    channelActive
    channelRead:牛顿
    channelRead:爱因斯坦
    channelInactive

    Twisted服务器输出结果:

    connectionMade
    stringReceived:牛顿
    stringReceived:爱因斯坦
    connectionLost

    MINA、Netty、Twisted一起学系列

    MINA、Netty、Twisted一起学(一):实现简单的TCP服务器

    MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息

    MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

    MINA、Netty、Twisted一起学(四):定制自己的协议

    MINA、Netty、Twisted一起学(五):整合protobuf

    MINA、Netty、Twisted一起学(六):session

    MINA、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

    MINA、Netty、Twisted一起学(八):HTTP服务器

    MINA、Netty、Twisted一起学(九):异步IO和回调函数

    MINA、Netty、Twisted一起学(十):线程模型

    MINA、Netty、Twisted一起学(十一):SSL/TLS

    MINA、Netty、Twisted一起学(十二):HTTPS

    源码

    https://github.com/wucao/mina-netty-twisted

    文章目录
    1. 1. MINA
    2. 2. Netty
    3. 3. Twisted
  • MINA、Netty、Twisted一起学系列
    1. 1. 源码