Flink+Druid构建实时OLAP的探索

Posted chouyarn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink+Druid构建实时OLAP的探索相关的知识,希望对你有一定的参考价值。

场景

k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

方案对比

对比了很多解决方案,如下几种,列出来供参考。

方案实时入库SQL支持度
Spark+CarbonData 支持 Spark SQL语法丰富
Kylin 不支持 支持join
Flink+Druid 支持 0.15以前不支持SQL,不支持join
  1. 上一篇文章所示,使用Spark+CarbonData也是一种解决方案,但是他的缺点也是比较明显,如不能和Flink进行结合,因为我们整个的大数据规划的大致方向是,Spark用来作为离线计算,Flink作为实时计算,并且这两个大方向短时间内不会改变;
  2. Kylin一直是老牌OLAP引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
  3. 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下这篇文章,我的博客其它文章也有最新版本的安装教程,实操方案哦。

设计方案

实时处理采用Flink SQL,实时入库Druid方式采用 druid-kafka-indexing-service,另一种方式入库方式,Tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

技术图片

 

场景举例

实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

flink的处理

将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

sysTime,DateTime格式
pt,格式yyyy-MM-dd
eventId,事件类型(enterRoom|disconnect)
lessonId,课程ID
Druid处理

启动Druid Supervisor,消费Kafka里的数据,使用预聚合,配置如下

技术图片

  "type": "kafka",
  "dataSchema": 
    "dataSource": "sac_core_analyze_v1",
    "parser": 
      "parseSpec": 
        "dimensionsSpec": 
          "spatialDimensions": [],
          "dimensions": [
            "eventId",
            "pt"
          ]
        ,
        "format": "json",
        "timestampSpec": 
          "column": "sysTime",
          "format": "auto"
        
      ,
      "type": "string"
    ,
    "metricsSpec": [
      
            "filter": 
                "type": "selector",
                "dimension": "msg_type",
                "value": "disconnect"
            ,
            "aggregator": 
                "name": "lesson_offline_molecule_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            ,
            "type": "filtered"
        , 
            "filter": 
                "type": "selector",
                "dimension": "msg_type",
                "value": "enterRoom"
            ,
            "aggregator": 
                "name": "lesson_offline_denominator_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            ,
            "type": "filtered"
        
    ],
    "granularitySpec": 
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": 
        "type": "none"
      ,
      "rollup": true,
      "intervals": null
    ,
    "transformSpec": 
      "filter": null,
      "transforms": []
    
  ,
  "tuningConfig": 
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/tmp/1564535441619-2",
    "maxPendingPersists": 0,
    "indexSpec": 
      "bitmap": 
        "type": "concise"
      ,
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    ,
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  ,
  "ioConfig": 
    "topic": "sac_druid_analyze_v2",
    "replicas": 2,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": 
      "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
    ,
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "sac_druid_analyze_v2",
    "useEarliestSequenceNumber": false
  ,
  "context": null,
  "suspended": false
View Code

 

最重要的配置是metricsSpec,他主要定义了预聚合的字段和条件。

数据查询

数据格式如下

pteventIdlesson_offline_molecule_idlesson_offline_denominator_id
2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

结果可以按照这样的SQL出

SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt

可以使用Druid的接口查询结果,肥肠的方便~

以上是关于Flink+Druid构建实时OLAP的探索的主要内容,如果未能解决你的问题,请参考以下文章

基于Flink构建企业级实时数仓(附项目源码)

流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓

阿里云 Flink+Hologres:构建企业级一站式实时数仓

实时OLAP分析利器Druid介绍

Demo:基于 Flink SQL 构建流式应用

基于Flink构建实时数据仓库