.~day08_高级特性和新特性

Posted ChinaManor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了.~day08_高级特性和新特性相关的知识,希望对你有一定的参考价值。

day08_Flink高级特性和新特性

今日目标

  • BroadcastState状态管理
  • Flink DataStream 双流 Join
  • Streaming File sink 落地
  • File Sink 落地
  • FlinkSQL 整合 Hive

BroadcastState 状态管理

  • broadcast state 广播变量状态

image-20210624081140798

  • 应用场景

    关联更新的规则,获取指定的数据(给ip得到经度纬度)=> 地图 API 获取到 省市区街道位置

  • 需求

    实时Flink DataStream 过滤出配置中(数据库)的用户,并在事件流中补全这批用户的基础信息。

  • 需求流程

image-20210624082644430

  • 开发步骤
package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Author itcast
 * Date 2021/6/24 8:29
 * 两个数据流  1.事件流 2.用户配置流 3.connect关联操作 4.打印输出 5.执行任务
 * <String,String,String,Integer></>
 * {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
 * {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
 * <String, String, Integer ></>
 * 'user_2', '李四', 20
 * 最终的数据流  6个 Tuple6<String,String,String,Integer,String,Integer></>
 * (user_3,2019-08-17 12:19:47,browse,1,王五,33)
 * (user_2,2019-08-17 12:19:48,click,1,李四,20)
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //2.source
        //-1.构建实时数据事件流-自定义随机
        //<userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> clickSource = env.addSource(new MySource());
        //-2.构建配置流-从mysql
        //<用户id,<姓名,年龄>>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource = env.addSource(new MySQLSource());
        //3.transformation
        //-1.定义状态描述器
        //MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
        //new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> broadcastDesc = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        //-2.广播配置流
        //BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configSource.broadcast(broadcastDesc);
        //-3.将事件流和广播流进行连接
        //BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = clickSource.connect(broadcastDS)
                //-4.处理连接后的流-根据配置流补全事件流中的用户的信息
                .process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>() {

                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //读取出来 f0 为 userId
                        //事件流中读取用户 userId
                        String userId = value.f0;
                        //从ctx环境变量中通过 desc 读取出来广播状态
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        //如果广播状态不为空,get(null) 获取出来 配置数据Tuple2
                        if (broadcastState != null) {
                            Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                            //判断 map 不为空则
                            if (map != null) {
                                Tuple2<String, Integer> stringIntegerTuple2 = map.get(userId);
                                //取出姓名和年龄
                                //collect 收集 Tuple6
                                //3-4.处理(process)连接后的流-根据配置流补全事件流中的用户的信息,Tuple4和Tuple2合并
                                //处理每一条元素,processElement
                                out.collect(Tuple6.of(
                                        userId,
                                        value.f1,
                                        value.f2,
                                        value.f3,
                                        stringIntegerTuple2.f0,
                                        stringIntegerTuple2.f1
                                ));
                            }
                        }
                    }

                    //value就是MySQLSource中每隔一段时间获取到的最新的map数据
                    //先根据状态描述器获取历史的广播状态 ctx.getBroadcastState(desc)
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //再清空历史状态 broadcastState 数据
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        //最后将最新的广播流数据放到 state 中(更新状态数据) broadcastState.put(null,value)
                        broadcastState.clear();
                        broadcastState.put(null, value);

                    }
                });

        //处理广播中的元素
        //4.sinks
        result.print();
        //5.execute
        env.execute();
    }

    /**
     * <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        private boolean isRunning = true;
        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning){
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * <用户id,<姓名,年龄>>
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        private boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node3:3306/bigdata?useSSL=false",
                    "root",
                    "123456");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag){
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                ResultSet rs = ps.executeQuery();
                while (rs.next()){
                    String userID = rs.getString("userID");
                    String userName = rs.getString("userName");
                    int userAge = rs.getInt("userAge");
                    //Map<String, Tuple2<String, Integer>>
                    map.put(userID, Tuple2.of(userName,userAge));
                }
                ctx.collect(map);
                Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
            if (rs != null) rs.close();
        }
    }
}
  • 实时的数据流和 动态变化的数据库中的配置流 进行 connect 操作, 打印输出

双流 JOIN

  • 多个数据流 DataStream 之间进行 JOIN 操作

  • 双流 JOIN 分为两大类: Window 窗口的join, Interval 的 join

  • Window窗口 分为 tumbling 窗口, sliding 窗口, session 窗口

  • Interval 包括 下届, 上届

    image-20210624093501057

  • 需求

    订单明细表和商品表每 5 秒中进行一个窗口 JOIN , 将结果落地并打印输出

    • 开发步骤

      package cn.itcast.flink.broadcast;
      
      import com.alibaba.fastjson.JSON;
      import lombok.Data;
      import org.apache.flink.api.common.eventtime.*;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      
      import java.math.BigDecimal;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Random;
      import java.util.UUID;
      import java.util.concurrent.TimeUnit;
      
      /**
       * Author itcast
       * Date 2021/6/24 9:40
       * Desc TODO
       */
      public class JoinDemo {
          public static void main(String[] args) throws Exception {
              // 创建流执行环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // 构建商品数据流
              SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());
              // 构建订单明细数据流
              SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());
              // 订单表 join 商品表 订单表.goodsId===商品表.goodsId
              DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)
                      .where(o -> o.goodsId)
                      .equalTo(g -> g.goodsId)
                      /// 窗口为滚动窗口 5 秒
                      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                      /// apply 实现 (OrderItem first, Goods second) -> factOrderItem
                      .apply((OrderItem first, Goods second) -> {
                          FactOrderItem factOrderItem = new FactOrderItem();
                          factOrderItem.setGoodsId(first.goodsId);
                          factOrderItem.setGoodsName(second.goodsName);
                          factOrderItem.setCount(new BigDecimal(first.count));
                          factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));
                          return factOrderItem;
                      });
              //打印输出
              result.print();
              //执行环境
              env.execute();
          }
          //商品类实体类
          @Data
          public static class Goods {
              private String goodsId;
              private String goodsName;
              private BigDecimal goodsPrice;
      
              public static List<Goods> GOODS_LIST;
              public static Random r;
      
              static  {
                  r = new Random();
                  GOODS_LIST = new ArrayList<>();
                  GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
                  GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
                  GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
                  GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
                  GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
                  GOODS_LIST.add(new Goods(<

      以上是关于.~day08_高级特性和新特性的主要内容,如果未能解决你的问题,请参考以下文章

      MyBatis高级特性

      Day380.⼯⼚高级特性 -Spring5

      Java 8 新特性总结

      阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第1节 常用函数接口_8_常用的函数式接口_Supplier接口

      阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第1节 常用函数接口_9_常用的函数式接口_Consumer接口

      阶段1 语言基础+高级_1-3-Java语言高级_08-JDK8新特性_第1节 常用函数接口_10_常用的函数式接口_Consumer接口