1.30. Flink SQL Case Study: Writing Kafka Data into Hive


1.30. Flink SQL Case Study: Writing Kafka Data into Hive
1.30.1.1. Scenario, Environment, and Preparation
1.30.1.2. Case Code
1.30.1.2.1. Writing the pom.xml
1.30.1.2.2. Writing log4j2.properties under the Maven project's resources directory
1.30.1.2.3. Writing logback.xml under the Maven project's resources directory
1.30.1.2.4. Writing project-config-test.properties under the Maven project's resources directory
1.30.1.2.5. Writing com.xxxxx.log.utils.PropertiesUtils
1.30.1.2.6. Writing com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql
1.30.1.2.7. Running the job

1.30. Flink SQL Case Study: Writing Kafka Data into Hive

1.30.1.1. Scenario, Environment, and Preparation

Scenario: use Flink SQL to write Kafka data into Hive in real time (the core pattern is sketched below, after the preparation notes).
(1) Environment

hadoop 3.1.1.3.1.4-315
hive 3.1.0.3.1.4-315
flink 1.12.1

Preparation:
Copy the required dependency jars into $FLINK_HOME/lib; among them, hive-exec-3.1.0.3.1.4.0-315.jar and libfb303-0.9.3.jar come from /usr/hdp/current/hive-client/lib.
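Before walking through the project files, it helps to see the core pattern: the job boils down to three Flink SQL statements, a Kafka source table, a Hive sink table with partition-commit properties, and an INSERT INTO connecting them. The following is a minimal sketch for Flink 1.12; the topic, broker, field, database, and hive-conf-dir values are placeholders rather than values from this project. Note that the Hive streaming sink only finalizes files on checkpoints, so checkpointing must be enabled.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Hive streaming sink commits partition files on checkpoints.
        env.enableCheckpointing(60_000);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register a HiveCatalog so the sink table is visible to Hive itself
        // ("/etc/hive/conf" is a placeholder for the cluster's hive conf dir).
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Kafka source table, declared in the default dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS kafka_log (" +
                "  msg STRING," +
                "  ts  BIGINT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'log_topic'," +
                "  'properties.bootstrap.servers' = 'kafka01:9092'," +
                "  'properties.group.id' = 'flink-log-handler'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Hive sink table, declared in the Hive dialect, with streaming
        // partition commit so partitions become queryable as they fill.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS hive_log (" +
                "  msg STRING," +
                "  ts  BIGINT" +
                ") PARTITIONED BY (dt STRING) STORED AS ORC TBLPROPERTIES (" +
                "  'sink.partition-commit.trigger' = 'process-time'," +
                "  'sink.partition-commit.delay' = '1 min'," +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file'" +
                ")");

        // executeSql submits the continuous INSERT job to the cluster.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("INSERT INTO hive_log " +
                "SELECT msg, ts, FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd') AS dt FROM kafka_log");
    }
}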

1.30.1.2. Case Code

1.30.1.2.1. Writing the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxxx.zczl</groupId>
    <artifactId>flink-log-handler</artifactId>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--maven properties -->
        <maven.test.skip>true</maven.test.skip>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <fastjson.version>1.2.73</fastjson.version>
        <joda-time.version>2.9.4</joda-time.version>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.1.4</hadoop.version>
       <!-- <hive.version>3.1.0.3.1.4.0-315</hive.version>
        <hadoop.version>3.1.1.3.1.4.0-315</hadoop.version>-->
        <!--<hadoop.version>3.3.0</hadoop.version>-->
        <mysql.connector.java>8.0.22</mysql.connector.java>

        <fileName>flink-log-handler</fileName>
        <!--<mainClass>com.xxxxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>-->
    </properties>

    <version>1.0-SNAPSHOT</version>

    <!--<distributionManagement>
       <repository>
           <id>releases</id>
           <layout>default</layout>
           <url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url>
       </repository>

       <snapshotRepository>
           <id>snapshots</id>
           <name>snapshots</name>
           <url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url>
       </snapshotRepository>
   </distributionManagement>-->

    <repositories>

       <!-- <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url>
        </repository>

        <repository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>

        <repository>
            <id>xxxxx</id>
            <name>xxxxx</name>
            <url>http://nexus.xxxxx.cn/nexus/content/repositories/xxxxx/</url>
        </repository>

        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://nexus.xxxxx.cn/nexus/content/groups/public/</url>
        </repository>-->
        <!-- 新加 -->
        <repository>
            <id>cloudera</id>
            <layout>default</layout>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

   <!-- <repositories>
        &lt;!&ndash; Cloudera &ndash;&gt;
        <repository>
            <id>cloudera-releases</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>

        &lt;!&ndash; Hortonworks &ndash;&gt;
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>
            <url>https://repo.hortonworks.com/content/repositories/releases/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

        <repository>
            <id>HortonworksJettyHadoop</id>
            <name>HDP Jetty</name>
            <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

        &lt;!&ndash; MapR &ndash;&gt;
        <repository>
            <id>mapr-releases</id>
            <url>https://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

    </repositories>-->

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>$flink.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <!-- flink以yarn模式启动,执行flink->sql->hive会用到flink-shaded-hadoop-2-uber包 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sequence-file</artifactId>
            <version>$flink.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_$scala.binary.version</artifactId>
            <version>$flink.version</version>
           <!-- <scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>$mysql.connector.java</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>$flink.version</version>
            <!--<scope>compile</scope>-->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>$hadoop.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>$hadoop.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>$hive.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>$hive.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>$hive.version</version>
            <!--<scope>compile</scope>-->
           <scope>provided</scope>
        </dependency>
       <!-- <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.1</version>
            <scope>provided</scope>
        </dependency>-->

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>$joda-time.version</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>$fastjson.version</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>$slf4j.version</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <build>
        <finalName>$fileName</finalName>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>$maven.compiler.source</source>
                    <target>$maven.compiler.target</target>
                    <encoding>$project.build.sourceEncoding</encoding>
                    <compilerVersion>$maven.compiler.source</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>$maven.test.skip</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- scala编译插件 -->
            <plugin>以上是关于1.30.Flink SQL案例将Kafka数据写入hive的主要内容,如果未能解决你的问题,请参考以下文章

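Section 1.30.1.2.5's com.xxxxx.log.utils.PropertiesUtils reads project-config-test.properties from the Maven resources directory. A minimal sketch of such a loader, assuming it only needs classpath lookup (the key names it serves are project-specific):

package com.xxxxx.log.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtils {

    private static final Properties PROPS = new Properties();

    static {
        // project-config-test.properties is expected on the classpath,
        // i.e. under src/main/resources in the Maven project.
        try (InputStream in = PropertiesUtils.class.getClassLoader()
                .getResourceAsStream("project-config-test.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns the configured value, or null when the key is absent. */
    public static String get(String key) {
        return PROPS.getProperty(key);
    }
}

The job class can then read settings such as the Kafka bootstrap servers from this file instead of hard-coding them, e.g. PropertiesUtils.get("bootstrap.servers") (key name hypothetical).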