Kafka Connect Usage Guide
Posted by boanxin
Preface: This article was compiled by the editors of 小常识网 (cha138.com) and introduces how to use Kafka Connect; we hope it serves as a useful reference.
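I. Overview
Kafka Connect is a scalable, reliable tool for streaming data between Kafka and other systems. In short, connectors make it simple and fast to move large collections of data into and out of Kafka. For example, a connector can ingest an entire database, or collect messages from all of your applications into Kafka topics. Kafka Connect's features include:
1. A common framework for Kafka connectors: Kafka Connect standardizes how other data systems are integrated with Kafka, which simplifies connector development, deployment, and management.
2. Distributed and standalone modes: it can scale up to a large, centrally managed service that supports an entire organization, or scale down to development, testing, and small production deployments.
3. REST interface: connectors are submitted to (and managed on) a Kafka Connect cluster through a REST API.
4. Automatic offset management: given only a little information from a connector, Kafka Connect manages the offset commit process itself.
5. Distributed and scalable by default: Kafka Connect builds on the existing group management protocol, so a Connect cluster scales by adding more workers.
6. Streaming/batch integration: by leveraging Kafka's existing capabilities, Kafka Connect is an ideal way to bridge streaming and batch data systems.
The Kafka version used to test Connect in this article is 0.9.0.0.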
在这里我们测试connect的kafka版本是:0.9.0.0
II. Standalone Mode
The command format for standalone mode is:
bin/connect-standalone.sh config/connect-standalone.properties Connector1.properties [Connector2.properties ...]
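A typo in a connector file usually surfaces only after the worker starts. Because of that, it can help to sanity-check each connector file before it is passed to connect-standalone.sh. This is a minimal sketch under stated assumptions. The helpers prop_value, check_connector_props and run_standalone are hypothetical names, not scripts shipped with Kafka. The sketch treats name and connector.class as the keys every connector file must set. The parser only understands the simple key=value lines used in this article, not the full .properties syntax (':' separators, line continuations, escapes).

```shell
# Hypothetical helpers: prop_value, check_connector_props and run_standalone
# are illustrative names, not part of Kafka.

# Prints the value of key $2 from properties file $1 (last occurrence wins).
# Only simple "key=value" lines are parsed; lines starting with # or ! are comments.
prop_value() {
  awk -v k="$2" '
    /^[ \t]*[#!]/ { next }
    {
      i = index($0, "=")
      if (i == 0) next
      key = substr($0, 1, i - 1)
      val = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (key == k) out = val
    }
    END { if (out != "") print out }
  ' "$1"
}

# Fails, with a message on stderr, unless the connector file sets
# both name and connector.class.
check_connector_props() {
  local file=$1 key
  [ -r "$file" ] || { echo "cannot read $file" >&2; return 1; }
  for key in name connector.class; do
    if [ -z "$(prop_value "$file" "$key")" ]; then
      echo "$file: missing '$key'" >&2
      return 1
    fi
  done
}

# Usage: run_standalone WORKER_PROPS CONNECTOR_PROPS [CONNECTOR_PROPS ...]
# Assumes it is run from the Kafka installation directory.
run_standalone() {
  local worker=$1 f
  shift
  for f in "$@"; do
    check_connector_props "$f" || return 1
  done
  bin/connect-standalone.sh "$worker" "$@"
}
```

For example, run_standalone config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties checks both connector files before starting the worker.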
My configuration of each of the files above is as follows.
1. connect-standalone.sh is the script that runs standalone mode:
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

base_dir=$(dirname $0)

# Default log4j configuration for Connect
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi

# Default JMX options (remote JMX on port 9901, no auth/SSL) unless the caller set KAFKA_JMX_OPTS
if [ -z "$KAFKA_JMX_OPTS" ]; then
  export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9901 "
fi

# Default JVM heap unless the caller set KAFKA_HEAP_OPTS
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1024M"
fi

exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectStandalone "$@"
This is where you set the JVM heap size for Connect:
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1024M"
fi
You can also set the JMX options. Note that the guard must test KAFKA_JMX_OPTS, the variable it sets:
if [ -z "$KAFKA_JMX_OPTS" ]; then
  export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9901 "
fi
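Each default is applied only when its variable is empty, so a value set by the caller wins. For example, KAFKA_HEAP_OPTS="-Xmx2G" bin/connect-standalone.sh ... changes the heap for one run without editing the script. This is also why a JMX block that tests KAFKA_HEAP_OPTS instead of KAFKA_JMX_OPTS is a bug: whenever a custom heap is given, the JMX options are silently skipped. The function below is a hypothetical reproduction of the guard pattern, so the precedence can be checked without starting Kafka.

```shell
# Hypothetical function (not part of Kafka) that mirrors the default guards
# in connect-standalone.sh: each variable is set only if it is still empty.
apply_connect_defaults() {
  # Guard on the variable being set, not on KAFKA_HEAP_OPTS
  if [ -z "$KAFKA_JMX_OPTS" ]; then
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9901"
  fi
  if [ -z "$KAFKA_HEAP_OPTS" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1024M"
  fi
}
```

With a custom heap already set, this version still applies the JMX defaults. That is exactly the case the mis-guarded block gets wrong.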
2. Configuration of connect-standalone.properties:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=10.253.129.237:9092,10.253.129.238:9092,10.253.129.239:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/datafs/20181106/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
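Pay particular attention to the broker list in bootstrap.servers. The remaining settings are described in the official Kafka documentation: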
http://kafka.apache.org/090/documentation.html#connectconfigs
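Because a wrong broker address is the most common reason a worker fails to start, it can be worth checking that every entry in bootstrap.servers is reachable. This is a hedged sketch. The function names are hypothetical, check_brokers relies on bash's /dev/tcp redirection and on the timeout command from GNU coreutils, and it only proves a TCP port is open, not that a healthy Kafka broker is behind it.

```shell
# Hypothetical helpers (not part of Kafka).

# Prints each host:port from the bootstrap.servers line of a worker
# properties file, one per line (last occurrence of the key wins).
list_bootstrap_servers() {
  sed -n 's/^[[:space:]]*bootstrap\.servers[[:space:]]*=[[:space:]]*//p' "$1" \
    | tail -n 1 | tr ',' '\n' \
    | sed 's/^[[:space:]]*//; s/[[:space:]]*$//; /^$/d'
}

# Tries a TCP connection to every broker (3-second timeout each),
# prints ok/FAILED per broker, and returns non-zero if any fail.
check_brokers() {
  local hp host port rc=0
  for hp in $(list_bootstrap_servers "$1"); do
    host=${hp%:*}
    port=${hp##*:}
    if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "ok      $hp"
    else
      echo "FAILED  $hp"
      rc=1
    fi
  done
  return $rc
}
```

For example, check_brokers config/connect-standalone.properties tests the three brokers configured above.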
3. Configuration of connect-file-source.properties:
name=test_source2
connector.class=FileStreamSource
tasks.max=2
file=/datafs/20181106/json2/log.out
topic=TEST_MANAGER5
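Pay attention to the file path and topic settings. Also note that FileStreamSource reads a single file and therefore creates only one task, so tasks.max=2 still results in a single task.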
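With the JsonConverter and value.converter.schemas.enable=true from the worker file above, each line read by FileStreamSource should be stored in the topic as a JSON envelope. The envelope holds a plain string schema plus the line itself as the payload, the same shape shown in the Kafka quickstart. FileStreamSource does not parse lines, so even JSON lines (as the json2 directory suggests) arrive as escaped strings. The hypothetical helper below prints the envelope to expect for a given line, for comparison with what a console consumer shows for TEST_MANAGER5. It escapes only backslashes and double quotes.

```shell
# Hypothetical helper (not part of Kafka): prints the record value that
# JsonConverter with schemas.enable=true is expected to write for one line
# read by FileStreamSource. Only \ and " are escaped here.
expected_envelope() {
  local escaped
  escaped=$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"schema":{"type":"string","optional":false},"payload":"%s"}\n' "$escaped"
}
```

For instance, a source line {"id":1} should appear in the topic with the payload "{\"id\":1}", a string rather than a nested JSON object.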
4. Configuration of connect-file-sink.properties. Note that a sink connector takes topics, a comma-separated list, rather than the single topic used by the source:
name=test_sink1
connector.class=FileStreamSink
#connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/datafs/20181106/a.out
topics=TEST_MANAGER5
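Both connectors can run in one worker: bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties. A quick end-to-end check is then to append a line to the source file and wait for the same line to appear in the sink file. The helper below is a hypothetical sketch that polls once per second up to a timeout. The paths in the example comments are the ones from the configs above.

```shell
# Hypothetical helper (not part of Kafka): waits until FILE contains LINE
# as a whole line, checking once per second for at most TIMEOUT seconds.
# Usage: wait_for_line FILE LINE TIMEOUT
wait_for_line() {
  local file=$1 line=$2 timeout=$3 waited=0
  until [ -f "$file" ] && grep -Fxq -- "$line" "$file"; do
    if [ "$waited" -ge "$timeout" ]; then
      echo "timed out after ${timeout}s waiting for '$line' in $file" >&2
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
}

# Example smoke test with the paths from the configs above:
#   line="smoke-test-$(date +%s)"
#   echo "$line" >> /datafs/20181106/json2/log.out
#   wait_for_line /datafs/20181106/a.out "$line" 30
```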
III. Distributed (Cluster) Mode
Command format:
bin/connect-distributed.sh config/connect-distributed.properties
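The distributed worker file differs from the standalone one mainly in where state is kept. Instead of offset.storage.file.filename, workers join a group via group.id and store offsets and connector configs in Kafka topics (offset.storage.topic and config.storage.topic in 0.9). The generator below is a hypothetical sketch that reuses the converter settings shown earlier. The group id and topic names are placeholders. Every worker meant to join the same cluster must use the same values. The Kafka documentation asks for the config topic to be a single-partition, replicated topic, so create the topics beforehand if the brokers do not auto-create them.

```shell
# Hypothetical generator (not part of Kafka) for a minimal distributed
# worker config. group.id and the topic names are placeholders.
# Usage: write_distributed_props OUTPUT_FILE BROKER_LIST
write_distributed_props() {
  local out=$1 brokers=$2
  cat > "$out" <<EOF
bootstrap.servers=$brokers
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Offsets and connector configs are kept in Kafka topics, not a local file
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
offset.flush.interval.ms=10000
EOF
}
```

Each worker is then started with bin/connect-distributed.sh pointing at the generated file.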
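In distributed mode, connector configurations are not passed on the command line. Instead, they are submitted to the running cluster through the REST API as JSON, for example:
{"name":"test","config":{"topic":"TEST_MANAGER","connector.class":"FileStreamSource","tasks.max":"2","file":"/datafs/log1.out"}}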
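A minimal way to submit that JSON is an HTTP POST to the /connectors endpoint of a worker. This sketch assumes curl is installed and that a worker listens on localhost on the default REST port 8083 (rest.port in the worker file). If that does not match your setup, set CONNECT_URL. create_connector is a hypothetical wrapper name.

```shell
# Assumption: a Connect worker's REST interface is reachable at this URL.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Hypothetical wrapper (not part of Kafka): POSTs a connector definition of
# the form {"name":...,"config":{...}} to the worker's /connectors endpoint.
create_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data "$1" "$CONNECT_URL/connectors"
}

# Example, using the connector definition shown above:
# create_connector '{"name":"test","config":{"topic":"TEST_MANAGER","connector.class":"FileStreamSource","tasks.max":"2","file":"/datafs/log1.out"}}'
```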
IV. REST API
DELETE /connectors/{name}: deletes a connector, halting all of its tasks and deleting its configuration.
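Besides DELETE, the REST interface in this Kafka version should also offer the following endpoints:
- GET /connectors lists the connectors.
- GET /connectors/{name} returns a connector's information.
- GET /connectors/{name}/config returns its configuration.
- GET /connectors/{name}/tasks returns its tasks.
- PUT /connectors/{name}/config updates its configuration.

The wrappers below are hypothetical conveniences around those paths. As before, they assume curl and a worker at http://localhost:8083.

```shell
# Assumption: a Connect worker's REST interface is reachable at this URL.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Hypothetical wrappers (not part of Kafka) around the Connect REST paths.
list_connectors() { curl -sS "$CONNECT_URL/connectors"; }
get_connector()   { curl -sS "$CONNECT_URL/connectors/$1"; }
get_config()      { curl -sS "$CONNECT_URL/connectors/$1/config"; }
get_tasks()       { curl -sS "$CONNECT_URL/connectors/$1/tasks"; }

# PUT takes the connector's config keys as a flat JSON object,
# e.g. '{"connector.class":"FileStreamSink","topics":"TEST_MANAGER5"}'.
update_config() {
  curl -sS -X PUT -H "Content-Type: application/json" \
    --data "$2" "$CONNECT_URL/connectors/$1/config"
}

# Halts the connector's tasks and deletes its configuration.
delete_connector() { curl -sS -X DELETE "$CONNECT_URL/connectors/$1"; }
```

For example, delete_connector test removes the connector created in the distributed-mode example.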