1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

Posted to.to

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink相关的知识,希望对你有一定的参考价值。

1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义
1.31.1.工程结构
1.31.2.定义pom.xml文件
1.31.3.log4j2.properties
1.31.4.logback.xml
1.31.5.cache.properties
1.31.6.project-config.properties
1.31.7.IssueAcceptSimpleProducer.java
1.31.8.Consumer.java
1.31.9.DefaultTopicSelector.java
1.31.10.SimpleTopicSelector.java
1.31.11.TopicSelector.java
1.31.12.KeyValueDeserializationSchema.java
1.31.13.KeyValueSerializationSchema.java
1.31.14.SimpleKeyValueDeserializationSchema.java
1.31.15.SimpleKeyValueSerializationSchema.java
1.31.16.RocketMQConfig.java
1.31.17.RocketMQSink.java
1.31.18.RocketMQSource.java
1.31.19.RocketMQUtils.java
1.31.20.RunningChecker.java
1.31.21.DateUtils.java
1.31.22.PropertiesUtils.java
1.31.23.RedisUtil.java
1.31.24.IssueConstants.java
1.31.25.IssueAcceptRedisSink.java
1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java
1.32.Flink其它案例
1.32.1.使用DataGen生成数据
1.32.2.使用value state进行存储临时数据

1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义

1.31.1.工程结构

1.31.2.定义pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- xxxxxx实时处理 -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxx.xxx.xxxx</groupId>
    <artifactId>indicators-real-time-handler</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--maven properties -->
        <maven.test.skip>true</maven.test.skip>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!--<rocketmq.version>4.7.1</rocketmq.version>-->
        <rocketmq.version>4.5.1</rocketmq.version>
        <flink.version>1.11.1</flink.version>
        <flink-connector-redis.version>1.0</flink-connector-redis.version>
        <commons-lang.version>2.5</commons-lang.version>
        <scala.binary.version>2.12</scala.binary.version>
        <junit.version>4.12</junit.version>
        <redis.version>3.3.0</redis.version>
        <slf4j.version>1.7.25</slf4j.version>
        <fastjson.version>1.2.73</fastjson.version>
        <joda-time.version>2.9.4</joda-time.version>
        <!--<hadoop.version>2.8.3</hadoop.version>-->
        <!-- 用于连接中间件团队的redis用 -->
        <tmc-version>0.6.2</tmc-version>


        <fileName>issue-handler</fileName>
        <mainClass>com.xxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>$hadoop.version</version>
        </dependency>-->

        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>$hadoop.version</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>$hadoop.version</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>$flink.version</version>
        </dependency>

        <!--
        1.compile   : 默认的scope,运行期有效,需要打入包中。
        2.provided  : 编译器有效,运行期不需要提供,不会打入包中。
        3.runtime   : 编译不需要,在运行期有效,需要导入包中。(接口与实现分离)
        4.test      : 测试需要,不会打入包中
        5.system    : 非本地仓库引入、存在系统的某个路径下的jar。(一般不使用)
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_$scala.binary.version</artifactId>
            <version>$flink.version</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_$scala.binary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
		
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>$slf4j.version</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
            <scope>test</scope>
        </dependency>

        <!--<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>$redis.version</version>
        </dependency>-->

        <!--<dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>$flink-connector-redis.version</version>
        </dependency>-->
		
		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>$rocketmq.version</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>$rocketmq.version</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>$rocketmq.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>$rocketmq.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>$rocketmq.version</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>$commons-lang.version</version>
        </dependency>

        <!--test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>$junit.version</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>$fastjson.version</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>$joda-time.version</version>
        </dependency>

        <!-- 使用scala编程的时候使用下面的依赖 start-->
        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_$scala.binary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_$scala.binary.version</artifactId>
            <version>$flink.version</version>
        </dependency>-->
        <!-- 使用scala编程的时候使用下面的依赖 end-->

        <!-- kafka connector scala 2.12 -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_$scala.binary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        -->

        <!--
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
		-->

    </dependencies>

    <distributionManagement>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
        </repository>

        <snapshotRepository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <repositories>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
        </repository>

        <repository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>

        <repository>
            <id>xxxxx</id>
            <name>xxxxxx</name>
            <url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url>
        </repository>

        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://xxx.xxx.xxx/nexus/content/groups/public/</url>
        </repository>

        <!-- 新加 -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <build>
        <finalName>$fileName</finalName>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>$maven.compiler.source</source>
                    <target>$maven.compiler.target</target>
                    <encoding>$project.build.sourceEncoding</encoding>
                    <compilerVersion>$maven.compiler.source</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>$maven.test.skip</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </以上是关于1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 自定义消息与延迟消息

Spring之自定义事件

自定义Log4j配置文件和RocketMQ-Client.jar下log文件冲突问题解决

自定义 Segue NavBar 后退按钮

如何在 laravel 库中添加自定义方法

RocketMq Topic创建和删除