Apache Druid 数据摄取---本地数据和kafka流式数据
Posted 博学谷狂野架构师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Druid 数据摄取---本地数据和kafka流式数据相关的知识,希望对你有一定的参考价值。
Durid概述
Apache Druid是一个高性能的实时分析型数据库。
上篇文章,我们了解了Druid的加载方式,
咱么主要说两种,一种是加载本地数据,一种是通过kafka加载流式数据。
数据摄取
4.1 加载本地文件
4.1.1.1 数据选择
4.1.1.2 演示数据查看
4.1.1.3 选择数据源
4.1.1.4 加载数据
4.1.2 数据源规范配置
4.1.2.1 设置时间列
Druid的体系结构需要一个主时间列(内部存储为名为_time
的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value)
。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。
4.1.2.2 设置转换器
4.1.2.3 设置过滤器
4.1.2.4 配置schema
4.1.2.5 配置Partition
4.1.3 提交任务
4.1.3.1 发布数据
这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。
4.1.3.2 提交任务
您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。
4.1.3.3 查看数据源
4.1.3.4 查询数据
4.2 kafka加载流式数据
4.2.1 安装Kafka
4.2.1.1 编辑资源清单
vi docker-compose.yml
version: 2
services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka ## 镜像
volumes:
- /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ## 卡夫卡运行是基于zookeeper的
KAFKA_ADVERTISED_PORT: 9092
KAFKA_LOG_RETENTION_HOURS: 120
KAFKA_MESSAGE_MAX_BYTES: 10000000
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
KAFKA_NUM_PARTITIONS: 3
KAFKA_DELETE_RETENTION_MS: 1000
4.2.2.2 启动容器
docker-compose up -d
docker-compose ps
4.2.3 验证kafka
4.2.3.1 登录容器
#进入容器
docker exec -it kafka_kafka_1 bash
#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下
cd /opt/kafka_2.13-2.7.0/bin/
4.2.3.2 发送消息
#运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test
"datas":["channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"],"ver":"1.0"
4.2.3.3 消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning
4.2.4 发送数据到kafka
4.2.4.1 编写代码
@Component
public class KafkaSender
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息到kafka
*
* @param topic 主题
* @param message 内容体
*/
public void sendMsg(String topic, String message)
kafkaTemplate.send(topic, message);
@RestController
@RequestMapping("/taxi")
public class KafkaController
@Autowired
private KafkaSender kafkaSender;
@RequestMapping("/batchTask/num")
public String batchAdd(@PathVariable("num") int num)
for (int i = 0; i < num; i++)
Message message = Utils.getRandomMessage();
kafkaSender.sendMsg("message", JSON.toJSONString(message));
return "OK";
4.2.4.2 发送消息
4.2.5 数据选择
4.2.51 kafka数据查看
4.2.5.2 选择数据源
116.62.213.90:10903,116.62.213.90:10904
4.2.5.3 加载数据
4.2.6 数据源规范配置
4.2.6.1 设置时间列
因为我们的时间列有两个创建时间以及打车时间,我们配置时间列为trvelDate
4.2.6.2 设置转换器
我们使用case_simple
来实现判断功能,更多判断功能参考
case_simple(status,0,测试数据,1,发起打车,2,排队中,3,司机接单,4,完成打车,状态错误)
nvl(age,25)
case_simple(nvl(sex,0),0,男,1,女,男)
4.2.6.3 设置过滤器
"type" : "bound",
"dimension" : "status",
"ordering": "numeric",
"lower": "1",
4.2.6.4 配置schema
4.2.6.5 配置Partition
4.2.6.6 配置拉取方式
在 Tune
步骤中,将 Use earliest offset
设置为 True
非常重要,因为我们需要从流的开始位置消费数据。 其他没有任何需要更改的地方,进入到 Publish
步
4.5.7 提交任务
4.2.7.1 发布数据
这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。
4.2.7.2 提交任务
您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。
4.2.7.3 查看数据源
4.2.7.4 查询数据
4.2.7.5 动态添加数据
4.2.8 清理数据
4.2.8.1 关闭集群
# 进入impl安装目录
cd /usr/local/imply/imply-2021.05-1
# 关闭集群
./bin/service --down
4.2.8.2 等待关闭服务
ps -ef|grep druid
4.2.8.3 清理数据
ll
rm -rf var
4.2.8.4 重新启动集群
nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &
4.2.8.5 查看数据源
以上是关于Apache Druid 数据摄取---本地数据和kafka流式数据的主要内容,如果未能解决你的问题,请参考以下文章
Druid Kafka 摄取(imply-2.2.3):kafka 错误 NoReplicaOnlineException