Flink in Practice: Merging Streams with the Union Operator
Posted by lwqhp
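I. Basic Concepts
1. Conditions for joining streams
A window-based Join of two streams in Flink must satisfy two conditions:
- The streams must be able to wait for each other, that is, records from both streams must fall into the same window.
- The Join is an equi-join, that is, the two streams must share a field whose values are equal for records to match.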
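To make the two conditions concrete, here is a minimal plain-Java sketch (not Flink API) of how a tumbling window can be derived from an event timestamp. The class and method names are hypothetical, and the arithmetic assumes non-negative timestamps and a window offset of zero. Two records can only meet in a window-based Join when they map to the same window start and carry the same key.

```java
final class WindowJoinCondition {
    /** Start of the tumbling window containing ts (assumes ts >= 0 and no offset). */
    static long windowStart(long ts, long windowSizeMs) {
        return ts - (ts % windowSizeMs);
    }

    /** True when two keyed records could be matched by a window-based equi-join. */
    static boolean canJoin(long key1, long ts1, long key2, long ts2, long windowSizeMs) {
        return key1 == key2
                && windowStart(ts1, windowSizeMs) == windowStart(ts2, windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows
        // Same key, 5 seconds apart, same window: joinable.
        System.out.println(canJoin(1L, 1654226400000L, 1L, 1654226405000L, size)); // true
        // Same key, but the second record falls into the next window.
        System.out.println(canJoin(1L, 1654226400000L, 1L, 1654226411000L, size)); // false
        // Same window, different keys.
        System.out.println(canJoin(1L, 1654226400000L, 2L, 1654226405000L, size)); // false
    }
}
```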
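2. Operators in Flink that support two-stream Join
Flink currently has five known operators for combining two streams:
- **union**: combines two streams, and also more than two; all streams must have the same type.
- **connect**: combines exactly two streams, whose types may differ.
- **join**: supports only inner join, that is, within the same window a match succeeds only when the key exists in both streams and is equal.
- **coGroup**: also implements a two-stream Join. It brings together two DataStreams within the same window, matches them by key, and handles the logic by passing a new CoGroupFunction() to apply() and overriding its coGroup() method.
- **intervalJoin**: has no notion of a window; it uses timestamps directly as the match condition, which makes it more expressive.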
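The timestamp condition that intervalJoin relies on can be sketched in plain Java. This is a simplified model rather than Flink code: the class name, method name, and bounds are hypothetical, and it assumes both bounds are inclusive. A right-side record matches a left-side record with the same key when its timestamp lies in [left + lowerBound, left + upperBound].

```java
final class IntervalJoinCondition {
    /** True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], both bounds inclusive. */
    static boolean matches(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    public static void main(String[] args) {
        long lower = -2_000L; // the right record may be up to 2 s earlier
        long upper = 5_000L;  // or up to 5 s later
        System.out.println(matches(10_000L, 8_000L, lower, upper));  // true: exactly on the lower bound
        System.out.println(matches(10_000L, 15_001L, lower, upper)); // false: 1 ms past the upper bound
    }
}
```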
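join() and coGroup() both connect streams in Flink, but they differ. coGroup is the more general of the two: its function sees all records of both sides for a key, including keys that appear on only one side, so it can also express outer joins. Prefer coGroup when you need that flexibility (**the exception is a plain inner join, where join is recommended: it expresses the match directly and, in the DataSet API, benefits from optimized join strategies**).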
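The difference between the two can be illustrated with a small plain-Java model of one window's contents. This is a sketch under simplifying assumptions, not Flink code: each side is a map from key to the records of that key, and the class and method names are hypothetical. The inner join only produces output for keys present on both sides, while the coGroup model is invoked once per key and receives an empty list for the missing side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class JoinVsCoGroup {
    /** Inner join: emits one "key:left+right" string per matching pair. */
    static List<String> innerJoin(Map<Long, List<String>> left, Map<Long, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (Long key : new TreeSet<>(left.keySet())) {
            if (!right.containsKey(key)) {
                continue; // key missing on the right side: no output at all
            }
            for (String l : left.get(key)) {
                for (String r : right.get(key)) {
                    out.add(key + ":" + l + "+" + r);
                }
            }
        }
        return out;
    }

    /** coGroup: one output per key found on either side; a missing side is an empty list. */
    static List<String> coGroup(Map<Long, List<String>> left, Map<Long, List<String>> right) {
        TreeSet<Long> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<String> out = new ArrayList<>();
        for (Long key : keys) {
            List<String> l = left.getOrDefault(key, Collections.<String>emptyList());
            List<String> r = right.getOrDefault(key, Collections.<String>emptyList());
            out.add(key + ":" + l + "|" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> clicks = new TreeMap<>();
        clicks.put(1L, Arrays.asList("click"));
        clicks.put(2L, Arrays.asList("click"));
        Map<Long, List<String>> orders = new TreeMap<>();
        orders.put(1L, Arrays.asList("order"));
        System.out.println(innerJoin(clicks, orders)); // [1:click+order]
        System.out.println(coGroup(clicks, orders));   // [1:[click]|[order], 2:[click]|[]]
    }
}
```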
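II. Introducing Union
1. Characteristics of the Union operator
Calling union on a DataStream merges several streams of the same type and produces a stream of that same type, that is, several DataStream[T] are merged into one new DataStream[T]. Records are merged first in, first out (FIFO) and are not deduplicated: the order within each input stream is preserved, but how records from different inputs interleave is not guaranteed. (The original figure, not reproduced here, showed union merging a white stream and a dark stream into a single stream.)
stream1.union(stream2, stream3, ...)
1. Applied to a DataStream, it returns a DataStream; applied to a DataSet, it returns a DataSet.
2. Several streams can be merged in one call (stream1.union(stream2, stream3, stream4)), and the result is one new DataStream; DataSets can only be merged two at a time, and the result is one new DataSet.
3. Neither the DataStream union nor the DataSet union deduplicates: the messages or records of both sources are all kept.
4. Two streams of different types, or two data sets of different types, cannot be unioned.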
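The properties listed above can be sketched with a plain-Java model that treats each stream as a finite list. This is an illustration only, not Flink API: the class and method names are hypothetical, and the model simply appends the inputs one after another, whereas a real job interleaves records from different inputs in no guaranteed order. The shared type parameter T plays the role of the same-type requirement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UnionModel {
    /** Appends every input in turn; nothing is deduplicated and each input keeps its own order. */
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> white = Arrays.asList("a", "b");
        List<String> dark = Arrays.asList("b", "c");
        // "b" appears in both inputs and is kept twice.
        System.out.println(union(white, dark)); // [a, b, b, c]
    }
}
```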
2. Union operator source code walkthrough
public final DataStream<T> union(DataStream<T>... streams) {
    List<Transformation<T>> unionedTransforms = new ArrayList<>();
    unionedTransforms.add(this.transformation);
    for (DataStream<T> newStream : streams) {
        if (!getType().equals(newStream.getType())) {
            throw new IllegalArgumentException("Cannot union streams of different types: "
                    + getType() + " and " + newStream.getType());
        }
        unionedTransforms.add(newStream.getTransformation());
    }
    return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
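Transformation (named StreamTransformation in older Flink releases) is the abstract base class of all transformations and provides the basic functionality for implementing one. Every DataStream has a corresponding Transformation.
Under the hood, a DataStream program builds a tree of Transformations; when the job is executed, this topology is translated into a StreamGraph.
- The union method first creates an ArrayList whose element type is Transformation<T>, then adds the current DataStream's Transformation to it.
- It iterates over the streams passed to union and compares each one's type with the type of the current DataStream; if they differ it throws an IllegalArgumentException, otherwise it adds that stream's Transformation to the list.
- It returns a new DataStream wrapping a UnionTransformation that contains all the collected Transformations.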
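The three steps above can be mirrored in a toy class to see the runtime type check in action. This is a hypothetical stand-in, not Flink's implementation: MiniStream, its Class<T> field (standing in for the stream's type information), and its list of input names (standing in for the Transformation list) are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniStream<T> {
    private final Class<T> type;       // stands in for the stream's type information
    private final List<String> inputs; // stands in for the list of transformations

    MiniStream(Class<T> type, String name) {
        this.type = type;
        this.inputs = new ArrayList<>();
        this.inputs.add(name);
    }

    private MiniStream(Class<T> type, List<String> inputs) {
        this.type = type;
        this.inputs = inputs;
    }

    List<String> inputs() {
        return inputs;
    }

    @SafeVarargs
    final MiniStream<T> union(MiniStream<T>... streams) {
        // Step 1: start the list with this stream's own inputs.
        List<String> unioned = new ArrayList<>(inputs);
        for (MiniStream<T> s : streams) {
            // Step 2: reject any stream whose type differs from ours.
            if (!type.equals(s.type)) {
                throw new IllegalArgumentException(
                        "Cannot union streams of different types: " + type + " and " + s.type);
            }
            unioned.addAll(s.inputs);
        }
        // Step 3: return a new stream that wraps every collected input.
        return new MiniStream<>(type, unioned);
    }

    public static void main(String[] args) {
        MiniStream<String> a = new MiniStream<>(String.class, "a");
        MiniStream<String> b = new MiniStream<>(String.class, "b");
        System.out.println(a.union(b).inputs()); // [a, b]

        // Generics normally reject this at compile time; a raw cast forces the runtime check.
        @SuppressWarnings({"unchecked", "rawtypes"})
        MiniStream<String> wrong = (MiniStream) new MiniStream<>(Long.class, "c");
        try {
            a.union(wrong);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The compile-time generic parameter already catches most mismatches; the explicit check matters for cases where the static type is lost, which the raw cast simulates here.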
III. Union in Practice
1. Counting product clicks, favorites, and orders
(1) Requirements:
- Merge the log stream and the order stream and return, for each product over a period of time, the number of clicks and favorites together with the order count, order amount, and refunded-order count.
- This is similar to the following SQL:
select sku_id, sku_name,
       sum(click_ct) as click_ct,
       sum(favor_ct) as favor_ct,
       sum(order_ct) as order_ct,
       sum(payment_amount) as payment_amount,
       sum(refund_order_ct) as refund_order_ct
from (
    select sku_id, sku_name, click_ct, favor_ct,
           0 as order_ct, 0 as payment_amount, 0 as refund_order_ct
    from pagelog
    union all
    select sku_id, sku_name, 0 as click_ct, 0 as favor_ct,
           order_ct, payment_amount, refund_order_ct
    from orderdetail
) a
group by sku_id, sku_name
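The idea behind the SQL, padding the columns a source does not have with zeros and then summing per product, can be sketched in plain Java. This is a simplified model with hypothetical names and made-up sample rows, reduced to three counters per sku_id; it is not part of the original job.

```java
import java.util.Map;
import java.util.TreeMap;

final class UnionAllGroupBy {
    /** Adds one row {click_ct, favor_ct, order_ct} to the running totals of its sku_id. */
    static void add(Map<Long, long[]> totals, long skuId, long clickCt, long favorCt, long orderCt) {
        long[] t = totals.computeIfAbsent(skuId, k -> new long[3]);
        t[0] += clickCt;
        t[1] += favorCt;
        t[2] += orderCt;
    }

    static Map<Long, long[]> example() {
        Map<Long, long[]> totals = new TreeMap<>();
        // Rows from pagelog: the order column is padded with 0.
        add(totals, 1L, 1, 0, 0);
        add(totals, 1L, 0, 1, 0);
        // Rows from orderdetail: the click and favor columns are padded with 0.
        add(totals, 1L, 0, 0, 1);
        add(totals, 2L, 0, 0, 1);
        return totals;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, long[]> e : example().entrySet()) {
            long[] t = e.getValue();
            System.out.println("sku " + e.getKey()
                    + ": click=" + t[0] + " favor=" + t[1] + " order=" + t[2]);
        }
        // sku 1: click=1 favor=1 order=1
        // sku 2: click=0 favor=0 order=1
    }
}
```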
(2) Implementation (tested)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lwqhp.utils.DateTimeUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Date;
import java.util.HashSet;
public class ProductStatApp {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read the data sources
        // 2.1 Click data, for example:
        // {"common":{"ar":"440000","ba":"Xiaomi","item":"1","item_name":"Xiaomi_Mix2","favor":"true"},"page":{"during_time":7100,"item":"1","item_name":"Xiaomi_Mix2","item_type":"sku_id","last_page_id":"cart","page_id":"good_detail"},"ts":1654226400000}
        DataStreamSource<String> pageViewDataStream = env.socketTextStream("127.0.0.1", 9999);
        // 2.2 Order data, for example:
        // {"detail_id":1,"order_id":1,"sku_id":1,"sku_name":"Xiaomi_Mix2","order_price":23,"total_amount":100,"order_status":"1","create_time":"2022-06-03 11:20:00"}
        DataStreamSource<String> orderDataStream = env.socketTextStream("127.0.0.1", 9998);

        // 3. Transform the data
        // Convert both sources into the same JavaBean
        // 3.1 Page log: extract clicks and favorites
        SingleOutputStreamOperator<ProductStat> productStatWithPageDS = pageViewDataStream.process(new ProcessFunction<String, ProductStat>() {
            @Override
            public void processElement(String value, Context ctx, Collector<ProductStat> out) throws Exception {
                // Parse the line into a JSON object
                JSONObject jsonObject = JSON.parseObject(value);
                // Event time of the record
                Long ts = jsonObject.getLong("ts");
                // Page information
                JSONObject page = jsonObject.getJSONObject("page");
                String page_id = page.getString("page_id");
                String item_type = page.getString("item_type");
                if ("good_detail".equals(page_id) && "sku_id".equals(item_type)) {
                    // ID and name of the clicked product
                    Long item = page.getLong("item");
                    String item_name = page.getString("item_name");
                    out.collect(ProductStat.builder()
                            .sku_id(item)
                            .sku_name(item_name)
                            .click_ct(1L)
                            .ts(ts)
                            .build());
                }
                // Favorite information
                JSONObject common = jsonObject.getJSONObject("common");
                String favor = common.getString("favor");
                if ("true".equals(favor)) {
                    Long item = page.getLong("item");
                    String item_name = page.getString("item_name");
                    out.collect(ProductStat.builder()
                            .sku_id(item)
                            .sku_name(item_name)
                            .favor_ct(1L)
                            .ts(ts)
                            .build());
                }
            }
        });
        productStatWithPageDS.print("productStatWithPageDS===>");

        // 3.2 Order stream
        SingleOutputStreamOperator<ProductStat> productStatWithOrderDS = orderDataStream.map(line -> {
            // Parse the line into an OrderDetail
            OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
            // Sets that hold the order IDs, so each order is counted once
            HashSet<Long> hashSet = new HashSet<>();
            HashSet<Long> refundSet = new HashSet<>();
            hashSet.add(orderDetail.getOrder_id());
            // Refunded order
            if ("1".equals(orderDetail.getOrder_status())) {
                refundSet.add(orderDetail.getOrder_id());
            }
            // Build and return the bean
            return ProductStat.builder()
                    .sku_id(orderDetail.getSku_id())
                    .sku_name(orderDetail.getSku_name())
                    .payment_amount(orderDetail.getTotal_amount())
                    .orderIdSet(hashSet)
                    .refundOrderIdSet(refundSet)
                    .ts(DateTimeUtil.toTs(orderDetail.getCreate_time()))
                    .build();
        });
        productStatWithOrderDS.print("productStatWithOrderDS===>");

        // 3.3 Union the page stream with the order stream (both are DataStream<ProductStat>)
        DataStream<ProductStat> unionDS = productStatWithPageDS.union(productStatWithOrderDS);

        // 3.4 Extract timestamps and generate watermarks
        SingleOutputStreamOperator<ProductStat> productStatWithWMDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ProductStat>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<ProductStat>() {
                    @Override
                    public long extractTimestamp(ProductStat element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

        // 3.5 Key by product, open a window, aggregate
        SingleOutputStreamOperator<ProductStat> reduceDS = productStatWithWMDS
                .keyBy(ProductStat::getSku_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<ProductStat>() {
                    @Override
                    public ProductStat reduce(ProductStat value1, ProductStat value2) throws Exception {
                        value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                        value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                        value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        value1.setOrder_ct(value1.getOrderIdSet().size() + 0L);
                        value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());
                        value1.setRefund_order_ct(value1.getRefundOrderIdSet().size() + 0L);
                        return value1;
                    }
                }, new WindowFunction<ProductStat, ProductStat, Long, TimeWindow>() {
                    @Override
                    public void apply(Long aLong, TimeWindow window, Iterable<ProductStat> input, Collector<ProductStat> out) throws Exception {
                        // Take the single pre-aggregated record of the window
                        ProductStat productStat = input.iterator().next();
                        // Set the window start and end time
                        productStat.setStt(DateTimeUtil.toYMDhms(new Date(window.getStart())));
                        productStat.setEdt(DateTimeUtil.toYMDhms(new Date(window.getEnd())));
                        // Derive the order counts from the ID sets
                        productStat.setOrder_ct(productStat.getOrderIdSet().size() + 0L);
                        productStat.setRefund_order_ct(productStat.getRefundOrderIdSet().size() + 0L);
                        // Emit the result
                        out.collect(productStat);
                    }
                });
        reduceDS.print("reduceDS====>");

        // 4. Execute
        env.execute("ProductStatApp");
    }
}
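The reduce step above counts orders through sets of order IDs rather than by adding 1 per record. A minimal plain-Java sketch of why this matters follows; the class, its fields, and the sample amounts are hypothetical and stripped down from the ProductStat bean. An order that has several detail rows contributes every amount to the sum but is counted only once.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

final class OrderCountModel {
    static final class Stat {
        BigDecimal paymentAmount = BigDecimal.ZERO;
        final Set<Long> orderIds = new HashSet<>();
        final Set<Long> refundOrderIds = new HashSet<>();

        long orderCt() {
            return orderIds.size();
        }

        long refundOrderCt() {
            return refundOrderIds.size();
        }
    }

    /** Builds the per-record statistic for one order-detail row. */
    static Stat of(long orderId, String amount, boolean refunded) {
        Stat s = new Stat();
        s.paymentAmount = new BigDecimal(amount);
        s.orderIds.add(orderId);
        if (refunded) {
            s.refundOrderIds.add(orderId);
        }
        return s;
    }

    /** Merges b into a, the way a reduce step folds two records of one window. */
    static Stat reduce(Stat a, Stat b) {
        a.paymentAmount = a.paymentAmount.add(b.paymentAmount);
        a.orderIds.addAll(b.orderIds);
        a.refundOrderIds.addAll(b.refundOrderIds);
        return a;
    }

    public static void main(String[] args) {
        // Order 1 has two detail rows; order 2 has one row and was refunded.
        Stat total = reduce(reduce(of(1L, "100", false), of(1L, "50", false)), of(2L, "30", true));
        System.out.println(total.orderCt());       // 2
        System.out.println(total.refundOrderCt()); // 1
        System.out.println(total.paymentAmount);   // 180
    }
}
```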
JavaBean:
import com.lwqhp.bean.TransientSink;
import lombok.Builder;
import lombok.Data;
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

/**
 * Product statistics entity.
 * Holds the sales metrics computed for each product.
 */
@Data
@Builder
public class ProductStat {
    String stt; // window start time
    String edt; // window end time
    Long sku_id; // SKU ID
    String sku_name; // SKU name
    @Builder.Default
    Long click_ct = 0L; // click count
    @Builder.Default
    Long favor_ct = 0L; // favorite count
    @Builder.Default
    Long order_ct = 0L; // order count
    @Builder.Default
    BigDecimal payment_amount = BigDecimal.ZERO; // payment amount
    @Builder.Default
    Long refund_order_ct = 0L; // refunded-order count
    @Builder.Default
    @TransientSink
    Set<Long> orderIdSet = new HashSet<>(); // distinct order IDs, used to count orders
    @Builder.Default
    @TransientSink
    Set<Long> refundOrderIdSet = new HashSet<>(); // distinct refunded order IDs, used to count refunded orders
    Long ts; // event timestamp of the statistic
}
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;

@Data
@AllArgsConstructor
public class OrderDetail {
    Long detail_id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    String sku_name;
    BigDecimal total_amount;
    String order_status; // 0: normal, 1: refunded
    String create_time; // yyyy-MM-dd HH:mm:ss
}
Utility class:
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class DateTimeUtil {
    private final static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats a Date as yyyy-MM-dd HH:mm:ss in the system default time zone
    public static String toYMDhms(Date date) {
        LocalDateTime localDateTime = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
        return formater.format(localDateTime);
    }

    // Parses yyyy-MM-dd HH:mm:ss as a UTC+8 local time and returns epoch milliseconds
    public static Long toTs(String YmDHms) {
        LocalDateTime localDateTime = LocalDateTime.parse(YmDHms, formater);
        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }
}
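One detail of DateTimeUtil is worth noting: toTs always interprets the text as UTC+8, while toYMDhms formats in the system default zone, so the two only round-trip on a machine that runs in UTC+8. A possible variant that pins both directions to the same offset is sketched below. The class name is hypothetical, and the fixed UTC+8 offset is an assumption carried over from toTs.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class FixedZoneDateTime {
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset ZONE = ZoneOffset.ofHours(8); // assumed: all data is in UTC+8

    /** Parses yyyy-MM-dd HH:mm:ss as a UTC+8 local time and returns epoch milliseconds. */
    static long toTs(String text) {
        return LocalDateTime.parse(text, FORMATTER).toInstant(ZONE).toEpochMilli();
    }

    /** Formats epoch milliseconds as yyyy-MM-dd HH:mm:ss in UTC+8, whatever the system zone is. */
    static String toYMDhms(long epochMilli) {
        return FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZONE));
    }

    public static void main(String[] args) {
        long ts = toTs("2022-06-03 11:20:00");
        System.out.println(ts);           // 1654226400000
        System.out.println(toYMDhms(ts)); // 2022-06-03 11:20:00
    }
}
```

The sample order's create_time and the sample page log's ts refer to the same instant under this offset, which is why both records land in the same 10-second window.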