java socket bio 改造为 netty nio

news/2024/9/30 10:23:43 标签: java, nio

        公司早些时候接入一款健康监测设备,由于业务原因近日把端口暴露在公网后,每当被恶意连接时系统会创建大量线程,在排查问题是发现是使用了厂家提供的服务端demo代码,在代码中使用的是java 原生socket,在发现连接后使用独立线程处理后续通信,占用系统资源造成了服务宕机,因此需要进行改造。

        厂家提供的demo代码如下:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class Demo {

    public static void main(String[] args) {
        int port = 8003;
        if (args.length == 1) {
            port = Integer.parseInt(args[0]);
        }
        ServerSocket ss;
        try {
            ss = new ServerSocket(port);
        }
        catch (Exception e) {
            System.out.println("服务端socket失败 port = " + port);
            return;
        }
        System.out.println("启动socket监听 端口:" + port);
        List<Socket> socketList = new ArrayList<>();
        while (true) {
            try {
                Socket socket = ss.accept();
                if (socket == null || socket.isClosed()) {
                    socketList.remove(socket);
                    continue;
                }
                if (socketList.contains(socket)) {
                    continue;
                }
                socketList.add(socket);
                System.out.println("socket连接 address = " + socket.getInetAddress().toString() + " port = " + socket.getPort());
                new Thread(new HealthReadThread(socket)).start();
            }
            catch (IOException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
import java.io.*;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class HealthReadThread implements Runnable {

    private Socket socket;

    HealthReadThread(Socket socket) {
        this.socket = socket;
    }

    private static String message = "";

    @Override
    public void run() {
        try {
            //输入
            InputStream inPutStream = socket.getInputStream();
            BufferedInputStream bis = new BufferedInputStream(inPutStream);
//            BufferedReader br = new BufferedReader(new InputStreamReader(inPutStream));
            //输出
            OutputStream outputStream = socket.getOutputStream();
            BufferedOutputStream bw = new BufferedOutputStream(outputStream);

            String ip = socket.getInetAddress().getHostAddress();
            int port = socket.getPort();

            String readStr = "";
//            char[] buf;
            byte[] buf;
            int readLen = 0;
            while (true) {
                if (socket.isClosed()) {
                    break;
                }
                buf = new byte[1024];
                try {
                    readLen = bis.read(buf);
                    if (readLen <= 0) {
//                        System.out.println(Thread.currentThread().getId() + "线程: " + "ip地址:" + ip + " 端口地址:" + port + "暂无接收数据");
                        continue;
                    }
                    System.out.println(Thread.currentThread().getId() + "线程: " + "ip地址:" + ip + " 端口地址:" + port + " 接收到原始命令长度:" + readLen);
                    readStr = StringUtils.byteToHexString(buf, readLen);
//                    readStr = new String(buf ,0 , readLen);
                } catch (IOException e) {
                    System.out.println(e.getMessage());
                    socket.close();
//                    continue;
                }
                if (readStr == null || "".equals(readStr)) {
                    continue;
                }
                // 省略业务代码
            }
        }
        catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

使用netty进行改造:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeviceNettyServer implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        start();
    }

    public void start() {
        Thread thread = new Thread(() -> {
            // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(4);

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    // 使用 NIO 方式进行网络通信
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 添加自己的处理器
                            ch.pipeline().addLast(new DeviceMsgHandler());
                        }
                    });

            try {
                int port1 = 8081;
                int port2 = 8082;
                // 绑定一个端口并且同步,生成一个ChannelFuture对象
                ChannelFuture f1 = b.bind(port1).sync();
                ChannelFuture f2 = b.bind(port2).sync();
                log.info("启动监听, 端口:" + port1 + "、" + port2);

                // 对关闭通道进行监听
                f1.channel().closeFuture().sync();
                f2.channel().closeFuture().sync();
            } catch (Exception e) {
                log.error("启动监听失败", e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        });
        thread.setName("DeviceNettyServer");
        thread.start();
    }
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class DeviceMsgHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 已连接的设备
     */
    private static final ConcurrentHashMap<Channel, DeviceDTO> CONNECTION_DEVICE_MAP = new ConcurrentHashMap<>(8);

    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        log.info("发现连接, remoteAddress: " + remoteAddress);
        // 发送查询设备信息指令
        sendQuery(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        // 忽略业务处理代码

        // 传递给下一个处理器
        ctx.fireChannelRead(msg);
    }

    /**
     * 连接断开
     *
     * @param ctx
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("连接断开, remoteAddress: " + ctx.channel().remoteAddress());
        CONNECTION_DEVICE_MAP.remove(ctx.channel());
    }

    /**
     * 连接异常
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("连接异常, remoteAddress: " + ctx.channel().remoteAddress());
        CONNECTION_DEVICE_MAP.remove(ctx.channel());
       
    }

经过改造后使用了4个worker线程进行读写,消除了原先恶意连接造成线程数无线扩大的问题,使用nio也极大的提高了系统资源利用率。


http://www.niftyadmin.cn/n/5684893.html

相关文章

[Docker学习笔记]Docker的原理Docker常见命令

文章目录 什么是DockerDocker的优势Docker的原理Docker 的安装Docker 的 namespaces Docker的常见命令docker version:查看版本信息docker info 查看docker详细信息我们关注的信息 docker search:镜像搜索docker pull:镜像拉取到本地docker push:推送本地镜像到镜像仓库docker …

当 ucx --with-cuda 时做了什么

1&#xff0c;找一只活麻雀&#xff0c;下载编译 ucx git clone https://github.com/openucx/ucx.git cd ucx/ git checkout v1.16.0 ./autogen.sh ./autogen.sh mkdir build cd build ../contrib/configure-devel --with-cuda/usr/local/cuda --without-rocm --without-java …

最佳ThreeJS实践 · 实现赛博朋克风格的三维图像气泡效果

在现代计算机图形学和游戏开发中&#xff0c;创建引人入胜且逼真的三维场景是至关重要的。赛博朋克风格&#xff0c;以其鲜艳的色彩、充满未来感的细节以及复杂的光影效果&#xff0c;成为了许多开发者和艺术家的热门选择。在本文中&#xff0c;我们将深入探讨如何利用 Three.j…

<Rust>iced库(0.13.1)学习之部件(二十九):button部件新增方法on_press_with,可传入闭包函数

前言 本专栏是学习Rust的GUI库iced的合集,将介绍iced涉及的各个小部件分别介绍,最后会汇总为一个总的程序。 iced是RustGUI中比较强大的一个,目前处于发展中(即版本可能会改变),本专栏基于版本0.12.1. 注:新版本已更新为0.13 概述 这是本专栏的第二十九篇,在新版本中…

数据结构串的kmp相关(求next和nextval)

傻瓜版&#xff0c;用来演示手算过程&#xff0c;个人理解用的&#xff0c;仅供参考。

STM32常见配置

二. GPIO配置 2.1 初始化 GPIO时钟 使能所需GPIO端口的时钟&#xff1a; RCC_AHB1PeriphClockCmd(RCC_AHB1Periph_GPIOA, ENABLE);2.2 配置 GPIO 引脚 创建一个GPIO初始化结构体并配置引脚‘ GPIO_InitTypeDef GPIO_InitStruct;// 配置引脚为推挽输出模式 GPIO_InitStruct…

爬虫入门之爬虫原理以及请求响应

爬虫入门之爬虫原理以及请求响应 爬虫需要用到的库, 叫requests. 在导入requests库之前, 需要安装它, 打开cmd: 输入pip install 库名 pip install requests后面出现successful或requirement already就说明已经下载成功了!!! 下载出现的问题: 1.有报错或者是下载慢 修改镜像…

【三步 完全离线搭建 openwebui 】

完全离线linux 版open webui 的搭建 1.在具有网络连接的环境中下载whl 在有网络的环境&#xff0c;使用pip download可以保存所有的依赖包,可以使用-i 指定清华的镜像源加速下载速度。 # 命令&#xff1a; pip download <package_name> --only-binary:all: --wheel --…