从 Redis 读取数据到 Flink

Posted

技术标签:

【中文标题】从 Redis 读取数据到 Flink【英文标题】:Read data from Redis to Flink 【发布时间】:2017-05-26 03:35:08 【问题描述】:

我一直在尝试寻找一个连接器来将数据从 Redis 读取到 Flink。 Flink 的文档包含对连接器写入 Redis 的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在Using Apache Flink for data streaming 中,F*** 提到可以从 Redis 读取数据。可以用于什么目的的连接器?

【问题讨论】:

【参考方案1】:

目前Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction类来实现。

public class RedisSink extends RichSinkFunction<String> 

  @Override
  public void open(Configuration parameters) throws Exception 
      //open redis connection
  

  @Override
  public void invoke(String map) throws Exception 
     //sink data to redis
  

  @Override
  public void close() throws Exception 
     super.close();
  


【讨论】:

【参考方案2】:

我们正在生产中运行一个大致像这样的产品

class RedisSource extends RichSourceFunction[SomeDataType] 

  var client: RedisClient = _

  override def open(parameters: Configuration) = 
    client = RedisClient() // init connection etc
  

  @volatile var isRunning = true

  override def cancel(): Unit = 
    isRunning = false
    client.close()
  

  override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) 
      for 
        data <- ??? // get some data from the redis client
       yield ctx.collect(SomeDataType(data))

  
 

我认为这真的取决于你需要从 redis 中获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、抓取 SourceConext 并将消息推送到下游。

【讨论】:

我怎样才能调用这个类...请帮助我【参考方案3】:

关于为 Apache Flink 提供流式 redis 源连接器的讨论已经很多(请参阅 FLINK-3033),但没有可用的连接器。不过,实现它应该不难。

【讨论】:

【参考方案4】:

让您的 Flink 程序使用 Jedis 与 Redis 对话的挑战之一是将适当的库放入您提交给 Flink 的 JAR 文件中。如果没有这个,你会得到调用堆栈,表明某些类是未定义的。这是我创建的 Maven pom.xml 的 sn-p,用于将 Redis 及其依赖组件 apache commons-pool2 移动到我的 JAR 中。

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>2.9</version>
            <executions>
                <execution>
                    <id>unpack</id>
                    <!-- executed just before the package phase -->
                    <!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <artifactItem>
                                <groupId>org.apache.commons</groupId>
                                <artifactId>commons-pool2</artifactId>
                                <version>2.4.2</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>$project.build.directory/classes</outputDirectory>
                                <includes>org/apache/commons/**</includes>
                            </artifactItem>
                            <artifactItem>
                                <groupId>redis.clients</groupId>
                                <artifactId>jedis</artifactId>
                                <version>2.9.0</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>$project.build.directory/classes</outputDirectory>
                                <includes>redis/clients/**</includes>
                            </artifactItem>

                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

【讨论】:

以上是关于从 Redis 读取数据到 Flink的主要内容,如果未能解决你的问题,请参考以下文章

Docker 安装 filebeat 读取日志 输出到redis或者es

flink 读取mysql并使用flink sql

flink处理数据从kafka到另外一个kafka

flink1.12 sql向redis实时写数据

如何读取redis中的key值中的结果

如何读取redis中的key值中的结果