如何在融合的 kafka docker 安装中安装自定义 SMT?

Posted

技术标签:

【中文标题】如何在融合的 kafka docker 安装中安装自定义 SMT?【英文标题】:How to install a custom SMT in confluent kafka docker installation? 【发布时间】:2021-09-09 09:35:16 【问题描述】:

我正在尝试在 mysql 和 elasticsearch 之间进行事件流传输,我遇到的一个问题是 mysql 中的 JSON 对象在传输到 elasticsearch 时是 JSON 字符串格式而不是对象。

我正在寻找使用 SMT 的解决方案,我找到了这个,

https://github.com/RedHatInsights/expandjsonsmt

我不知道如何在我的 kafka 中安装或加载或连接容器

这是我的 docker-compose 文件,

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    mem_limit: 1024m
    mem_reservation: 256m
    cpus: 3.0


  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    mem_limit: 2048m
    mem_reservation: 256m
    cpus: 3.0


  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    mem_limit: 256m
    mem_reservation: 128m
    cpus: 3.0


  connect:
    image: confluentinc/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
      extra_hosts:
        host.docker.internal: host-gateway
    extra_hosts:
      - "host.docker.internal:host-gateway"
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    mem_limit: 1024m
    mem_reservation: 256m
    cpus: 3.0


  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    mem_limit: 512m
    mem_reservation: 256m
    cpus: 3.0

      
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    mem_limit: 256m
    mem_reservation: 256m
    cpus: 3.0

这是我的融合 kafka 连接,Dockerfile

FROM confluentinc/cp-kafka-connect-base:6.2.0

RUN   confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.5.1 \
   && confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.5.0 \
   && confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.0 \
   && confluent-hub install --no-prompt debezium/debezium-connector-mongodb:1.5.0 \
   && confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.6 \
   && confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.0 \
   && confluent-hub install --no-prompt confluentinc/connect-transforms:latest

我尝试在 confluent kafka connect 的 Dockerfile 中添加这一行,就像在那个 github 自述文件中提到的那样

COPY ./kafka-connect-smt-expandjsonsmt-0.0.7-assemble-all.jar $KAFKA_CONNECT_PLUGINS_DIR

它不起作用,因为KAFKA_CONNECT_PLUGINS_DIR 是空的我尝试echo $KAFKA_CONNECT_PLUGINS_DIR 来验证它。

谁能帮助我如何将这个自定义 SMT 安装到我的 docker 安装的 kafka 中?

【问题讨论】:

【参考方案1】:

该连接器也存在于集线器上

confluent-hub install redhatinsights/expandjsonsmt:0.0.7

https://www.confluent.io/hub/redhatinsights/expandjsonsmt

【讨论】:

非常感谢,当我尝试将 jar 文件复制到插件路径时,它不起作用,但从 confluent-hub 安装它是有效的。似乎在进行自定义安装时,我必须进行一些其他配置才能使其正常工作。 enviromment 提供给 compose 的值在容器构建时不可用,因此问题中的 COPY 命令不起作用,不。否则,另一个答案是正确的,因为如果不是胖 JAR,您需要将该 JAR 和任何需要的依赖项复制到插件路径下自己的文件夹中【参考方案2】:

安装 SMT 就像安装其他连接器一样,

将您的自定义 SMT JAR 文件(以及转换所需的任何非 Kafka JAR 文件)复制到 Connect worker 配置中 plugin.path 属性中列出的目录之一下的目录中 –

在你的情况下复制到/usr/share/confluent-hub-components

【讨论】:

我应该将.jar文件单独复制到“CONNECT_PLUGIN_PATH”还是在CONNECT_PLUGIN_PATH中创建一个文件夹然后复制.jar文件? docs.confluent.io/platform/current/connect/transforms/… 将您的自定义 SMT JAR 文件(以及转换所需的任何非 Kafka JAR 文件)复制到 Connect worker 配置中 plugin.path 属性中列出的目录之一下的目录中

以上是关于如何在融合的 kafka docker 安装中安装自定义 SMT?的主要内容,如果未能解决你的问题,请参考以下文章

如何在ubunto中安装docker

如何在CentOS6.5系统中安装Docker

如何在CentOS6.5系统中安装Docker

如何在CentOS6.5系统中安装Docker

如何在CentOS6.5系统中安装Docker

如何在CentOS6.5系统中安装Docker