Kafka Connect in practice: configuring an ETL pipeline from SQLite to Hive
Posted by BI实验室
Today I set up a SQLite-to-Hive ETL pipeline on Kafka Connect. The details follow.
Default worker configuration:
cat /mnt/etc/connect-avro-standalone.properties
# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/mnt/connect.offsets
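Before starting the worker it is worth confirming that the broker and the Schema Registry are actually reachable on the default ports assumed above. A quick sanity check (a sketch; assumes the Confluent CLI tools are on the PATH and a Kafka version of this era, where kafka-topics still takes --zookeeper):
kafka-topics --zookeeper localhost:2181 --list      # broker and ZooKeeper are up
curl -s http://localhost:8081/subjects              # Schema Registry answers on 8081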
JDBC source connector configuration (/mnt/etc/sqlite.properties):
cat /mnt/etc/sqlite.properties
##
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=2
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test_sqlite_jdbc_', e.g.
# a table called 'locations' will be written to the topic 'test_sqlite_jdbc_locations'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test_sqlite_jdbc_
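The source side assumes that retail.db already contains a table named locations with an auto-incrementing id column. A minimal sketch of preparing it (the column names follow the Avro schema shown at the end of this post; the sample rows mirror the Hive output below):
sqlite3 /usr/local/lib/retail.db
sqlite> CREATE TABLE locations (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255), sale INT);
sqlite> INSERT INTO locations (name, sale) VALUES ('alice', 100);
sqlite> INSERT INTO locations (name, sale) VALUES ('bob', 200);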
HDFS sink connector configuration (/mnt/etc/hdfs_sqlite.properties):
cat /mnt/etc/hdfs_sqlite.properties
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_sqlite_jdbc_locations
hdfs.url=hdfs://localhost:9000
flush.size=2
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=name
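With partitioner.class set to FieldPartitioner and partition.field.name=name, the sink writes one HDFS directory per distinct value of the name column. Once the pipeline has run, the layout can be inspected like this (a sketch; assumes the connector's default topics.dir of /topics):
hdfs dfs -ls /topics/test_sqlite_jdbc_locations
hdfs dfs -ls /topics/test_sqlite_jdbc_locations/name=alice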
Load the configuration files and start the standalone worker:
vagrant@vagrant-ubuntu-trusty-64:~$ connect-standalone /mnt/etc/connect-avro-standalone.properties /mnt/etc/sqlite.properties /mnt/etc/hdfs_sqlite.properties
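If the worker starts cleanly, the source connector begins publishing Avro records to the test_sqlite_jdbc_locations topic. One way to confirm this in a second terminal, independently of the sink (a sketch; newer Confluent releases accept --bootstrap-server, older ones used --zookeeper):
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic test_sqlite_jdbc_locations --from-beginning \
  --property schema.registry.url=http://localhost:8081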
Verify the result in Hive:
hive> select * from test_sqlite_jdbc_locations order by 1;
Warning: Using constant number 1 in order by. If you try to use position alias when hive.groupby.orderby.position.alias is false, the position alias will be ignored.
Query ID = vagrant_20180530035036_dc0cbd5d-f472-4a7f-afb2-c137630bee95
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2018-05-30 03:50:39,448 Stage-1 map = 0%, reduce = 0%
2018-05-30 03:50:41,476 Stage-1 map = 100%, reduce = 100%
Ended Job = job_local1606514397_0001
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 16888 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
4 bobo 400
3 bobby 300
2 bob 200
8 bbbbb 900
7 bbbb 800
6 bbb 700
5 bb 600
1 alice 100
Time taken: 5.303 seconds, Fetched: 8 row(s)
hive>
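Because hive.integration=true, the sink connector creates and maintains the Hive table itself. The generated table definition and the partitions produced by the FieldPartitioner can also be checked directly in Hive, for example:
hive> SHOW CREATE TABLE test_sqlite_jdbc_locations;
hive> SHOW PARTITIONS test_sqlite_jdbc_locations;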
Configuration notes:
1. Kafka topic names may contain a hyphen ("-"), but Hive table names may not, so avoid hyphens altogether; this is why topic.prefix uses underscores (test_sqlite_jdbc_).
2. connection.url must contain the absolute path to retail.db:
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
3. partition.field.name must be set to a column that actually exists in the table and makes sense as a partition key; if no column fits, a time-based partitioner can be used instead (see the sketch below).
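For reference, the HDFS connector also ships a time-based partitioner. A sketch of the relevant settings (key names as documented for the Confluent HDFS connector of this era; adjust them to your version):
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC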
List the registered connectors:
vagrant@vagrant-ubuntu-trusty-64:~$ curl -s localhost:8083/connectors
["hdfs-sink","test-source-sqlite-jdbc-autoincrement"]
Check the schemas that the source connector registered in the Schema Registry:
curl -X GET http://localhost:8081/subjects
["test_sqlite_jdbc_locations-value"]
vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions
[1]
vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions/1
{"subject":"test_sqlite_jdbc_locations-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"locations\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"sale\",\"type\":[\"null\",\"int\"]}],\"connect.name\":\"locations\"}"}