Netty实战二-实现UDP的单播和广播

Posted m0_69526086

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty实战二-实现UDP的单播和广播相关的知识,希望对你有一定的参考价值。

================

Netty里已经帮我们封装好了UDP相关的实现类。使用起来也非常方便

1、AddressedEnvelope 接口

interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted

定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M 是消息类型;A 是地址类型

2、DefaultAddressedEnvelope类

class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A>

提供了interface AddressedEnvelope的默认实现

3、DatagramPacket 类

class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder

扩展了DefaultAddressedEnvelope 以使用ByteBuf 作为消息数据容器。DatagramPacket是final类不能被继承,只能被使用。

他有三个重要方法

content() 来获取消息内容

sender() 来获取发送者的消息

recipient() 来获取接收者的消息。

4、DatagramChannel 接口

interface DatagramChannel extends Channel

扩展了Netty 的Channel 抽象以支持UDP 的多播组管理

5、NioDatagramChannel

class NioDatagramChannel extends AbstractNioMessageChannel implements DatagramChannel

定义了一个能够发送和接收Addressed-Envelope 消息的Channel 类型

Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

三、Netty实现UDP单播

==============

1、 AnswerHandler

定义消息应答服务处理类,改类主要随机从字符串数组中选择一个发送给客户端

/**

  • 作者:DarkKIng

  • 类说明:应答处理Handler

*/

public class AnswerHandler extends

SimpleChannelInboundHandler

/应答的具体内容从常量字符串数组中取得,由nextQuote方法随机获取/

private static final String[] DICTIONARY =

“一个男生暗恋一个女生很久了。一天自习课上,男生偷偷的传了小纸条给女生,上面写着“其实我注意你很久了”。不一会儿,女生传了另一张纸条,男生心急火燎的打开一看“拜托你不要告诉老师,我保证以后再也不嗑瓜子了”。。。。。。男生一脸懵逼”,

“昨天因为一件事骂儿子,说你妈妈是猪,你也是头猪。儿子却反过来说我:爸爸你怎么这么衰,娶了一头猪,还生了一只猪!你说你这熊孩子,这是不是找打。”,

“火云邪神苦练多年,终于将蛤蟆功练至顶级并成功产下8个小蝌蚪。”,

“老婆永远是对的,这话永远也是对的。但老婆没想到的是,她不一定永远是老婆”,

“人生天地间没有谁是容易的,就算是思聪也得每天犯愁怎么花钱。”,

“今天去理发,洗剪吹68,烫发和染发668。我就做了个洗剪吹,结账的时候发现居然收我66”;

private static Random r = new Random();

private String nextQuote()

return DICTIONARY[r.nextInt(DICTIONARY.length-1)];

@Override

protected void channelRead0(ChannelHandlerContext ctx,

DatagramPacket packet)

throws Exception

//获得请求

String req = packet.content().toString(CharsetUtil.UTF_8);

System.out.println(“接收到请求:”+req);

if(UdpQuestionSide.QUESTION.equals(req))

String answer = UdpAnswerSide.ANSWER+nextQuote();

System.out.println(“接收到请求:”+req);

/**

  • 重新 new 一个DatagramPacket对象,我们通过packet.sender()来获取发送者的消息。重新发送出去!

*/

ctx.writeAndFlush(

new DatagramPacket(

Unpooled.copiedBuffer(

answer,

CharsetUtil.UTF_8),

packet.sender()));

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception

ctx.close();

cause.printStackTrace();

2、UdpAnswerSide

定义应答服务器

public final static String ANSWER = “笑话来了:”;

public void run(int port) throws Exception

EventLoopGroup group = new NioEventLoopGroup();

try

/*和tcp的不同,udp没有接受连接的说法,所以即使是接收端,

也使用Bootstrap*/

Bootstrap b = new Bootstrap();

/由于我们用的是UDP协议,所以要用NioDatagramChannel来创建/

b.group(group)

.channel(NioDatagramChannel.class)

.handler(new AnswerHandler());

//没有接受客户端连接的过程,监听本地端口即可

ChannelFuture f = b.bind(port).sync();

System.out.println(“应答服务已启动…”);

f.channel().closeFuture().sync();

finally

group.shutdownGracefully();

public static void main(String [] args) throws Exception

int port = 8080;

new UdpAnswerSide().run(port);

3、QuestoinHandler

定义应答服务器处理handler

/**

  • 作者:DarkKIng

  • 类说明:订阅handler,读取服务器的应答

*/

