Kafka Connect in practice: configuring a SQLite-to-Hive ETL pipeline

Posted by BI实验室


Today I set up a SQLite-to-Hive ETL pipeline on Kafka Connect. The configuration details are as follows.


Default worker configuration:

cat /mnt/etc/connect-avro-standalone.properties 

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and

# integrates with the Schema Registry. This sample configuration assumes a local installation of

# Confluent Platform with all services running on their default ports.


# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.

bootstrap.servers=localhost:9092


# The converters specify the format of data in Kafka and how to translate it into Connect data.

# Every Connect user will need to configure these based on the format they want their data in

# when loaded from or stored into Kafka

key.converter=io.confluent.connect.avro.AvroConverter

key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter

value.converter.schema.registry.url=http://localhost:8081


# The internal converter used for offsets and config data is configurable and must be specified,

# but most users will always want to use the built-in default. Offset and config data is never

# visible outside of Connect in this format.

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false


# Local storage file for offset data

offset.storage.file.filename=/mnt/connect.offsets
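
Before starting the worker it is worth confirming that the services it depends on are reachable. A minimal sanity check, assuming Kafka, ZooKeeper and the Schema Registry are running locally on the default ports used in the configuration above:

# Schema Registry should respond on port 8081 (an empty subject list on a fresh install)
curl -s http://localhost:8081/subjects

# list existing Kafka topics (ZooKeeper on its default port 2181 is an assumption here)
kafka-topics --list --zookeeper localhost:2181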


JDBC source connector configuration: /mnt/etc/sqlite.properties

 cat  /mnt/etc/sqlite.properties 

##

# Copyright 2015 Confluent Inc.

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

##


# A simple example that copies all tables from a SQLite database. The first few settings are

# required for all connectors: a name, the connector class to run, and the maximum number of

# tasks to create:

name=test-source-sqlite-jdbc-autoincrement

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=2

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a

# SQLite database stored in the file retail.db, use an auto-incrementing column called 'id' to

# detect new rows as they are added, and output to topics prefixed with 'test_sqlite_jdbc_', e.g.

# a table called 'locations' will be written to the topic 'test_sqlite_jdbc_locations'.

connection.url=jdbc:sqlite:/usr/local/lib/retail.db

mode=incrementing

incrementing.column.name=id

topic.prefix=test_sqlite_jdbc_
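
The source connector expects /usr/local/lib/retail.db to already contain the table to be copied. A minimal sketch of how such a database could be created with the sqlite3 CLI, using the column layout that later appears in the registered Avro schema (id, name, sale); the sample rows are only illustrative:

sqlite3 /usr/local/lib/retail.db <<'EOF'
CREATE TABLE locations (
  id   INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
  name VARCHAR(255),
  sale INT
);
INSERT INTO locations (name, sale) VALUES ('alice', 100);
INSERT INTO locations (name, sale) VALUES ('bob', 200);
EOF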


HDFS sink connector configuration: /mnt/etc/hdfs_sqlite.properties

 cat /mnt/etc/hdfs_sqlite.properties

# Copyright 2015 Confluent Inc.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except

# in compliance with the License. You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software distributed under the License

# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express

# or implied. See the License for the specific language governing permissions and limitations under

# the License.


name=hdfs-sink

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector

tasks.max=2

topics=test_sqlite_jdbc_locations

hdfs.url=hdfs://localhost:9000

flush.size=2

hive.metastore.uris=thrift://localhost:9083

hive.integration=true

schema.compatibility=BACKWARD

partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner

partition.field.name=name
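
With hive.integration=true the sink writes Avro files to HDFS and also creates and updates a Hive table for the topic. Because FieldPartitioner partitions on the 'name' column, each distinct name should get its own partition directory. A quick way to inspect the layout once data has been flushed, assuming the connector's default topics.dir of /topics:

# expect one name=<value> directory per distinct name, each holding the flushed Avro files
hadoop fs -ls -R /topics/test_sqlite_jdbc_locations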


Load the configuration files and start the standalone worker:

vagrant@vagrant-ubuntu-trusty-64:~$ connect-standalone /mnt/etc/connect-avro-standalone.properties /mnt/etc/sqlite.properties  /mnt/etc/hdfs_sqlite.properties 
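
Once the worker is up, the source connector should begin publishing Avro records to test_sqlite_jdbc_locations. One way to confirm this independently of the HDFS sink is the Avro console consumer shipped with Confluent Platform; the flags below assume the default local ports and may need adjusting for other installations:

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test_sqlite_jdbc_locations \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081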



Verify the result in Hive:

hive> select * from test_sqlite_jdbc_locations order by 1;

Warning: Using constant number 1 in order by. If you try to use position alias when hive.groupby.orderby.position.alias is false, the position alias will be ignored.

Query ID = vagrant_20180530035036_dc0cbd5d-f472-4a7f-afb2-c137630bee95

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

  set hive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number of reducers:

  set hive.exec.reducers.max=<number>

In order to set a constant number of reducers:

  set mapreduce.job.reduces=<number>

Job running in-process (local Hadoop)

2018-05-30 03:50:39,448 Stage-1 map = 0%,  reduce = 0%

2018-05-30 03:50:41,476 Stage-1 map = 100%,  reduce = 100%

Ended Job = job_local1606514397_0001

MapReduce Jobs Launched: 

Stage-Stage-1:  HDFS Read: 16888 HDFS Write: 0 SUCCESS

Total MapReduce CPU Time Spent: 0 msec

OK

4 bobo 400

3 bobby 300

2 bob 200

8 bbbbb 900

7 bbbb 800

6 bbb 700

5 bb 600

1 alice 100

Time taken: 5.303 seconds, Fetched: 8 row(s)

hive> 
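
Because the sink uses FieldPartitioner on the 'name' column, the table it creates should be partitioned by name. A quick check (a sketch, assuming the table was created by the sink as configured above):

hive -e "SHOW PARTITIONS test_sqlite_jdbc_locations; DESCRIBE test_sqlite_jdbc_locations;"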



Configuration notes:

1. Kafka topic names may contain a hyphen ("-"), but Hive table names may not, so avoid hyphens in the topic prefix.

2. connection.url must contain the absolute path to retail.db:

            connection.url=jdbc:sqlite:/usr/local/lib/retail.db

3. partition.field.name must be set to a valid partition column (here, the 'name' column).


Check the connectors:


vagrant@vagrant-ubuntu-trusty-64:~$ curl -s localhost:8083/connectors

["hdfs-sink","test-source-sqlite-jdbc-autoincrement"]



curl -X GET http://localhost:8081/subjects

["test_sqlite_jdbc_locations-value"]



vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions

[1]vagrant@vagrant-ubuntu-trusty-64:~$ 



 curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions/1

{"subject":"test_sqlite_jdbc_locations-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"locations\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"sale\",\"type\":[\"null\",\"int\"]}],\"connect.name\":\"locations\"}"}




