Flink SQL: Implementing a ClickHouse Connector (Source and Sink)

Posted by wudl5566


1. Scenario

2. Versions

mysql        flink         clickhouse
5.7.20-log   flink-1.13.1  20.11.4.13
5.7.20-log   flink-1.13.2  20.11.4.13
5.7.20-log   flink-1.13.5  20.11.4.13

The package Flink uses to connect to ClickHouse is the clickhouse-jdbc driver (ru.yandex.clickhouse:clickhouse-jdbc, version 0.3.0; declared in the pom below).

3. Project structure
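
The original post shows the structure as a diagram. Reconstructed from the package and import statements in the listings below, the layout is approximately the following; the META-INF services entry is an assumption on my part, but Flink discovers DynamicTableFactory implementations via Java SPI, so a file like it must exist for the `clickhouse` identifier to be found:

src/main/java/com/glab/flink/connector/clickhouse/table/
    ClickHouseDynamicTableFactory.java
    ClickHouseDynamicTableSink.java
    ClickHouseDynamicTableSource.java
    internal/
        AbstractClickHouseSinkFunction.java
        dialect/
            ClickHouseDialect.java
        options/
            ClickHouseOptions.java
src/main/resources/META-INF/services/
    org.apache.flink.table.factories.Factory   (lists com.glab.flink.connector.clickhouse.table.ClickHouseDynamicTableFactory)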

4. The project's pom file

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.glab</groupId>
  <artifactId>flink-connector-clickhouse</artifactId>
  <version>13.1</version>

  <name>flink-connector-clickhouse</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <clickhouse-jdbc-version>0.3.0</clickhouse-jdbc-version>
  </properties>

  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>${clickhouse-jdbc-version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>30.1.1-jre</version>
    </dependency>


    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpmime</artifactId>
      <version>4.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.4</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.2.3</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
      <scope>provided</scope>
    </dependency>

    <!-- Kafka connector, for tests only -->

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Table ecosystem -->
    <!-- Projects depending on this project won't depend on flink-table-*. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
      <!--<optional>true</optional>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>


    <!-- test dependencies -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <!--<type>test-jar</type>-->
      <scope>provided</scope>
    </dependency>

    <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<!--    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>-->

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <shadedArtifactAttached>true</shadedArtifactAttached>
          <outputFile>out/flink-connector-clickhouse-${pom.version}.jar</outputFile>
          <artifactSet>
            <includes>
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
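
With this pom, `mvn package` runs both the shade and assembly plugins in the package phase; per the shade plugin's outputFile setting above, the shaded connector jar lands at out/flink-connector-clickhouse-13.1.jar. Putting that jar on the Flink classpath (for example under the distribution's lib/ directory) is what makes the `clickhouse` connector identifier defined below resolvable from SQL DDL.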


4.2 ClickHouseDynamicTableFactory.java

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.dialect.ClickHouseDialect;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.*;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
    public static final String IDENTIFIER = "clickhouse";

    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse url in format `clickhouse://<host>:<port>`.");
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse username.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse password.");

    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions.key("database-name")
            .stringType()
            .defaultValue("default")
            .withDescription("the ClickHouse database name. Default to `default`.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse table name.");

    public static final ConfigOption<Integer> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch-size")
            .intType()
            .defaultValue(Integer.valueOf(1000))
            .withDescription("the flush max size, over this number of records, will flush data. The default value is 1000.");

    public static final ConfigOption<Duration> SINK_FLUSH_INTERVAL = ConfigOptions.key("sink.flush-interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1L))
            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");

    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries")
            .intType()
            .defaultValue(Integer.valueOf(3))
            .withDescription("the max retry times if writing records to database failed.");

    public static final ConfigOption<Boolean> SINK_WRITE_LOCAL = ConfigOptions.key("sink.write-local")
            .booleanType()
            .defaultValue(Boolean.valueOf(false))
            .withDescription("directly write to local tables in case of Distributed table.");

    public static final ConfigOption<String> SINK_PARTITION_STRATEGY = ConfigOptions.key("sink.partition-strategy")
            .stringType()
            .defaultValue("balanced")
            .withDescription("partition strategy. available: balanced, hash, shuffle.");

    public static final ConfigOption<String> SINK_PARTITION_KEY = ConfigOptions.key("sink.partition-key")
            .stringType()
            .noDefaultValue()
            .withDescription("partition key used for hash strategy.");

    public static final ConfigOption<Boolean> SINK_IGNORE_DELETE = ConfigOptions.key("sink.ignore-delete")
            .booleanType()
            .defaultValue(Boolean.valueOf(true))
            .withDescription("whether to treat update statements as insert statements and ignore deletes. defaults to true.");

    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will be eliminated." +
                    "cache.max-rows and cache ttl options must all be specified id any of them is specified. cache is not enabled as default.");

    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(10))
            .withDescription("the cache time to live");

    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if lookup database failed.");

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        try {
            validateConfigOptions(config);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // The `New`-named variants use the Flink 1.13 API; the ones without use the 1.12 API.
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        return new ClickHouseDynamicTableSource(resolvedSchema, getOptions(config), getJdbcLookupOptions(config));
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        try {
            validateConfigOptions(config);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // The `New`-named variants use the Flink 1.13 API; the ones without use the 1.12 API.
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        return new ClickHouseDynamicTableSink(resolvedSchema, getOptions(config));
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(DATABASE_NAME);
        optionalOptions.add(SINK_BATCH_SIZE);
        optionalOptions.add(SINK_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        optionalOptions.add(SINK_WRITE_LOCAL);
        optionalOptions.add(SINK_PARTITION_STRATEGY);
        optionalOptions.add(SINK_PARTITION_KEY);
        optionalOptions.add(SINK_IGNORE_DELETE);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        return optionalOptions;
    }

    private void validateConfigOptions(ReadableConfig config) throws Exception {
        String partitionStrategy = config.get(SINK_PARTITION_STRATEGY);
        if (!Arrays.asList("hash", "balanced", "shuffle").contains(partitionStrategy)) {
            throw new IllegalArgumentException("Unknown sink.partition-strategy `" + partitionStrategy + "`");
        }
        if (partitionStrategy.equals("hash") && !config.getOptional(SINK_PARTITION_KEY).isPresent()) {
            throw new IllegalArgumentException("A partition key must be provided for hash partition strategy");
        }
        if (config.getOptional(USERNAME).isPresent() ^ config.getOptional(PASSWORD).isPresent()) {
            throw new IllegalArgumentException("Either all or none of username and password should be provided");
        }
    }

    private ClickHouseOptions getOptions(ReadableConfig config) {
        return new ClickHouseOptions.Builder()
                .withUrl(config.get(URL))
                .withUsername(config.get(USERNAME))
                .withPassword(config.get(PASSWORD))
                .withDatabaseName(config.get(DATABASE_NAME))
                .withTableName(config.get(TABLE_NAME))
                .withBatchSize(config.get(SINK_BATCH_SIZE))
                .withFlushInterval(config.get(SINK_FLUSH_INTERVAL))
                .withMaxRetries(config.get(SINK_MAX_RETRIES))
                .withWriteLocal(config.get(SINK_WRITE_LOCAL))
                .withPartitionStrategy(config.get(SINK_PARTITION_STRATEGY))
                .withPartitionKey(config.get(SINK_PARTITION_KEY))
                .withIgnoreDelete(config.get(SINK_IGNORE_DELETE))
                .setDialect(new ClickHouseDialect())
                .build();
    }

/*    private JdbcOptions getJdbcOptions(ReadableConfig config) {
        return JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(config.get(URL))
                .setTableName(config.get(TABLE_NAME))
                .setDialect(new ClickHouseDialect())
                .build();
    }*/


    private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig config) {
        return JdbcLookupOptions.builder()
                .setCacheExpireMs(config.get(LOOKUP_CACHE_TTL).toMillis())
                .setMaxRetryTimes(config.get(LOOKUP_MAX_RETRIES))
                .setCacheMaxSize(config.get(LOOKUP_CACHE_MAX_ROWS))
                .build();
    }
}

4.3 ClickHouseDynamicTableSink.java

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.AbstractClickHouseSinkFunction;
import com.glab.flink.co
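
The original listing breaks off at the import above. As a rough sketch only (not the author's exact code): a DynamicTableSink matching the constructor call in ClickHouseDynamicTableFactory would look roughly like the following, with getSinkRuntimeProvider left unimplemented because it depends on the internal AbstractClickHouseSinkFunction builder that is not shown:

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.types.RowKind;

public class ClickHouseDynamicTableSink implements DynamicTableSink {
    private final ResolvedSchema resolvedSchema;
    private final ClickHouseOptions options;

    public ClickHouseDynamicTableSink(ResolvedSchema resolvedSchema, ClickHouseOptions options) {
        this.resolvedSchema = resolvedSchema;
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Accept inserts, updates, and deletes; deletes can later be dropped
        // by the sink.ignore-delete option handled in the sink function.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // The real implementation builds a SinkFunction from resolvedSchema and
        // options via AbstractClickHouseSinkFunction; omitted in this sketch.
        throw new UnsupportedOperationException("omitted from the truncated listing");
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(resolvedSchema, options);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse table sink";
    }
}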

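5. Using the connector from Flink SQL (example)

For completeness, a hedged sketch of how the connector would be used once the shaded jar is on the Flink classpath. The table name, columns, host, and credentials below are made-up examples; the connector identifier and the WITH keys are the ones declared in ClickHouseDynamicTableFactory ('url' and 'table-name' are required, the sink.* values shown are the factory defaults).

package com.glab.flink.connector.clickhouse.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClickHouseSqlExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register an example sink table against the 'clickhouse' factory.
        tEnv.executeSql(
                "CREATE TABLE ck_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'clickhouse'," +
                "  'url' = 'clickhouse://127.0.0.1:8123'," +
                "  'database-name' = 'default'," +
                "  'table-name' = 'ck_sink'," +
                "  'sink.batch-size' = '1000'," +
                "  'sink.flush-interval' = '1s'," +
                "  'sink.max-retries' = '3'" +
                ")");

        // Any INSERT into ck_sink is now routed through ClickHouseDynamicTableSink;
        // reading from the table would go through ClickHouseDynamicTableSource.
        tEnv.executeSql("INSERT INTO ck_sink VALUES (1, 'demo')").await();
    }
}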