Real-Time Data Warehouse: Flink Production Deployment and Job Submission Steps

Posted by 小基基o_O



1. Base Environment

  • Development environment: Windows
    WIN10 + JDK1.8 + IDEA2022.3 + Maven3.8.1
  • Production environment: Linux
    CentOS7.5 + JDK1.8 + Hadoop3.1.3 + Kafka3.0.0
    The usual production deployment mode for a Flink cluster is YARN mode.
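
As a sketch of what submission looks like in YARN per-job mode (the jar name and entry class below are hypothetical placeholders for this project's artifact, not taken from the article):

```shell
# Per-job mode on YARN (Flink 1.13): -t selects the deployment target,
# -d detaches the client, -c names the main class inside the fat jar.
flink run -t yarn-per-job -d \
  -c org.example.FlinkTest \
  ./flink-job-1.0-SNAPSHOT.jar
```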

2. Development Environment

Create a Maven project.

2.1 pom.xml

These dependencies cover Flink, Flink SQL, Flink's Kafka connector, and JSON parsing.

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.6</flink.version>
    <scala.version>2.12</scala.version>
    <hadoop.version>3.1.3</hadoop.version>
    <slf4j.version>2.0.5</slf4j.version>
    <log4j.version>2.19.0</log4j.version>
    <fastjson.version>1.2.83</fastjson.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink-Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL (Blink planner) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='json' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Checkpoints stored on HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Flink logs through the SLF4J facade by default; Log4j is used here as the concrete implementation -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
<!-- Packaging plugin -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures from META-INF; otherwise SecurityExceptions may occur when the JAR is used -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
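
With the shade plugin bound to the package phase, building the fat jar is a standard Maven invocation; the shaded jar ends up under target/:

```shell
# Build the fat (shaded) jar; the shade goal runs during the package phase
mvn clean package
```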

2.2 log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.3 Test Code

Code structure (diagram not reproduced here).

2.3.1 Flink Execution Environment Utility

package org.example;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkEnv {
    private final static String CHECKPOINT_DIRECTORY = "hdfs://hadoop105:8020/Flink/Checkpoint";
    private final static String HADOOP_USER_NAME = "hjw";

    public static StreamExecutionEnvironment getEnv() {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism (= number of Kafka partitions)
        env.setParallelism(3);
        // Get the checkpoint configuration
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Enable checkpointing: every 5 minutes, exactly-once mode
        env.enableCheckpointing(300 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout: 10 minutes
        checkpointConfig.setCheckpointTimeout(600 * 1000L);
        // Max concurrent checkpoints (timeout / interval = 10 / 5 = 2)
        checkpointConfig.setMaxConcurrentCheckpoints(2);
        // Restart strategy: up to 3 restarts, 5-second delay
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        checkpointConfig.setCheckpointStorage(CHECKPOINT_DIRECTORY);
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
        return env;
    }

    public static StreamTableEnvironment getTableEnv() {
        StreamExecutionEnvironment env = FlinkEnv.getEnv();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.setString("table.local-time-zone", "Asia/Shanghai");
        // Return the environment that carries the time-zone setting
        return tableEnv;
    }
}

2.3.2 Kafka Utility

package org.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaTool {
    private static final String BOOTSTRAP_SERVER = "hadoop105:9092,hadoop106:9092,hadoop107:9092";
    private static final String CONSUMER_GROUP_ID = "Flink01";

    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(BOOTSTRAP_SERVER, topic, new SimpleStringSchema());
    }

    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
        properties.setProperty("group.id", CONSUMER_GROUP_ID);
        properties.setProperty("auto.offset.reset", "latest");
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
    }

    public static DataStreamSource<String> getKafkaSource(StreamExecutionEnvironment env, String topic) {
        return env.addSource(getFlinkKafkaConsumer(topic));
    }

    // WITH clause for a Kafka source table in Flink SQL
    public static String getInputTable(String topic) {
        return " WITH ("
                + "'connector'='kafka',"
                + "'topic'='" + topic + "',"
                + "'properties.bootstrap.servers'='" + BOOTSTRAP_SERVER + "',"
                + "'properties.group.id'='" + CONSUMER_GROUP_ID + "',"
                + "'scan.startup.mode'='latest-offset',"
                + "'format'='json'"
                + ")";
    }

    // WITH clause for a Kafka sink table in Flink SQL
    public static String getOutputTable(String topic) {
        return " WITH ("
                + "'connector'='kafka',"
                + "'topic'='" + topic + "',"
                + "'properties.bootstrap.servers'='" + BOOTSTRAP_SERVER + "',"
                + "'format'='json'"
                + ")";
    }
}
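As a usage sketch: the WITH clause returned by getInputTable is meant to be appended to a CREATE TABLE DDL and passed to tableEnv.executeSql(...). The table name and schema below are made-up examples, and the string assembly is reproduced so the snippet stands alone without Flink on the classpath:

```java
// Hypothetical demo of how KafkaTool's WITH-clause helpers plug into a DDL.
public class KafkaDdlDemo {
    private static final String BOOTSTRAP_SERVER = "hadoop105:9092,hadoop106:9092,hadoop107:9092";
    private static final String CONSUMER_GROUP_ID = "Flink01";

    // Same shape as KafkaTool.getInputTable(topic)
    static String getInputTable(String topic) {
        return " WITH ("
                + "'connector'='kafka',"
                + "'topic'='" + topic + "',"
                + "'properties.bootstrap.servers'='" + BOOTSTRAP_SERVER + "',"
                + "'properties.group.id'='" + CONSUMER_GROUP_ID + "',"
                + "'scan.startup.mode'='latest-offset',"
                + "'format'='json'"
                + ")";
    }

    // Full DDL string that tableEnv.executeSql(...) would receive
    static String buildCreateTable(String topic) {
        return "CREATE TABLE source_table (id STRING, ts BIGINT)" + getInputTable(topic);
    }

    public static void main(String[] args) {
        System.out.println(buildCreateTable("topic01"));
    }
}
```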

2.3.3 Testing Flink Reading and Writing Kafka

package org.example.test;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.FlinkEnv;
import org.example.KafkaTool;

public class FlinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkEnv.getEnv();
        DataStreamSource<String> kafkaSource = KafkaTool.getKafkaSource(env, "topic01");
        // Parse each record as JSON, then serialize back to a string
        SingleOutputStreamOperator<String> s = kafkaSource.map(JSONObject::parseObject).map(Object::toString);
        s.addSink(KafkaTool.getFlinkKafkaProducer("topic02"));
        env.execute();
    }
}
