Netty-JBoss Marshalling编解码

Netty-JBoss Marshalling编解码

Tags: Netty

摘要

Netty-JBoss Marshalling编解码

JBoss Marshalling引用Jar下载

maven配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build descriptor for the netty_marshalling sample; the project is packaged as a plain jar. -->
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; the groupId matches the Java package used by the sample classes. -->
    <groupId>netty_marshalling</groupId>
    <artifactId>netty_marshalling</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>netty_marshalling</name>
    <url>http://maven.apache.org</url>

    <properties>
        <!-- Source files are read as UTF-8 (the sample sources contain non-ASCII comments). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Netty: supplies the io.netty.* bootstrap, channel and marshalling codec classes the sample imports. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>
        <!-- JBoss Marshalling API: supplies the org.jboss.marshalling.* classes the codec factory imports. -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.3.0.CR9</version>
        </dependency>
        <!-- NOTE(review): presumably provides the "serial" MarshallerFactory that the codec factory looks up
             by name at runtime; keep its version aligned with jboss-marshalling. Confirm before removing. -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.0.CR9</version>
        </dependency>
    </dependencies>
</project>

Netty的Marshalling服务器端开发

package netty_marshalling;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Server side of the JBoss Marshalling sample: binds a Netty NIO server whose
 * child channels decode and encode messages with the marshalling codecs built
 * by {@code MarshallingCodeCFactory} and dispatch them to
 * {@code SubReqServerHandler}.
 *
 * @author lilinfeng
 * @date 2014-02-14
 * @version 1.0
 */
public class SubReqServer {

    /** Port used when the command line supplies no parsable port. */
    private static final int DEFAULT_PORT = 8080;

    /**
     * Binds the server to {@code port} and blocks until the listening channel
     * is closed. Both event loop groups are shut down gracefully on the way
     * out, whether binding succeeded or not.
     *
     * @param port the TCP port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // NIO event loop groups handed to the server bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 100);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    // Order matters: decoder, then encoder, then the business handler.
                    ch.pipeline()
                        .addLast(MarshallingCodeCFactory.buildMarshallingDecoder())
                        .addLast(MarshallingCodeCFactory.buildMarshallingEncoder())
                        .addLast(new SubReqServerHandler());
                }
            });

            // Bind synchronously, then block until the listening channel closes.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } finally {
            // Graceful exit: release the event loop thread pools.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first argument, or on
     * {@link #DEFAULT_PORT} when the argument is missing or not a number.
     *
     * @param args optional command line; {@code args[0]} is the port
     * @throws Exception if the server fails to start or is interrupted
     */
    public static void main(String[] args) throws Exception {
        new SubReqServer().bind(resolvePort(args));
    }

    /** Returns the port parsed from {@code args[0]}, or the default when absent or malformed. */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException ignored) {
            // Deliberate: a malformed port falls back to the default.
            return DEFAULT_PORT;
        }
    }
}

在 initChannel 方法中,首先通过 MarshallingCodeCFactory.buildMarshallingDecoder() 创建解码器并添加到 ChannelPipeline

随后通过 MarshallingCodeCFactory.buildMarshallingEncoder() 创建编码器并添加到 ChannelPipeline

MarshallingCodeCFactory.java

package netty_marshalling;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;


/**
 * Static factory for the Netty codecs that serialize objects with JBoss
 * Marshalling. Every call returns a fresh codec instance, so each channel
 * pipeline gets its own decoder and encoder.
 */
public final class MarshallingCodeCFactory {

    /** Name under which the marshaller factory is looked up. */
    private static final String FACTORY_NAME = "serial";

    /** Version set on every {@link MarshallingConfiguration} created here. */
    private static final int CONFIGURATION_VERSION = 5;

    /** Maximum serialized size of a single message accepted by the default decoder. */
    private static final int DEFAULT_MAX_OBJECT_SIZE = 1024;

    /** Not instantiable: this class only exposes static factory methods. */
    private MarshallingCodeCFactory() {
    }

