1.30. Flink SQL example: writing Kafka data to Hive
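1.30. Flink SQL example: writing Kafka data to Hive
1.30.1.1. Scenario, environment, and configuration preparation
1.30.1.2. Example code
1.30.1.2.1. Writing the pom.xml file
1.30.1.2.2. Writing the log4j2.properties configuration file under the Maven project's resources directory
1.30.1.2.3. Writing the logback.xml configuration file under the Maven project's resources directory
1.30.1.2.4. Writing the project-config-test.properties configuration file under the Maven project's resources directory
1.30.1.2.5. Writing com.xxxxx.log.utils.PropertiesUtils
1.30.1.2.6. Writing com.xxxxx.log.flink.handler.ExceptionLogHandlerBySql
1.30.1.2.7. Running the command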
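1.30. Flink SQL example: writing Kafka data to Hive
1.30.1.1. Scenario, environment, and configuration preparation
Scenario: use Flink SQL to write data from Kafka into Hive in real time.
(1) Environment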
hadoop 3.1.1.3.1.4-315
hive 3.1.0.3.1.4-315
flink 1.12.1
Prerequisites:
Add the following jars to $FLINK_HOME/lib. Of these, hive-exec-3.1.0.3.1.4.0-315.jar and libfb303-0.9.3.jar are copied from /usr/hdp/current/hive-client/lib.
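The scenario above (a Kafka source table, a Hive sink table, and a continuous INSERT between them) can be sketched roughly as follows. This is only an illustration and not the article's own handler class: the table names, columns, consumer group, topic, and broker address are hypothetical placeholders, and the statements are built as plain strings so the sketch compiles without Flink on the classpath. The connector options and the Hive partition-commit properties are the standard Flink 1.12 ones.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch only: builds the Flink SQL statements for a Kafka -> Hive pipeline as strings.
 * All table names, columns, topic and broker values are hypothetical placeholders.
 */
class KafkaToHiveSqlSketch {

    /** DDL for a Kafka-backed source table (run under the default Flink SQL dialect). */
    static String kafkaSourceDdl(String topic, String brokers) {
        return String.join("\n",
            "CREATE TABLE kafka_log_source (",
            "  log_id STRING,",
            "  message STRING,",
            "  ts TIMESTAMP(3)",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = '" + topic + "',",
            "  'properties.bootstrap.servers' = '" + brokers + "',",
            "  'properties.group.id' = 'flink-log-handler',",
            "  'scan.startup.mode' = 'latest-offset',",
            "  'format' = 'json'",
            ")");
    }

    /** DDL for a partitioned Hive sink table (run under the Hive dialect). */
    static String hiveSinkDdl() {
        return String.join("\n",
            "CREATE TABLE IF NOT EXISTS hive_log_sink (",
            "  log_id STRING,",
            "  message STRING",
            ") PARTITIONED BY (dt STRING)",
            "STORED AS PARQUET",
            "TBLPROPERTIES (",
            "  'sink.partition-commit.trigger' = 'process-time',",
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file'",
            ")");
    }

    /** Continuous insert that derives the partition column from the event timestamp. */
    static String insertStatement() {
        return String.join("\n",
            "INSERT INTO hive_log_sink",
            "SELECT log_id, message, DATE_FORMAT(ts, 'yyyy-MM-dd')",
            "FROM kafka_log_source");
    }

    /** The statements in the order they would be submitted. */
    static List<String> statements(String topic, String brokers) {
        return Arrays.asList(kafkaSourceDdl(topic, brokers), hiveSinkDdl(), insertStatement());
    }

    public static void main(String[] args) {
        // In a real job each statement would be passed to TableEnvironment.executeSql(...)
        // after registering a HiveCatalog; here they are only printed.
        for (String sql : statements("exception_log", "localhost:9092")) {
            System.out.println(sql + ";\n");
        }
    }
}
```

In an actual job, the Hive DDL would be executed after switching the table environment to the Hive dialect, and the other statements under the default dialect. Checkpointing also needs to be enabled, because the streaming Hive sink only commits files and partitions when a checkpoint completes.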
1.30.1.2. Example code
1.30.1.2.1. Writing the pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xxxxx.zczl</groupId>
<artifactId>flink-log-handler</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
<maven.test.skip>true</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<slf4j.version>1.7.25</slf4j.version>
<fastjson.version>1.2.73</fastjson.version>
<joda-time.version>2.9.4</joda-time.version>
<flink.version>1.12.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<hive.version>3.1.2</hive.version>
<hadoop.version>3.1.4</hadoop.version>
<!-- <hive.version>3.1.0.3.1.4.0-315</hive.version>
<hadoop.version>3.1.1.3.1.4.0-315</hadoop.version>-->
<!--<hadoop.version>3.3.0</hadoop.version>-->
<mysql.connector.java>8.0.22</mysql.connector.java>
<fileName>flink-log-handler</fileName>
<!--<mainClass>com.xxxxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>-->
</properties>
<version>1.0-SNAPSHOT</version>
<!--<distributionManagement>
<repository>
<id>releases</id>
<layout>default</layout>
<url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>snapshots</name>
<url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>-->
<repositories>
<!-- <repository>
<id>releases</id>
<layout>default</layout>
<url>http://nexus.xxxxx.cn/nexus/content/repositories/releases/</url>
</repository>
<repository>
<id>snapshots</id>
<name>snapshots</name>
<url>http://nexus.xxxxx.cn/nexus/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>warn</checksumPolicy>
</snapshots>
</repository>
<repository>
<id>xxxxx</id>
<name>xxxxx</name>
<url>http://nexus.xxxxx.cn/nexus/content/repositories/xxxxx/</url>
</repository>
<repository>
<id>public</id>
<name>public</name>
<url>http://nexus.xxxxx.cn/nexus/content/groups/public/</url>
</repository>-->
<!-- newly added -->
<repository>
<id>cloudera</id>
<layout>default</layout>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<!-- <repositories>
<!– Cloudera –>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<!– Hortonworks –>
<repository>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>https://repo.hortonworks.com/content/repositories/releases/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>HortonworksJettyHadoop</id>
<name>HDP Jetty</name>
<url>https://repo.hortonworks.com/content/repositories/jetty-hadoop</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<!– MapR –>
<repository>
<id>mapr-releases</id>
<url>https://repository.mapr.com/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>-->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<!-- When Flink runs in YARN mode, the Flink SQL to Hive job needs the flink-shaded-hadoop-2-uber jar -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sequence-file</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.java}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<!--<scope>compile</scope>-->
<scope>provided</scope>
</dependency>
<!-- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.1</version>
<scope>provided</scope>
</dependency>-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>${fileName}</finalName>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<skipTests>${maven.test.skip}</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<configuration>
<excludes>
<exclude>README.md</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<aggregate>true</aggregate>
<reportOutputDirectory>javadocs</reportOutputDirectory>
<locale>en</locale>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>