public class QuestoinHandler extends

SimpleChannelInboundHandler

@Override

protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)

throws Exception

//获得应答,DatagramPacket提供了content()方法取得报文的实际内容

String response = msg.content().toString(CharsetUtil.UTF_8);

if (response.startsWith(UdpAnswerSide.ANSWER))

System.out.println(response);

ctx.close();

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception

cause.printStackTrace();

ctx.close();

4、UdpQuestionSide

定义了一个请求客户端

/**

  • 作者:DarkKIng

  • 类说明:订阅服务器

*/

public class UdpQuestionSide

public final static String QUESTION = “我想听个笑话”;

public void run(int port) throws Exception

EventLoopGroup group = new NioEventLoopGroup();

try

Bootstrap b = new Bootstrap();

b.group(group)

/由于我们用的是UDP协议,所以要用NioDatagramChannel来创建/

.channel(NioDatagramChannel.class)

.handler(new QuestoinHandler());

//不需要建立连接

Channel ch = b.bind(0).sync().channel();

//将UDP请求的报文以DatagramPacket打包发送给接受端

ch.writeAndFlush(

new DatagramPacket(

Unpooled.copiedBuffer(QUESTION,

CharsetUtil.UTF_8),

new InetSocketAddress(“127.0.0.1”,

port)))

.sync();

//不知道接收端能否收到报文,也不知道能否收到接收端的应答报文

// 所以等待15秒后,不再等待,关闭通信

if(!ch.closeFuture().await(15000))

System.out.println(“查询超时!”);

catch (Exception e)

group.shutdownGracefully();

public static void main(String [] args) throws Exception

int answerPort = 8080;

new UdpQuestionSide().run(answerPort);

5、程序演示

该程序主要实现了客户单向服务器单点请求一个笑话。服务器随机返回一个笑话。

开启应答服务

开启客户端发送请求

在开启一个客户端发送请求

四、Netty实现UDP广播

==============

1、LogConst

定义消息常量类,用来模拟日志

/**

  • 作者:DarkKIng

  • 类说明:日志信息,用String数组代替

*/

public class LogConst

public final static int MONITOR_SIDE_PORT = 9998;

private static final String[] LOG_INFOS =

“晨光微好,暖在夕阳。幽幽的巷子里,有着岁月酝酿的酒,愈久愈淳。一笔墨香,一盏明灯,记千帆过浪,数不尽的悲欢离合,待那水莲花开。”,

“未来无期,静在安好。一剪寒梅,纷扰了岁月,抚平了伤痕。摆动的双桨,拨动了心的潭水。陌上花开,落一地秋霜,红枫染了红尘,便许了你十里红装。”,

“离别的风,风干了月的泪。夜里的美”,

“是梦的呢喃低语,挥走一片彩云,段落成珠。拂袖离去,乘鹤而来,古道西风瘦马。斑驳的树影中,眉目如画的眼,轻语告别了往事如烟。”,

“无言的殇,几世沧桑,几生悲凉。一起剪了西窗烛,听了夜来风吹雨。昨日的叹息,今日的迷离,执一伞,存了一世一 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 笔的温情。一曲长歌,唱尽了一世繁华,一世缘……”,

“一世恋书,那便十里花开。一生凄凉,那便霜花十里。” ,

“一抹浓烟,便翻页书,展颜一笑,是时间带来遥远的梦。细数树的年轮,感受昨日惆怅,留一半清醒,梦一半叶落。在指尖流过的沙,海边浪花一朵朵,不相遇,才有不约而同。”,

“这世俗,太多牵挂留在心间,一点朱砂泪,一曲相诗歌。岁月朦胧,梦醒了人生,风雨相容,演绎了一段风情。雪亦梦,雨亦梦,万张红纸从天洒来。惊动了山,撼动了天。” +

“一纸情愁,一指烟凉。一相思,一思量,水漫岸头,我们都有着自己不同的三生故事。迎一夜秋风,送一世暖阳,一切冰雪里的花开,是我一生的柔情。” +

“记忆中的短笛,有着清风须来的气息,那时我们面向大海,海风在耳边述说着大海边缘的温暖故事。安好一轮冷月,静好了一残红日,这便是我的语言,我的情丝。” +

“一漫山水,一段情,留在了岁月,拭去了风,晴雨清风,倒是暖阳拂绿草。” +

“这便,晨光微好,花开静好……”;

private final static Random r = new Random();

