Druid: A Simple Configuration Process for Reading Kafka Data


For single-node Druid installation, see: https://blog.51cto.com/10120275/2429912

The process of ingesting Kafka data into Druid in real time

Download, install, and start Kafka:

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &
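Note that Kafka 2.x needs a running ZooKeeper before the kafka-server-start.sh step above. The single-node Druid from the link above typically runs its own ZooKeeper on localhost:2181; if not, a minimal sketch using the ZooKeeper bundled with Kafka:

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties 1>/dev/null 2>&1 &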

Create the topic wikipedia:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
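As an optional sanity check, list the topics registered in the same ZooKeeper and confirm wikipedia appears:

./bin/kafka-topics.sh --list --zookeeper localhost:2181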

Unzip the wikiticker-2015-09-12-sampled.json.gz file; this step prepares the input file that will be written into the Kafka topic.

cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz

After this step completes, wikiticker-2015-09-12-sampled.json is generated under $DRUID_HOME/quickstart/tutorial.
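Each line of that file is one JSON event, and its time field is what the timestampSpec in the config below keys on. A quick look at the first record:

head -n 1 $DRUID_HOME/quickstart/tutorial/wikiticker-2015-09-12-sampled.json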

The supervisor spec is as follows; the bootstrap.servers property specifies the Kafka broker address:


  "type": "kafka",
  "dataSchema": 
    "dataSource": "wikipedia",
    "parser": 
      "type": "string",
      "parseSpec": 
        "format": "json",
        "timestampSpec": 
          "column": "time",
          "format": "auto"
        ,
        "dimensionsSpec": 
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
             "name": "added", "type": "long" ,
             "name": "deleted", "type": "long" ,
             "name": "delta", "type": "long" 
          ]
        
      
    ,
    "metricsSpec" : [],
    "granularitySpec": 
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    
  ,
  "tuningConfig": 
    "type": "kafka",
    "reportParseExceptions": false
  ,
  "ioConfig": 
    "topic": "wikipedia",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": 
      "bootstrap.servers": "localhost:9092" 
    
  

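This spec can be submitted through the Druid web console, or POSTed to the Overlord's supervisor API. A sketch assuming the spec is saved as wikipedia-kafka-supervisor.json (a file name chosen here for illustration) and the Overlord listens on its default quickstart port 8090:

curl -XPOST -H 'Content-Type: application/json' -d @wikipedia-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor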
Next, write the contents of wikiticker-2015-09-12-sampled.json into the wikipedia topic using the Kafka console producer script:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < $DRUID_HOME/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
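Once the producer finishes, the supervisor's indexing tasks should pick up the messages. One way to verify the pipeline end to end is a Druid SQL count against the Broker, a sketch assuming the Broker's default quickstart port 8082:

curl -XPOST -H 'Content-Type: application/json' http://localhost:8082/druid/v2/sql -d '{"query": "SELECT COUNT(*) FROM wikipedia"}'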
