SpringBoot+UDPClient+定时任务通信实战
Posted 你比从前快乐;
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot+UDPClient+定时任务通信实战相关的知识,希望对你有一定的参考价值。
在springboot配置文件中添加需要连接的服务器地址及端口号
# udp连接的服务器地址与端口
host=10.1.7.113
port=1215
UDP客户端编写
package com.example.udp.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * udpclient * * @author LH * @date 2021/12/20 9:53 */ @Component public class NettyUdpClient private static final Logger log = LoggerFactory.getLogger(NettyUdpClient.class); @Value("$host") private String HOST; @Value("$port") private int PORT; @Autowired ClientChannelInitializer clientChannelInitializer; //与服务端建立连接后得到的通道对象 private Channel channel; /** * 初始化 `Bootstrap` 客户端引导程序 * * @return */ private final Bootstrap getBootstrap() Bootstrap b = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); b.group(group) .channel(NioDatagramChannel.class)//数据包通道,udp通道类型 .handler(clientChannelInitializer)//通道处理者 .option(ChannelOption.SO_BROADCAST, true);//开启广播 return b; /** * 建立连接,获取连接通道对象 * * @return */ public void connect() ChannelFuture channelFuture = getBootstrap().connect(HOST, PORT).syncUninterruptibly(); if (channelFuture != null && channelFuture.isSuccess()) log.warn("udp client connect host = , port = success", HOST, PORT); channel = channelFuture.channel(); else log.error("udp client connect host = , port = failed!", HOST, PORT);
添加自定义通道处理器
package com.example.udp.client; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.nio.NioDatagramChannel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 通道初始化,主要用于设置各种Handler * * @author LH * @date 2021/12/20 9:55 */ @Component public class ClientChannelInitializer extends ChannelInitializer<NioDatagramChannel> @Autowired ClientChannelHandler clientChannelHandler; @Override protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception ChannelPipeline pipeline = nioDatagramChannel.pipeline(); //自定义Handler pipeline.addLast("clientChannelHandler", clientChannelHandler);
自定义数据处理handler编写
package com.example.udp.client; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * 数据处理handler * * @author LH * @date 2021/12/20 9:56 */ @Component @ChannelHandler.Sharable @Slf4j public class ClientChannelHandler extends SimpleChannelInboundHandler<DatagramPacket> private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat; /** * 通道建立成功后 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception // 当channel就绪后 log.info("[UDP] client channel is ready!"); // 定时任务编写 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new ClientMain(ctx), 0, 2, TimeUnit.SECONDS); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception System.out.println("[UDP] client 收到的消息:" + datagramPacket.content().toString(CharsetUtil.UTF_8)); @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) cause.printStackTrace(); ctx.close();
自定义类实现Runnable接口,开一个线程进行逻辑处理
package com.bwss.collectoroutlineserver.client; import com.alibaba.fastjson.JSON; import com.bwss.common.utils.ChannelMap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * 数据业务逻辑处理 * * @author LH * @date 2021/12/20 15:26 */ public class ClientMain implements Runnable private final ChannelHandlerContext ctx; public ClientMain(ChannelHandlerContext ctx) this.ctx = ctx; @Override public void run() // 业务逻辑编写 // 将所有存活的设备id传送给业务端 ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(需要进行传输的数据), CharsetUtil.UTF_8));
主启动类编写,实现CommandLineRunner接口,在run方法中跑udp客户端
public class CollectorOutlineServerApplication implements CommandLineRunner public static void main(String[] args) throws Exception SpringApplication.run(CollectorOutlineServerApplication.class, args); @Autowired NettyUdpClient nettyUdpClient; @Override public void run(String... args) throws Exception // 启动udp客户端 nettyUdpClient.connect();
注意:
上述方法是采用Spring注入的方式进行的,你也可以使用 new 对象的方式进行,效果都是一样的。
上面是udp通信客户端的编写,服务器端需要处理繁杂的业务逻辑,因此可以自定义实现,下面给出简单的参考示例。
udp服务端简单示例
package com.example.udp.server; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; /** * udp服务端 * * @author LH * @date 2021/12/20 9:45 */ @Component public class NettyUdpServer private static final Logger log = LoggerFactory.getLogger(NettyUdpServer.class); private static final EventLoopGroup group = new NioEventLoopGroup(1); @Autowired ServerChannelInitializer serverChannelInitializer; @Value("$port") private int port; //监听端口的通道,即server的处理通道 private Channel channel; /** * 开启udp server服务 * * @return */ public ChannelFuture start() //启动类 Bootstrap serverBootstrap = new Bootstrap(); serverBootstrap.group(group)//组配置,初始化ServerBootstrap的线程组 .channel(NioDatagramChannel.class)//数据包通道,udp通道类型 .option(ChannelOption.SO_BROADCAST, true)//支持广播 .handler(serverChannelInitializer);//通道处理者 //Future:异步任务的生命周期,可用来获取任务结果 ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待 if (channelFuture1 != null && channelFuture1.isSuccess()) log.info("[UDP] server start success, port = ", port); channel = channelFuture1.channel();//获取通道 else log.error("udp server start failed!!"); channelFuture1.cause().printStackTrace(); return channelFuture1; /** * 停止udp server服务 * 销毁前的拦截 */ @PreDestroy public void destroy() try if (channel != null) ChannelFuture await = channel.close().await(); if (!await.isSuccess()) log.error("udp channel close fail, ", await.cause()); Future<?> future1 = group.shutdownGracefully().await(); if 
(!future1.isSuccess()) log.error("udp group shutdown fail, ", future1.cause()); log.info("udp shutdown success"); catch (InterruptedException e) log.info("udp shutdown fail"); e.printStackTrace();
package com.example.udp.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.nio.NioDatagramChannel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 自定义ChannelInitializer,通道初始化,主要用于设置各种Handler * * @author LH * @date 2021/12/20 9:49 */ @Component public class ServerChannelInitializer extends ChannelInitializer<NioDatagramChannel> @Autowired ServerChannelInboundHandler serverChannelHandler; @Override protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception ChannelPipeline pipeline = nioDatagramChannel.pipeline(); pipeline.addLast("serverChannelHandler", serverChannelHandler);
package com.example.udp.server; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * 通道数据输入的处理 * * @author LH * @date 2021/12/20 9:51 */ @Component @ChannelHandler.Sharable @Slf4j public class ServerChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception System.out.println("[UDP] server 收到的消息:" + datagramPacket.content().toString(CharsetUtil.UTF_8)); String response = "" + datagramPacket.content().toString(CharsetUtil.UTF_8) + "的响应,我是服务端啊!!!"; DatagramPacket datagramPacket1 = new DatagramPacket(Unpooled.copiedBuffer(response, CharsetUtil.UTF_8), datagramPacket.sender()); channelHandlerContext.channel().writeAndFlush(datagramPacket1); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception super.channelActive(ctx);
server.port=8090
以上是关于SpringBoot+UDPClient+定时任务通信实战的主要内容,如果未能解决你的问题,请参考以下文章