public static String getLogInfo()

return LOG_INFOS[r.nextInt(LOG_INFOS.length-1)];

2、LogMsg

消息实体类

/**

  • 作者:DarkKIng

  • 类说明:日志实体类

*/

public final class LogMsg

public static final byte SEPARATOR = (byte) ‘:’;

/源的 InetSocketAddress/

private final InetSocketAddress source;

/消息内容/

private final String msg;

/消息id/

private final long msgId;

/消息发送的时间/

private final long time;

//用于传入消息的构造函数

public LogMsg(String msg)

this(null, msg,-1,System.currentTimeMillis());

//用于传出消息的构造函数

public LogMsg(InetSocketAddress source, long msgId,

String msg)

this(source,msg,msgId,System.currentTimeMillis());

public LogMsg(InetSocketAddress source, String msg, long msgId, long time)

this.source = source;

this.msg = msg;

this.msgId = msgId;

this.time = time;

//返回发送 LogMsg 的源的 InetSocketAddress

public InetSocketAddress getSource()

return source;

//返回消息内容

public String getMsg()

return msg;

//返回消息id

public long getMsgId()

return msgId;

//返回消息中的时间

public long getTime()

return time;

3、LogEventEncoder

日志编码类

/**

  • 作者:DarkKIng

  • 类说明:编码,将实际的日志实体类编码为DatagramPacket

*/

public class LogEventEncoder extends MessageToMessageEncoder

private final InetSocketAddress remoteAddress;

//LogEventEncoder 创建了即将被发送到指定的 InetSocketAddress

// 的 DatagramPacket 消息

public LogEventEncoder(InetSocketAddress remoteAddress)

this.remoteAddress = remoteAddress;

@Override

protected void encode(ChannelHandlerContext channelHandlerContext,

LogMsg logMsg, List out) throws Exception

byte[] msg = logMsg.getMsg().getBytes(CharsetUtil.UTF_8);

//容量的计算:两个long型+消息的内容+分割符

ByteBuf buf = channelHandlerContext.alloc()

.buffer(8*2 + msg.length + 1);

//将发送时间写入到 ByteBuf中

buf.writeLong(logMsg.getTime());

//将消息id写入到 ByteBuf中

buf.writeLong(logMsg.getMsgId());

//添加一个 SEPARATOR

buf.writeByte(LogMsg.SEPARATOR);

//将日志消息写入 ByteBuf中

buf.writeBytes(msg);

//将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中

out.add(new DatagramPacket(buf, remoteAddress));

4、LogEventBroadcaster

Rxjs: 单播和多播

参考技术A

单播的意思是,每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。

Subject 其中的一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值,当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”,在定义一个 BehaviorSubject 时需要有初始值, 所以它始终保持有一个最新的值可以发送给后来订阅的 observer。

类似于 BehaviorSubject,可以发送旧值给新的订阅者,但是不仅是‘当前值’,还可以是之前的旧值。

只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

类似于了 last 运算符,他会等到 Observable 结束的时候将最后一个值发出,如果先前订阅 observer 时该 Observable 对象还没有 complete 的话,那么这个 observer 就不会接收到值,知道 Observable 完结后,才会收到它的最后一个值,如果后订阅的 observer 来到时该 Observable 对象已经完结,那么它就可以 立即 收到这个 observable 发出的最后一个值

Cold Observables 只有被 observers 订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。

Hot Observables 不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。

在这个年纪我还有一整个未来去面对更好的自己,你们也是。

refCount

当使用 refCount,是引用计数的 observable。
它表示当第一个订阅者开始订阅的时候,开始发送和产生值;
第二个订阅者(之后的订阅者)共享第一个订阅者的 Observables 实例,没有订阅者的时候,会自动取消订阅;之后再重新订阅,又从头开始发送值。

connect

当我们调用 ConnectableObservable.prototype.connect 方法,不管有没有被订阅,都会发送值。
订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。在我们的例子中,第一个订阅等了一秒从 2 开始接受值,第二个订阅等了两秒从 3 开始接受值。

hotObservable .publish() .
所以它也需要 hotObservable$去 connect 或者是 refCount

以上是关于Netty实战二-实现UDP的单播和广播的主要内容,如果未能解决你的问题,请参考以下文章

QT之UDP通信

Python 实现udp组播

图说单播,组播,广播,选播和地域播

udp广播,单播,多播

关于socket组播和ssdp[修改1.2]

UNP学习 广播