Flink: Reading Data from Kafka and Writing to MySQL


Requirements

Compute real-time hot items:

1. Aggregate over a sliding window of 1 minute, emitting results every 3 seconds
2. For each window, output the top 5 items by click count
3. Tolerate up to 3 seconds of watermark lateness (out-of-orderness)
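In Flink's API, requirements 1 and 3 correspond to a sliding event-time window plus bounded out-of-orderness watermarks. Here is a sketch of that mapping (the helper name slidingWindowPerSpec is made up for illustration, and it uses the userAction POJO defined later in this post; note the walkthrough below actually substitutes a 10-second tumbling window to make local testing quicker):

// Sketch of requirements 1 and 3: a 1-minute sliding window with a 3-second
// slide, over event time with 3 seconds of out-of-orderness tolerance.
private static WindowedStream<userAction, Long, TimeWindow> slidingWindowPerSpec(DataStream<userAction> stream) {
    return stream
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<userAction>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((e, ts) -> e.eventTimeAction))
            .keyBy(t -> t.itemId)
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(3)));
}

Requirement 2 (the per-window top 5) is handled by the TopN KeyedProcessFunction in the business-logic section.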

Producing Data to Kafka

package exam0714;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.sql.Timestamp;
import java.util.Properties;
import java.util.Random;

public class UserActionToKafka {
    public static void main(String[] args) throws Exception {
        writeUserActionToKafka("user");
    }

    // Helper that generates random user actions and writes them to Kafka
    public static void writeUserActionToKafka(String topic) throws Exception {
        // Kafka producer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.52.100:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        Random random = new Random();
        String[] behaviors = {"pv", "buy", "cart", "fav"};
        while (true) {
            // random user
            Long userId = Math.round(random.nextDouble() * 10);
            // random item
            Long itemId = Math.round(random.nextDouble() * 10);
            // random category
            Long categoryId = Math.round(random.nextDouble() * 1000);
            // random behavior type
            int index = random.nextInt(behaviors.length);
            String behavior = behaviors[index];
            Long currentTime = System.currentTimeMillis();

            userAction user = new userAction();
            user.setUserId(userId);
            user.setItemId(itemId);
            user.setCategoryId(categoryId);
            user.setBehavior(behavior);
            user.setEventTimeAction(currentTime);
            String u = JSON.toJSONString(user);
            kafkaProducer.send(new ProducerRecord<>(topic, u));
            System.out.println(u);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class userAction {
        private Long userId;
        private Long itemId;
        private Long categoryId;
        private String behavior;
        private Long eventTimeAction;

        @Override
        public String toString() {
            return "userAction{" +
                    "userId=" + userId +
                    ", itemId=" + itemId +
                    ", categoryId=" + categoryId +
                    ", behavior='" + behavior + '\'' +
                    ", eventTimeAction=" + new Timestamp(eventTimeAction) +
                    '}';
        }
    }
}
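When run, the producer emits roughly ten records per second, each serialized by fastjson as a JSON string along the lines of (field values are random; eventTimeAction is the epoch-millisecond send time):

{"behavior":"pv","categoryId":537,"eventTimeAction":1626253800000,"itemId":7,"userId":3}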

POJOs (I defined them as inner classes here)

FactUser

private static class FactUser {
    private Long itemId;
    private Long aggCount;
    private Long reportTime;

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Long getAggCount() {
        return aggCount;
    }

    public void setAggCount(Long aggCount) {
        this.aggCount = aggCount;
    }

    public Long getReportTime() {
        return reportTime;
    }

    public void setReportTime(Long reportTime) {
        this.reportTime = reportTime;
    }

    @Override
    public String toString() {
        return "FactUser{" +
                "itemId=" + itemId +
                ", aggCount=" + aggCount +
                ", reportTime=" + new Timestamp(reportTime) +
                '}';
    }
}

userAction

private static class userAction {
    private Long userId;
    private Long itemId;
    private Long categoryId;
    private String behavior;
    private Long eventTimeAction;

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Long getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Long categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getEventTimeAction() {
        return eventTimeAction;
    }

    public void setEventTimeAction(Long eventTimeAction) {
        this.eventTimeAction = eventTimeAction;
    }

    @Override
    public String toString() {
        return "userAction{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", eventTimeAction=" + new Timestamp(eventTimeAction) +
                '}';
    }
}

Reading from Kafka

private static DataStream<String> readKafka(StreamExecutionEnvironment env, String topic) {
    Properties props = new Properties();
    // broker address
    props.setProperty("bootstrap.servers", "192.168.52.100:9092");
    // consumer group id
    props.setProperty("group.id", "flink");
    // start reading from the latest offset
    props.setProperty("auto.offset.reset", "latest");
    FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    // turn kafkaSource into a DataStream
    DataStream<String> kafkaDS = env.addSource(kafkaSource);
    return kafkaDS;
}
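Note that auto.offset.reset only takes effect when the flink consumer group has no committed offsets; with FlinkKafkaConsumer's default start-from-group-offsets behavior, committed offsets win, and when the job restores from a checkpoint or savepoint, the offsets stored in Flink state take precedence over both.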

Environment Setup

public static void main(String[] args) throws Exception {
    // set up the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // lower the parallelism to simplify testing
    env.setParallelism(1);
    // read data from the Kafka "user" topic
    DataStream<String> kafkaDS = readKafka(env, "user");
    // process the data
    SingleOutputStreamOperator<userAction> mapDS = kafkaDS
            // convert the JSON strings from Kafka into POJOs
            .map(new MapFunction<String, userAction>() {
                @Override
                public userAction map(String value) throws Exception {
                    return JSON.parseObject(value, userAction.class);
                }
            })
            // filter: keep only the "pv" (page view) events
            .filter(x -> "pv".equals(x.behavior));

    mapDS.print();

    // assign watermarks
    SingleOutputStreamOperator<userAction> userActionWatermark = mapDS.assignTimestampsAndWatermarks(
            // forBoundedOutOfOrderness: bounded out-of-orderness of 3 seconds
            WatermarkStrategy.<userAction>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    // use the record's own event time as its timestamp
                    .withTimestampAssigner((event, timestamp) -> event.eventTimeAction)
    );

    SingleOutputStreamOperator<FactUser> aggregate = userActionWatermark
            // key by item id, per the requirements
            .keyBy(t -> t.itemId)
            // tumbling event-time window; the requirement calls for
            // SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(3)),
            // but a 10-second tumbling window makes local testing quicker
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            // aggregate (incremental pre-aggregation + window function)
            .aggregate(new userActionAggregate(), new userProcessWindowFunction());
//        aggregate.print();
    System.out.println("======================================");

    SingleOutputStreamOperator<FactUser> processDS = aggregate
            // key by window-end timestamp
            .keyBy(t -> t.reportTime)
            // full aggregation: compute the TopN per window
            .process(new TopN(5));
    processDS.print();
    // processDS.addSink(new mysqlSink());
    env.execute();
}
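Because these are event-time windows, a window's result is only emitted once the watermark passes its end; with a 3-second out-of-orderness bound, expect output for each window roughly 3 seconds (in event time) after the window closes.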

Incremental Aggregation

private static class userActionAggregate implements AggregateFunction<userAction, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(userAction value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}
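Pairing an AggregateFunction with a ProcessWindowFunction (next section) is the standard way to get incremental aggregation in Flink: the runtime keeps only a single running Long count per item and window instead of buffering every event, while the window function still gets access to window metadata such as the end timestamp.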

Full-Window Processing

private static class userProcessWindowFunction extends ProcessWindowFunction<Long, FactUser, Long, TimeWindow> {
    @Override
    public void process(Long key, Context context, Iterable<Long> elements, Collector<FactUser> out) throws Exception {
        // the pre-aggregated count is the single element in the iterable
        Long count = elements.iterator().next();
        FactUser factUser = new FactUser();
        factUser.setItemId(key);
        factUser.setAggCount(count);
        factUser.setReportTime(context.window().getEnd());
        out.collect(factUser);
    }
}

Business Logic

/**
 * Computes the top N hot items within a window. The key is the window-end
 * timestamp; the ranking is formatted as a string for printing, and the
 * top-N FactUser records are emitted downstream.
 * K, I, O = key, input, output types
 */
public static class TopN extends KeyedProcessFunction<Long, FactUser, FactUser> {
    private final int n;

    public TopN(int n) {
        this.n = n;
    }

    // State holding the per-item counts; once all data for the same window
    // has arrived, the TopN computation is triggered
    private transient ListState<FactUser> itemState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // register the state
        ListStateDescriptor<FactUser> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                FactUser.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(FactUser itemViewCount, Context context, Collector<FactUser> collector) throws Exception {
        // save every record into state
        this.itemState.add(itemViewCount);
        // register an event-time timer at windowEnd + 1; when it fires, all
        // item counts belonging to the windowEnd window have been collected
        context.timerService().registerEventTimeTimer(itemViewCount.getReportTime() + 1);
    }

    // -------------------------------------------
    // timer callback
    // -------------------------------------------
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FactUser> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        // collect all item counts received for this window
        List<FactUser> allItems = new ArrayList<>();
        for (FactUser item : itemState.get()) {
            allItems.add(item);
        }
        // clear the state early to free up space
        itemState.clear();
        // sort by click count, descending
        allItems.sort((o1, o2) -> Long.compare(o2.aggCount, o1.aggCount));

        // format the ranking as a String for printing
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
        // emit the top n items and append them to the printable ranking
        for (int i = 0; i < Math.min(n, allItems.size()); i++) {
            FactUser item = allItems.get(i);
            result.append("No.").append(i + 1)
                    .append(": itemId=").append(item.getItemId())
                    .append(", count=").append(item.getAggCount())
                    .append("\n");
            out.collect(item);
        }
        result.append("====================================\n");
        System.out.println(result);
    }
}

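Writing to MySQL

The mysqlSink referenced (commented out) in main is not shown above. Below is a minimal sketch of what such a sink could look like, not the original code from the post; the hot_items table, database URL, and credentials are placeholders to adjust for your environment, and mysql-connector-java must be on the classpath:

// Hypothetical sketch of the missing mysqlSink, assuming a table
// hot_items(item_id BIGINT, agg_count BIGINT, report_time TIMESTAMP).
private static class mysqlSink extends RichSinkFunction<FactUser> {
    private transient Connection connection;
    private transient PreparedStatement insertStmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open the connection once per parallel sink instance
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.52.100:3306/test", "root", "123456");
        insertStmt = connection.prepareStatement(
                "INSERT INTO hot_items (item_id, agg_count, report_time) VALUES (?, ?, ?)");
    }

    @Override
    public void invoke(FactUser value, Context context) throws Exception {
        // one INSERT per TopN record
        insertStmt.setLong(1, value.getItemId());
        insertStmt.setLong(2, value.getAggCount());
        insertStmt.setTimestamp(3, new Timestamp(value.getReportTime()));
        insertStmt.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (insertStmt != null) insertStmt.close();
        if (connection != null) connection.close();
        super.close();
    }
}

With this in place, uncommenting processDS.addSink(new mysqlSink()); in main wires the TopN results into MySQL. For production use, the flink-connector-jdbc module's JdbcSink.sink(...) is usually preferable, since it batches writes and handles retries rather than issuing one INSERT per record.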