首页 问答 文章
登录
登录 注册新账号
注册
已有账号登录
Netty实现代理服务器
2019-03-20 19:49:00.0
public class HttpProxyClientHandle extends ChannelInboundHandlerAdapter {

    private Channel clientChannel;

    public HttpProxyClientHandle(Channel clientChannel) {
        this.clientChannel = clientChannel;
    }

    Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpResponse response = (FullHttpResponse) msg;
        //修改http响应体返回至客户端
        response.headers().add("test","from proxy");
        clientChannel.writeAndFlush(msg);
    }
}

public class HttpProxyInitializer extends ChannelInitializer {

    private Channel clientChannel;

    SslContext sslCtx;
    public HttpProxyInitializer(Channel clientChannel) {
        try {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } catch (SSLException e) {
            e.printStackTrace();
        }
        this.clientChannel = clientChannel;
    }

    Override
    protected void initChannel(Channel ch) throws Exception {
        if (sslCtx != null) {
            ch.pipeline().addLast(sslCtx.newHandler(ch.alloc(), "*", 443));
        }
        ch.pipeline().addLast(new HttpClientCodec());
        ch.pipeline().addLast(new HttpObjectAggregator(6553600));
        ch.pipeline().addLast(new HttpProxyClientHandle(clientChannel));
    }
}
public class HttpProxyServerHandle extends ChannelInboundHandlerAdapter {
    private ChannelFuture cf;
    private String host;
    private int port;

    private String content = "hello world";
    private final static String LOC = "302";
    private final static String NOT_FOND = "404";
    private final static String BAD_REQUEST = "400";
    private final static String INTERNAL_SERVER_ERROR = "500";
    private static Map<String, HttpResponseStatus> mapStatus = new HashMap<String, HttpResponseStatus>();
    static {
        mapStatus.put(LOC, HttpResponseStatus.FOUND);
        mapStatus.put(NOT_FOND, HttpResponseStatus.NOT_FOUND);
        mapStatus.put(BAD_REQUEST, HttpResponseStatus.BAD_REQUEST);
        mapStatus.put(INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR);
    }
    Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {

            FullHttpRequest request = (FullHttpRequest) msg;
            String host = request.headers().get("host");
            String[] temp = host.split(":");
            int port = 80;
            if (temp.length > 1) {
                port = Integer.parseInt(temp[1]);
            } else {
                if (request.uri().indexOf("https") == 0) {
                    port = 443;
                }
            }
            this.host = temp[0];
            this.port = port;
//            if ("CONNECT".equalsIgnoreCase(request.method().name())) {//HTTPS建立代理握手
//                HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, NettyHttpProxyServer.SUCCESS);
//                ctx.writeAndFlush(response);
//                ctx.pipeline().remove("httpCodec");
//                ctx.pipeline().remove("httpObject");
//                return;
//            }
            //连接至目标服务器
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ctx.channel().eventLoop()) // 注册线程池
                    .channel(ctx.channel().getClass()) // 使用NioSocketChannel来作为连接用的channel类
                    .handler(new HttpProxyInitializer(ctx.channel()));

            ChannelFuture cf = bootstrap.connect(temp[0], 443);
            cf.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        future.channel().writeAndFlush(msg);
                    } else {
                        ctx.channel().close();
                    }
                }
            });
//            ChannelFuture cf = bootstrap.connect(temp[0], port).sync();
//            cf.channel().writeAndFlush(request);
        } else { // https 只转发数据,不做处理
            if (cf == null) {
                //连接至目标服务器
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(ctx.channel().eventLoop()) // 复用客户端连接线程池
                        .channel(ctx.channel().getClass()) // 使用NioSocketChannel来作为连接用的channel类
                        .handler(new ChannelInitializer() {

                            Override
                            protected void initChannel(Channel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                    Override
                                    public void channelRead(ChannelHandlerContext ctx0, Object msg) throws Exception {
                                        ctx.channel().writeAndFlush(msg);
                                    }
                                });
                            }
                        });
                cf = bootstrap.connect(host, port);
                cf.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            future.channel().writeAndFlush(msg);
                        } else {
                            ctx.channel().close();
                        }
                    }
                });
            } else {
                cf.channel().writeAndFlush(msg);
            }
        }
    }
}

服务器代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
//        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    Override
                    protected void initChannel(SocketChannel ch) {
                        CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();
                        ChannelPipeline pipeline = ch.pipeline();
                        if (ch.localAddress().getPort() == 443) {
                            /** 如果是443端口, 则需要添加ssl处理器 */
                            SslHandler sslHandler = sslCtx.newHandler(ch.alloc());
                            pipeline.addLast(sslHandler);
                        }
//                        if (sslCtx != null) {
//                            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
//                        }
//                        http://localhost/upload/20190216134048825000000.png
                        pipeline.addLast(new HttpResponseEncoder());
                        pipeline.addLast(new HttpRequestDecoder());
                        pipeline.addLast(new HttpContentCompressor());
//                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(1048576));//1048576
//                        pipeline.addLast(new HttpObjectAggregator(2 * 1024 * 1024));//1048576
                        pipeline.addLast(new ChunkedWriteHandler());
                        pipeline.addLast(new CorsHandler(corsConfig));
//                        pipeline.addLast(new HttpObjectAggregator(1048576));
                        pipeline.addLast(new WebResponseHandler2());
                    }
                });
        try {
//            b.bind(port).sync().channel().closeFuture().sync();

            List<ChannelFuture> futures = new ArrayList<>();
            for (Integer integer : port) {
                futures.add(b.bind(integer));
            }
            for (ChannelFuture f : futures) {
                f.channel().closeFuture().sync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.error("WebServer.java 127 ->" + e.getLocalizedMessage(), e.getCause());
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            // Wait until all threads are terminated.
            bossGroup.terminationFuture().syncUninterruptibly();
            workerGroup.terminationFuture().syncUninterruptibly();
//            System.gc();
//            System.runFinalization();
        }
热门文章
1
input 上传第二次不能选择同一文件
2
input="file" 浏览时只显示指定文件类型 xls、xlsx、csv
3
有时候操作系统的时间与pg的时间不一致
4
java stream 转二维结构为树状结构
5
java Object和Map转化
6
让Node.js项目实现热部署,修改文件避免重启
7
golang  mysql操作
8
BigDecimal的用法详解(保留两位小数,四舍五入,数字格式化)
9
java label
10
java Object和Map转化
最新文章
1
postgresql数据库
2
Mac 启动加载文件位置(可设置环境变量)
3
什么是ORM?
4
微信小程序自定义组件设置回调方法
5
java stream 转二维结构为树状结构
6
正则表达式
7
golang  mysql操作
8
利用Opencv实现简单的图片切割(JAVA)
9
PostgreSQL JSON 数据库
10
linux 自动运行脚本
热门标签
mac
java
jvm
微信小程序
小程序
自定义组件
回调
回调方法
golang
postgresql