Flink Real-Time Computing in Practice: Flink Big Data Case Studies, Part 2

Posted by mirson

Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers Flink big data case studies (Part 2); we hope you find it useful.

1. Order Payment Status Tracking (Using CEP)

  • Function

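    Uses CEP to track each order's payment status (status 0 followed by status 1 within 15 seconds counts as paid), joins the paid orders with the order data, and ranks best-selling products by sales amount. The statistics cover a one-day period and are refreshed every 3 seconds.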

  • Core code

    Main logic:

      
        /**
         * Runs the Flink job
         * @throws Exception
         */
        private void executeFlinkTask() throws Exception {

            // 1. Create the execution environment (event time, parallelism 1 for the whole job)
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);

            // 2. Kafka connection settings
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");

            // 3. Create the Kafka consumer (payment data source)
            FlinkKafkaConsumer<String> paymentConsumer = new FlinkKafkaConsumer<>(
                    "orderPayment_binlog",      // source topic
                    new SimpleStringSchema(),   // deserialization schema
                    properties);

            // For debugging: re-consume from the earliest available records
            paymentConsumer.setStartFromEarliest();

            // 4. Read the Kafka data source
            DataStreamSource<String> paymentSource = env.addSource(paymentConsumer);

            // 5. Filter and convert the binlog messages
            DataStream<OrderPayment> orderPaymentDataStream = paymentSource.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
                    String isDDL = jsonObject.get("isDdl").getAsString();
                    String type = jsonObject.get("type").getAsString();
                    // Keep only non-DDL INSERT events
                    return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);
                }
            }).flatMap(new FlatMapFunction<String, OrderPayment>() {
                @Override
                public void flatMap(String value, Collector<OrderPayment> out) throws Exception {
                    // Extract the "data" array from the JSON message
                    JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
                    // Convert each row into a Java object
                    for (int i = 0; i < dataArray.size(); i++) {
                        JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                        OrderPayment orderPayment = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderPayment.class);
                        System.out.println("orderPayment => " + orderPayment);
                        out.collect(orderPayment);
                    }
                }
            })
            // Use the payment update time as event time (no out-of-orderness allowed)
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderPayment>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(OrderPayment element) {
                    return element.getUpdateTime();
                }
            })
            .keyBy(OrderPayment::getOrderId);

            // 6. CEP pattern: status 0 immediately followed by status 1 within 15 seconds means "paid"
            Pattern<OrderPayment, ?> pattern = Pattern.<OrderPayment>begin("begin")
                    .where(new SimpleCondition<OrderPayment>() {
                        @Override
                        public boolean filter(OrderPayment value) throws Exception {
                            return value.getStatus() == 0;
                        }
                    }).next("follow").where(new SimpleCondition<OrderPayment>() {
                        @Override
                        public boolean filter(OrderPayment value) throws Exception {
                            return value.getStatus() == 1;
                        }
                    }).within(Time.seconds(15));

            PatternStream<OrderPayment> patternStream = CEP.pattern(orderPaymentDataStream, pattern);
            // 7. Side-output tag for timed-out matches; its type is the timeout function's output type
            OutputTag<OrderPaymentResult> orderExpired = new OutputTag<OrderPaymentResult>("orderExpired"){};
            SingleOutputStreamOperator<OrderPaymentResult> selectResult = patternStream.select(orderExpired,
                    new OrderExpiredMatcher(), new OrderPayedMatcher());
            selectResult.print("paid");
            // Timed-out orders are emitted to the side output
            selectResult.getSideOutput(orderExpired).print("expired");

            // 8. Create the Kafka consumer (order data source)
            FlinkKafkaConsumer<String> orderConsumer = new FlinkKafkaConsumer<>(
                    "order_binlog",             // source topic
                    new SimpleStringSchema(),   // deserialization schema
                    properties);
            orderConsumer.setStartFromEarliest();   // start from the earliest available records

            DataStreamSource<String> orderSource = env.addSource(orderConsumer);

            // 9. Filter and convert the order data source
            DataStream<Order> orderDataStream = orderSource.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
                    String isDDL = jsonObject.get("isDdl").getAsString();
                    String type = jsonObject.get("type").getAsString();
                    // Keep only non-DDL INSERT events
                    return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);
                }
            }).flatMap(new FlatMapFunction<String, Order>() {
                @Override
                public void flatMap(String value, Collector<Order> out) throws Exception {
                    // Extract the "data" array from the JSON message
                    JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
                    // Convert each row into a Java object
                    for (int i = 0; i < dataArray.size(); i++) {
                        JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                        Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);
                        System.out.println("order => " + order);
                        out.collect(order);
                    }
                }
            })
            // Use the order execution time as event time
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Order element) {
                    return element.getExecTime();
                }
            });

            // 10. Join orders with paid results: same order id, payment time within [order time, order time + 15 s]
            orderDataStream.keyBy(Order::getId).intervalJoin(selectResult.keyBy(OrderPaymentResult::getOrderId))
                    .between(Time.seconds(0), Time.seconds(15))
                    .process(new ProcessJoinFunction<Order, OrderPaymentResult, JoinOrderPayment>() {
                        @Override
                        public void processElement(Order left, OrderPaymentResult right, Context ctx, Collector<JoinOrderPayment> out) throws Exception {
                            JoinOrderPayment joinResult = JoinOrderPayment.build(left, right);
                            out.collect(joinResult);
                        }
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderPayment>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(JoinOrderPayment element) {
                            return element.getUpdateTime();
                        }
                    })
                    .keyBy(JoinOrderPayment::getGoodsId)
                    // Sliding window: 24 hours long, sliding every 3 seconds
                    // (each record falls into 86,400 / 3 = 28,800 overlapping windows)
                    .timeWindow(Time.hours(24), Time.seconds(3))
                    .aggregate(new TotalAmount(), new AmountWindow())
                    .keyBy(HotOrder::getTimeWindow)
                    .process(new TopNHotOrder());

            // 11. Execute the job
            env.execute("job");
        }
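Step 10 depends on interval-join semantics: a payment result is paired with an order of the same id only if its timestamp lies between the order's timestamp plus the lower bound (0 s) and plus the upper bound (15 s), with both bounds inclusive by default. The plain-Java sketch below illustrates only that matching rule, outside Flink; the Event class and the sample timestamps are hypothetical and are not the article's Order or OrderPaymentResult classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the interval-join rule used in step 10 (not Flink code). */
class IntervalJoinSketch {

    /** Hypothetical simplified event: a join key plus an event-time timestamp in ms. */
    static final class Event {
        final long key;
        final long timestamp;

        Event(long key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Pairs each left event with every right event that has the same key and a timestamp in
     * [left.timestamp + lowerMs, left.timestamp + upperMs], both bounds inclusive.
     * Each pair is returned as {key, leftTimestamp, rightTimestamp}.
     */
    static List<long[]> join(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<long[]> pairs = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key == r.key;
                boolean inRange = r.timestamp >= l.timestamp + lowerMs
                        && r.timestamp <= l.timestamp + upperMs;
                if (sameKey && inRange) {
                    pairs.add(new long[] {l.key, l.timestamp, r.timestamp});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(new Event(1, 1_000), new Event(2, 1_000));
        List<Event> payments = List.of(
                new Event(1, 10_000),   // 9 s after order 1: joined
                new Event(2, 20_000));  // 19 s after order 2: outside 15 s, dropped
        for (long[] p : join(orders, payments, 0, 15_000)) {
            System.out.println("orderId=" + p[0] + " orderTs=" + p[1] + " paymentTs=" + p[2]);
        }
    }
}
```

The nested loop keeps the rule easy to read; Flink itself buffers both keyed sides in state and only keeps elements as long as they can still fall into a matching interval.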

Sales amount accumulator:

    /**
     * Sales amount accumulator
     */
    private static class TotalAmount implements AggregateFunction<JoinOrderPayment, JoinOrderPayment, JoinOrderPayment> {
        @Override
        public JoinOrderPayment createAccumulator() {
            JoinOrderPayment order = new JoinOrderPayment();
            order.setTotalAmount(0L);
            return order;
        }

        /**
         * Adds one sale (price x volume) to the running total
         * @param value the incoming joined order/payment record
         * @param accumulator the running result for the current goods and window
         * @return the updated accumulator
         */
        @Override
        public JoinOrderPayment add(JoinOrderPayment value, JoinOrderPayment accumulator) {
            accumulator.setGoodsId(value.getGoodsId());
            accumulator.setGoodsName(value.getGoodsName());
            accumulator.setStatus(value.getStatus());
            accumulator.setUpdateTime(value.getUpdateTime());
            accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
            return accumulator;
        }

        @Override
        public JoinOrderPayment getResult(JoinOrderPayment accumulator) {
            return accumulator;
        }

        @Override
        public JoinOrderPayment merge(JoinOrderPayment a, JoinOrderPayment b) {
            // Combine two partial results (called when windows are merged) instead of returning null
            a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
            return a;
        }
    }
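Flink calls AggregateFunction.merge() when it combines partial aggregates, for example when session windows merge, so merge() should add the two partial totals together. The plain-Java sketch below shows the create/add/merge contract with a hypothetical AmountAcc holder (only a goods id and a running total, not the article's JoinOrderPayment): aggregating all sales into one accumulator and merging two partial accumulators produce the same total.

```java
import java.util.List;

/** Plain-Java illustration of the create/add/merge accumulator contract (not Flink code). */
class AmountAccumulatorSketch {

    /** Hypothetical accumulator: goods id plus running sales amount. */
    static final class AmountAcc {
        long goodsId;
        long totalAmount;
    }

    static AmountAcc createAccumulator() {
        return new AmountAcc(); // both fields start at 0
    }

    /** Adds one sale (price * volume) to the running total. */
    static AmountAcc add(long goodsId, long price, long volume, AmountAcc acc) {
        acc.goodsId = goodsId;
        acc.totalAmount += price * volume;
        return acc;
    }

    /** Combines two partial accumulators into one. */
    static AmountAcc merge(AmountAcc a, AmountAcc b) {
        if (a.goodsId == 0) {
            a.goodsId = b.goodsId; // a was still empty (assumes 0 is never a real goods id)
        }
        a.totalAmount += b.totalAmount;
        return a;
    }

    public static void main(String[] args) {
        // Sales of goods 7 as {price, volume}
        List<long[]> sales = List.of(new long[] {100, 2}, new long[] {50, 1}, new long[] {30, 3});

        // One accumulator over all sales
        AmountAcc all = createAccumulator();
        for (long[] s : sales) {
            add(7, s[0], s[1], all);
        }

        // Two partial accumulators, merged afterwards
        AmountAcc first = add(7, 100, 2, createAccumulator());
        AmountAcc second = add(7, 30, 3, add(7, 50, 1, createAccumulator()));
        AmountAcc merged = merge(first, second);

        System.out.println(all.totalAmount + " == " + merged.totalAmount); // 340 == 340
    }
}
```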

Converting window results into best-selling product records:

    /**
     * Converts each window's aggregated result into a HotOrder tagged with the window end time
     */
    private static class AmountWindow implements WindowFunction<JoinOrderPayment, HotOrder, Long, TimeWindow> {

        @Override
        public void apply(Long goodsId, TimeWindow window, Iterable<JoinOrderPayment> input, Collector<HotOrder> out) throws Exception {
            JoinOrderPayment order = input.iterator().next();
            out.collect(new HotOrder(order.getGoodsId(), order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
        }
    }
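AmountWindow stamps each result with window.getEnd(), and the job uses timeWindow(Time.hours(24), Time.seconds(3)), i.e. sliding windows. The plain-Java sketch below reproduces how a record's timestamp maps to sliding windows when the window offset is 0, following the start-time rule Flink applies to sliding event-time windows: the latest start is the timestamp minus (timestamp mod slide), then step back one slide at a time while the window still covers the timestamp. It makes the cost visible: with a 24-hour size and a 3-second slide, every record is counted in 86,400 / 3 = 28,800 windows.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of sliding-window assignment with offset 0 (not Flink code). */
class SlidingWindowSketch {

    /** Returns every [start, end) window of the given size and slide that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs}); // end is exclusive, like TimeWindow.getEnd()
        }
        return windows;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000; // Time.hours(24) in ms
        long slide = 3_000L;             // Time.seconds(3) in ms
        List<long[]> windows = assignWindows(1_000_000L, day, slide);
        System.out.println("windows per record: " + windows.size()); // 28800
        System.out.println("latest window: [" + windows.get(0)[0] + ", " + windows.get(0)[1] + ")");
    }
}
```

Because the aggregate is incremental, Flink keeps one accumulator per window rather than the raw records, but the number of accumulators per key still grows with size / slide.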

Ranking the best-selling products:

    /**
     * Ranks best-selling products per window.
     * Declared static so the function does not capture the (possibly non-serializable) enclosing instance.
     */
    private static class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {

        private ListState<HotOrder> orderState;

        @Override
        public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {
            // Buffer the record in list state
            orderState.add(value);
            // Register an event-time timer at the window end; it fires once the watermark reaches it
            ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            List<HotOrder> orderList = new ArrayList<>();
            for (HotOrder order : orderState.get()) {
                orderList.add(order);
            }
            // Sort by total sales amount, descending
            orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
            orderState.clear();
            // Write the results to Elasticsearch
            HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
            StringBuilder strBuf = new StringBuilder();
            for (HotOrder order : orderList) {
                order.setId(order.getGoodsId());
                order.setCreateDate(new Date(order.getTimeWindow()));
                hotOrderRepository.save(order);
                strBuf.append(order).append("\n");
                System.out.println("result => " + order);
            }
            out.collect(strBuf.toString());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            orderState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot-order", HotOrder.class));
        }
    }
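The ranking step in onTimer can be tried out without Flink. The sketch below is a plain-Java approximation: the Hot class is a hypothetical stand-in for HotOrder, the filter on the window end emulates the keyBy(HotOrder::getTimeWindow) partitioning, and the cut-off n is an assumption, since the code above sorts the whole list without truncating it.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java illustration of the per-window ranking done in TopNHotOrder.onTimer (not Flink code). */
class TopNSketch {

    /** Hypothetical stand-in for HotOrder: goods id, total sales amount, window end. */
    static final class Hot {
        final long goodsId;
        final long totalAmount;
        final long windowEnd;

        Hot(long goodsId, long totalAmount, long windowEnd) {
            this.goodsId = goodsId;
            this.totalAmount = totalAmount;
            this.windowEnd = windowEnd;
        }
    }

    /** Entries of one window end, sorted by total amount descending and cut to the first n. */
    static List<Hot> topN(List<Hot> buffered, long windowEnd, int n) {
        return buffered.stream()
                .filter(h -> h.windowEnd == windowEnd)   // emulates keyBy(HotOrder::getTimeWindow)
                .sorted(Comparator.comparingLong((Hot h) -> h.totalAmount).reversed())
                .limit(n)                                // assumed cut-off; the article keeps all entries
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hot> buffered = List.of(
                new Hot(1, 300, 6_000), new Hot(2, 900, 6_000),
                new Hot(3, 500, 6_000), new Hot(4, 999, 9_000));
        for (Hot h : topN(buffered, 6_000, 2)) {
            System.out.println("goods " + h.goodsId + " -> " + h.totalAmount);
        }
        // prints "goods 2 -> 900", then "goods 3 -> 500"
    }
}
```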

Handling timed-out matches:

    private static class OrderExpiredMatcher implements PatternTimeoutFunction<OrderPayment, OrderPaymentResult> {
        @Override
        public OrderPaymentResult timeout(Map<String, List<OrderPayment>> map, long timeoutTimestamp) throws Exception {
            OrderPaymentResult result = new OrderPaymentResult();
            // A timed-out partial match only contains the "begin" event (status 0)
            OrderPayment payment = map.get("begin").iterator().next();
            result.setOrderId(payment.getOrderId());
            result.setStatus(payment.getStatus());
            result.setUpdateTime(payment.getUpdateTime());
            result.setMessage("Payment timed out");
            return result;
        }
    }

Handling successful payments:

    private static class OrderPayedMatcher implements PatternSelectFunction<OrderPayment, OrderPaymentResult> {

        @Override
        public OrderPaymentResult select(Map<String, List<OrderPayment>> map) throws Exception {
            OrderPaymentResult result = new OrderPaymentResult();
            // Take the "follow" event (status 1) of the completed match
            OrderPayment payment = map.get("follow").iterator().next();
            result.setOrderId(payment.getOrderId());
            result.setStatus(payment.getStatus());
            result.setUpdateTime(payment.getUpdateTime());
            result.setMessage("Payment succeeded");
            return result;
        }
    }
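Together, the pattern in step 6 and the two matchers classify each order as paid (OrderPayedMatcher) or timed out (OrderExpiredMatcher). The plain-Java sketch below approximates that decision for one order's time-ordered events, each encoded (hypothetically) as {status, timestampMs}. It is a simplification, not Flink CEP: it only follows the first status-0 event, treats the 15-second bound as exclusive, and assumes a timeout is reported when no status-1 event arrives in time. A status-0 event followed by a different event yields no match at all, because next() requires strict contiguity.

```java
import java.util.List;

/** Simplified plain-Java approximation of the step-6 CEP pattern (not Flink CEP). */
class PaymentPatternSketch {

    enum Outcome { PAID, TIMEOUT, NO_MATCH }

    /**
     * Evaluates "status 0, immediately followed by status 1, within withinMs" for one order.
     * Each event is {status, timestampMs}; events must be in time order.
     */
    static Outcome evaluate(List<long[]> events, long withinMs) {
        for (int i = 0; i < events.size(); i++) {
            long[] begin = events.get(i);
            if (begin[0] != 0) {
                continue;                                     // "begin" requires status == 0
            }
            if (i + 1 == events.size()) {
                return Outcome.TIMEOUT;                       // nothing followed; assumes time later passes begin + withinMs
            }
            long[] follow = events.get(i + 1);                // next(): only the very next event counts
            boolean inTime = follow[1] - begin[1] < withinMs; // bound treated as exclusive in this sketch
            if (!inTime) {
                return Outcome.TIMEOUT;                       // the partial match expired first
            }
            return follow[0] == 1 ? Outcome.PAID : Outcome.NO_MATCH; // a wrong next event drops the match
        }
        return Outcome.NO_MATCH;                              // no status-0 event at all
    }

    public static void main(String[] args) {
        long within = 15_000L;
        System.out.println(evaluate(List.of(new long[] {0, 1_000}, new long[] {1, 6_000}), within));  // PAID
        System.out.println(evaluate(List.of(new long[] {0, 1_000}, new long[] {1, 20_000}), within)); // TIMEOUT
        System.out.println(evaluate(List.of(new long[] {0, 1_000}, new long[] {2, 3_000}), within));  // NO_MATCH
    }
}
```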

2. Product UV Statistics (Basic Counting)

  • Function

    Counts each product's UV (unique visitors) over a period of time, i.e. the number of views after deduplicating by IP address.

  • Core code

    Main logic:

    public class ScreenUniqueVisitorProcessor {
    
        /**
         * Runs the Flink job
         * @throws Exception
         */
        public void executeFlinkTask() throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Ingestion time: windows follow the time records enter Flink, not the logged accessTime
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
            env.setParallelism(1);

            // 2. Read the data source (a local log file; the socket source is an alternative)
    //        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
            DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log");

            // 3. Parse and convert the records
            socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() {

                @Override
                public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception {
                    // Split the tab-separated line: ip, accessTime, eventType, goodsId
                    String[] arrValue = value.split("\\t");
                    System.out.println("receive msg => " + value);
                    if (arrValue.length < 4) {
                        return; // skip malformed lines
                    }
                    // Assemble the fields into an object
                    GoodsAccessLog log = new GoodsAccessLog();
                    log.setIp(arrValue[0]);
                    log.setAccessTime(Long.parseLong(arrValue[1]));
                    log.setEventType(arrValue[2]);
                    log.setGoodsId(arrValue[3]);
                    out.collect(log);
                }
            })
            // Keep only "view" events
            .filter(new FilterFunction<GoodsAccessLog>() {
                @Override
                public boolean filter(GoodsAccessLog value) throws Exception {
                    return "view".equals(value.getEventType());
                }
            })
            .keyBy(GoodsAccessLog::getGoodsId)
            // 4. 10-second tumbling window: count distinct IPs per goods
            .timeWindow(Time.seconds(10))
            .process(new ProcessWindowFunction<GoodsAccessLog, Map<String, String>, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<GoodsAccessLog> elements, Collector<Map<String, String>> out) throws Exception {
                    Set<String> ipSet = new HashSet<>();
                    Map<String, String> goodsUV = new LinkedHashMap<>();
                    elements.forEach(log -> ipSet.add(log.getIp()));
                    // Output: goodsId -> "windowEnd:uv"
                    goodsUV.put(key, context.window().getEnd() + ":" + ipSet.size());
                    out.collect(goodsUV);
                }
            })
            .print("uv result").setParallelism(1);

            // 5. Execute the job
            env.execute("job");
        }
    
    }
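The window function above boils down to a distinct count of IPs per goods and window. The plain-Java sketch below performs the same count over tab-separated lines (ip, accessTime, eventType, goodsId) with 10-second tumbling windows. Unlike the job above, which runs on ingestion time, it derives windows from the logged accessTime, which is assumed here to be in milliseconds; the sample lines are made up for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java illustration of per-goods UV counting in tumbling windows (not Flink code). */
class UvCountSketch {

    /** Returns "goodsId@windowEnd" -> number of distinct IPs among "view" events. */
    static Map<String, Integer> countUv(List<String> lines, long windowMs) {
        Map<String, Set<String>> ipsPerWindow = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");                  // ip, accessTime, eventType, goodsId
            if (f.length < 4 || !"view".equals(f[2])) {
                continue;                                    // skip malformed lines and non-view events
            }
            long ts = Long.parseLong(f[1]);                  // assumed to be epoch milliseconds
            long windowEnd = ts - Math.floorMod(ts, windowMs) + windowMs; // tumbling window end (exclusive)
            ipsPerWindow.computeIfAbsent(f[3] + "@" + windowEnd, k -> new HashSet<>()).add(f[0]);
        }
        Map<String, Integer> uv = new TreeMap<>();
        ipsPerWindow.forEach((key, ips) -> uv.put(key, ips.size()));
        return uv;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "10.0.0.1\t1000\tview\tg1",
                "10.0.0.1\t2000\tview\tg1",    // same IP again: counted once
                "10.0.0.2\t3000\tview\tg1",
                "10.0.0.3\t4000\tbuy\tg1",     // not a view: ignored
                "10.0.0.1\t12000\tview\tg1");  // next 10-second window
        System.out.println(countUv(lines, 10_000L)); // {g1@10000=2, g1@20000=1}
    }
}
```

Keeping a HashSet of IPs per window is exact but grows with the number of distinct visitors, which is the motivation for the Bloom-filter variant in case 3.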

Sales amount accumulation for best-selling products, operating on Order records:

/**
 * Sales amount accumulator
 */
private static class TotalAmount implements AggregateFunction<Order, Order, Order> {
    @Override
    public Order createAccumulator() {
        Order order = new Order();
        order.setTotalAmount(0L);
        return order;
    }

    /**
     * Adds one sale (price x volume) to the running total for the goods
     * @param value the incoming order
     * @param accumulator the running result for the current goods and window
     * @return the updated accumulator
     */
    @Override
    public Order add(Order value, Order accumulator) {
        accumulator.setGoodsId(value.getGoodsId());
        accumulator.setGoodsName(value.getGoodsName());
        accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
        return accumulator;
    }

    @Override
    public Order getResult(Order accumulator) {
        return accumulator;
    }

    @Override
    public Order merge(Order a, Order b) {
        // Combine two partial results (called when windows are merged) instead of returning null
        a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
        return a;
    }
}

Converting window results into best-selling product records for the ranking:

/**
 * Converts each window's aggregated Order into a HotOrder (goods id, name, total amount, window end)
 */
private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {

    @Override
    public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
        Order order = input.iterator().next();
        out.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
    }
}

3. Product UV Statistics (Bloom Filter)

  • Function

    Counts each product's UV over a period of time using a Bloom filter, i.e. the number of views after deduplicating by IP address.

  • Core code

    Main logic:

    /**
     * Runs the Flink job
     * @throws Exception
     */
    public void executeFlinkTask() throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Read the data source (a local log file; the socket source is an alternative)
    //        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log");
        // 3. Parse and convert the records
        socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() {

            @Override
            public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception {
                // Split the tab-separated line: ip, accessTime, eventType, goodsId
                String[] arrValue = value.split("\\t");
                if (arrValue.length < 4) {
                    return; // skip malformed lines
                }
                // Assemble the fields into an object
                GoodsAccessLog log = new GoodsAccessLog();
                log.setIp(arrValue[0]);
                log.setAccessTime(Long.parseLong(arrValue[1]));
                log.setEventType(arrValue[2]);
                log.setGoodsId(arrValue[3]);
                out.collect(log);
            }
        })
        // Use accessTime as event time (no out-of-orderness allowed)
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<GoodsAccessLog>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(GoodsAccessLog element) {
                return element.getAccessTime();
            }
        })
        // Keep only "view" events
        .filter(new FilterFunction<GoodsAccessLog>() {
            @Override
            public boolean filter(GoodsAccessLog value) throws Exception {
                return "view".equals(value.getEventType());
            }
        })
        .keyBy(GoodsAccessLog::getGoodsId)
        // 30-minute tumbling window; CustomWindowTrigger and CustomUVBloom are custom classes not shown here
        .timeWindow(Time.minutes(30))
        .trigger(new CustomWindowTrigger())
        .process(new CustomUVBloom())
        // Re-key by tuple field 0 and keep the maximum of field 1 per 3-second window
        // (assumes CustomUVBloom emits a Tuple of key and UV count)
        .keyBy(0)
        .timeWindow(Time.seconds(3))
        .max(1)
        .print("uv result => ").setParallelism(1);
        // 4. Execute the job
        env.execute("job");
    }
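CustomWindowTrigger and CustomUVBloom are not shown in the article, so their exact behavior is unknown. The general idea of Bloom-filter deduplication is to map each IP to k positions in a fixed-size bit array and count it only if at least one of those bits was still unset: duplicates are never counted twice, while false positives can make the UV slightly too low, in exchange for memory that does not grow with the number of visitors. The following is a hypothetical plain-Java sketch of such a filter, not the author's CustomUVBloom; the bit-array size, k = 3, and the double-hashing scheme (Arrays.hashCode combined with FNV-1a) are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

/** Hypothetical Bloom-filter UV counter (plain Java, not the article's CustomUVBloom). */
class BloomUvSketch {

    private final BitSet bits;
    private final int numBits;
    private final int numHashes;
    private long uv; // how many offered values were reported as new

    BloomUvSketch(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** FNV-1a 32-bit hash, used as the second base hash. */
    private static int fnv1a(byte[] data) {
        int h = 0x811C9DC5;
        for (byte b : data) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    /**
     * Offers a value (e.g. an IP). Returns true and increments the UV count only if at least one
     * of its k bit positions was unset, i.e. the value has definitely not been seen before.
     */
    boolean offer(String value) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        int h1 = Arrays.hashCode(data);
        int h2 = fnv1a(data);
        boolean isNew = false;
        for (int i = 0; i < numHashes; i++) {
            int idx = Math.floorMod(h1 + i * h2, numBits); // double hashing: h1 + i * h2
            if (!bits.get(idx)) {
                isNew = true;
                bits.set(idx);
            }
        }
        if (isNew) {
            uv++;
        }
        return isNew;
    }

    long uv() {
        return uv;
    }

    public static void main(String[] args) {
        BloomUvSketch bloom = new BloomUvSketch(1 << 20, 3);
        for (String ip : new String[] {"10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3"}) {
            bloom.offer(ip);
        }
        System.out.println("uv = " + bloom.uv());
    }
}
```

The repeated "10.0.0.1" is never counted again, because all of its bits are already set; at most a false positive could cause a new IP to be missed.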

This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.
