flink学习33:flinkSQL连接mysql,查询插入数据

Posted hzp666

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink学习33:flinkSQL连接mysql,查询插入数据相关的知识,希望对你有一定的参考价值。

总览

1.生成运行时env

2.生成表环境

3.接上数据流,数据流数据生成表

4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册

5.查询表,可以根据注册表名查询

6.插入表,可以根据生成的flink表进行数据插入

完整案例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.DataTypes, Table
import org.apache.flink.table.descriptors._
object SqlReadmysql 
  def main(args: Array[String]): Unit = 
    // creat env
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //parallelism
    bsEnv.setParallelism(1)

    //set env
    val bsSetting = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    //create table env
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)

    //create ds
    val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))

    val table1 = bsTableEnv.fromDataStream(dataStream)

    //create table
    val sinkDDL =
      """
        |create table student2_flink (
        |code varchar(20) null,
        |name varchar(20) null
        |)with(
        |'connector.type'='jdbc',
        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
        |'connector.table'='student2',
        |'connector.driver'='com.mysql.jdbc.Driver',
        |'connector.username'='root',
        |'connector.password'='root'
        |)
        |""".stripMargin
    println(sinkDDL)
    // execute the create table sql
    bsTableEnv.executeSql(sinkDDL)

    //register table
    val myStudent = bsTableEnv.from("student2_flink")

    //execute query
    val result = bsTableEnv.sqlQuery(s"select * from $myStudent")

    result.toRetractStream[(String, String)].print()

    //insert data
    table1.executeInsert("student2_flink")

    //execute
    bsEnv.execute()


  

POM文件:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.sinopharm.gksk</groupId>
    <artifactId>gksk-bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>gksk-bigdata</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.25</slf4j.version>
        <slf4j.api.version>1.7.25</slf4j.api.version>
    </properties>
<!--Flink项目核心依赖-->
    <dependencies>
        <!--Flink Java 项目核心依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!--Flink scala项目核心依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!--Flink Table API 核心依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.14.4</version>
        </dependency>

<!--        csv-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.4</version>
        </dependency>
<!--以下用到什么引用什么-->
        <!--Flink Kafka依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!--Flink rocksdb状态后依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
<!--本地测试核心依赖-->

        <!--Flink 本地测试客户端依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!--Flink 本地测试wei ui依赖 http://127.0.0.1:8081/ -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.4</version>
            <scope>runtime</scope>
        </dependency>
        <!--junit测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!--日志输出-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>$log4j.version</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
            </resource>
            <resource>
                <directory>src/main/scala</directory>
            </resource>
        </resources>

        <plugins>
            <!--这里没引打包插件 需要的自己引用-->
            <!--Java compiler-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!--Java Compiler-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Could not instantiate the executor. Make sure a planner module is on the classpath

原因:pom文件中缺少 planner

解决办法:添加

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

ps:注意有时候 配置两个planner也会报错

flinksql 连接mysql报错 JDBC-Class not found. - com.mysql.jdbc.Driver

原因:缺少mysql的jar包

解决:pom文件添加:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>

open() failed.The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.

原因:URL没有指定时区,jdbc 6.0以上都有这个问题

解决:在URL后边加时区

'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',


 

  • useUnicode=true  表示使用Unicode字符,因此可以使用中文
  • characterEncoding=utf8  设置编码方式
  • useSSL=true   设置安全连接
  • serverTimezone=UTC    设置全球标准时间
     

open() failed.Cannot load connection class because of underlying exception: com.mysql.cj.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.

原因:连接的URL写错了

解决:好好看看,字符 、格式

flink 读取mysql并使用flink sql

参考技术A 1.mysql连接

2.flink sql

3.dependency

以上是关于flink学习33:flinkSQL连接mysql,查询插入数据的主要内容,如果未能解决你的问题,请参考以下文章

flink 读取mysql并使用flink sql

flink sql client 连接kafka解析avro数据 (avro ArrayIndexOutOfBoundsException 解决办法)

Flink Sql 自定义实现 kudu connector

95-910-330-源码-FlinkSQL-Calcite-Flink结合Calcite

Flink SQL CEP 学习内容

Demo:基于 Flink SQL 构建流式应用