    /**
     * Creates a JBoss Marshalling decoder that accepts serialized messages of
     * up to {@value #DEFAULT_MAX_OBJECT_SIZE} in length.
     *
     * @return a new {@link MarshallingDecoder}
     * @throws IllegalStateException if the marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        return buildMarshallingDecoder(DEFAULT_MAX_OBJECT_SIZE);
    }

    /**
     * Creates a JBoss Marshalling decoder with a caller-chosen size limit.
     *
     * @param maxObjectSize maximum length of a single serialized message; the
     *                      value is passed unchanged to the
     *                      {@link MarshallingDecoder} constructor
     * @return a new {@link MarshallingDecoder}
     * @throws IllegalStateException if the marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder(int maxObjectSize) {
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
                lookupMarshallerFactory(), newConfiguration());
        return new MarshallingDecoder(provider, maxObjectSize);
    }

    /**
     * Creates a JBoss Marshalling encoder.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider = new DefaultMarshallerProvider(
                lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /**
     * Looks up the marshaller factory by name and fails fast when it is
     * missing, instead of handing {@code null} to a provider where it would
     * only surface later as a NullPointerException.
     */
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
        if (factory == null) {
            throw new IllegalStateException(
                    "No JBoss MarshallerFactory named \"" + FACTORY_NAME
                            + "\" was found; is jboss-marshalling-serial on the classpath?");
        }
        return factory;
    }

    /** Builds the configuration shared by the encoder and the decoder. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(CONFIGURATION_VERSION);
        return configuration;
    }
}

serial 表示的是Java序列化工厂对象
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); 其中1024表示单个消息序列化后的最大长度(单位:字节),收到的消息超过该长度时解码器会抛出TooLongFrameException

Netty的Marshalling客户端开发

客户端启动类SubReqClient

package netty_marshalling;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Client side of the JBoss Marshalling sample: connects to the server and
 * exchanges messages through the marshalling codecs built by
 * {@code MarshallingCodeCFactory}, with {@code SubReqClientHandler} as the
 * business handler.
 */
public class SubReqClient {

    /** Port used when the command line supplies no parsable port. */
    private static final int DEFAULT_PORT = 8080;

    /** Host the sample client connects to. */
    private static final String DEFAULT_HOST = "127.0.0.1";

    /**
     * Connects to {@code host:port} and blocks until the connection is
     * closed. The event loop group is shut down gracefully on the way out.
     *
     * @param port the remote TCP port
     * @param host the remote host name or address
     * @throws Exception if connecting or waiting on the channel fails
     */
    public void connect(int port, String host) throws Exception {
        // NIO event loop group for the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                    throws Exception {
                    // Order matters: decoder, then encoder, then the business handler.
                    ch.pipeline()
                        .addLast(MarshallingCodeCFactory.buildMarshallingDecoder())
                        .addLast(MarshallingCodeCFactory.buildMarshallingEncoder())
                        .addLast(new SubReqClientHandler());
                }
            });

            // Start the connect and wait for it, then block until the link is closed.
            bootstrap.connect(host, port).sync().channel().closeFuture().sync();
        } finally {
            // Graceful exit: release the NIO event loop group.
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to the local server on the port given as the first argument,
     * or on {@link #DEFAULT_PORT} when the argument is missing or not a number.
     *
     * @param args optional command line; {@code args[0]} is the port
     * @throws Exception if the client fails to connect or is interrupted
     */
    public static void main(String[] args) throws Exception {
        new SubReqClient().connect(resolvePort(args), DEFAULT_HOST);
    }

    /** Returns the port parsed from {@code args[0]}, or the default when absent or malformed. */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException ignored) {
            // Deliberate: a malformed port falls back to the default.
            return DEFAULT_PORT;
        }
    }
}

运行Marshalling客户端和服务器端

服务器端 SubReqServer.java
客户端 SubReqClient.java

客户端发送了10次请求,服务器端也能正常收到10次,然后客户端也收到10次响应。说明Netty的Marshalling编解码器支持半包和粘包的处理,对于开发者而言只需要正确地将Marshalling编码器和解码器加入到ChannelPipeline中,就能实现Marshalling序列化的支持。

示例代码地址

https://coding.net/u/javacore/p/netty_marshalling/git