2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql

Posted 大数据Manor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql相关的知识,希望对你有一定的参考价值。

在这里插入图片描述

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

这次是上篇文章的续集,最新的Flink版本大大简化了之前复杂的写法~
之前的文章

首先准备模拟数据:

//1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        

Kafka的一系列配置,可以从官网直接copy过来@~@~
在这里插入图片描述
然后正式生产模拟数据:

//2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)

            CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
            String data = JSON.toJSONString(categoryPojo);

            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
            System.out.println("数据是"+data);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

这里的实体类用Lombok,比较简单:
这是之前写的Lombok用法文章

 @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double price;//该分类总销售额
        private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

在这里插入图片描述
有了数据写入Kafka,我们开始消费“她”:

设置一下Flink运行环境:

 //TODO 1.设置环境env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //并行度为1,表示不分区
        env.setParallelism(1);

配置Kafka相关并从哪里开始读offset

//TODO 2设置Kafka相关参数
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");

        //Flink设置kafka的offset,从最新的开始
         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "myDemo",
                new SimpleStringSchema(),
                props
        );
        consumer.setStartFromLatest();
        consumer.setCommitOffsetsOnCheckpoints(true);

第3步解析数据源并测试:

DataStreamSource<String> source = env.addSource(consumer);
         SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                Order order = JSON.toJavaObject(jsonObject, Order.class);
                return order;
            }
        });
        //测试一下
        mapDS.print();

在这里插入图片描述
success!
在这里插入图片描述
最后存入mysql

//sink输出到Mysql
        result.addSink(JdbcSink.sink(
                "INSERT INTO t_order(category,price,time) values(?,?,?)",
                (ps,order)->{
                    ps.setString(1,order.category);
                    ps.setDouble(2,order.price);
                    ps.setLong(3,order.time);
                },
                //批处理
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
                .withUsername("root")   //配置用户名
                .withPassword("123456") //密码
                .withDriverName("com.mysql.jdbc.Driver") //驱动类
                .build()
        ));

        env.execute();

以上就是全部内容了,感谢您的阅读!
在这里插入图片描述
另外补充一些不成熟的代码:双流Join

//双流Join
        SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
        SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());

        //商品ID=订单ID
        final DataStream<Order> result = order1watermark.join(order2watermark)
                .where(o1 -> o1.category)
                .equalTo(o2 -> o2.category)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((o1, o2) -> {
                    Order order = new Order();
                    order.setCategory(o1.category);
                    order.setPrice(o2.price);
                    order.setTime(o2.time);
                    return order;
                });
//        result.print();

水印机制,简化了直接使用系统时间

//水印机制
    public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{

        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element,recordTimestamp)->System.currentTimeMillis();
        }
    }
    public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

好了,终于完成了✅
双流join不怎么会写,慢慢来吧,
毕竟对于考60分的人,下一次考80分已经是极大的进步~~

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql(二)~

喜欢的小伙伴欢迎一键三连!!!
我是manor,一枚相信技术改变世界的码农,我们下期再见~

在这里插入图片描述

以上是关于2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql的主要内容,如果未能解决你的问题,请参考以下文章

2021年最新最全Flink系列教程__Flink综合案例

2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(建议收藏!!)

2021年最新最全Flink系列教程__Flink高级API

2021年最新最全Flink系列教程_Flink流批一体API

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

2021年最新最全Flink系列教程_Flink原理初探和流批一体API