我们如何将 Spark 结构化流连接到 Redis?
Posted
技术标签:
【中文标题】我们如何将 Spark 结构化流连接到 Redis?【英文标题】:How can we connect spark structured stream to redis? 【发布时间】:2020-05-16 14:21:23 【问题描述】:我的目标是从 redis 中提取流数据并进行处理。如何通过 Spark 结构化流连接和处理数据?
【问题讨论】:
github.com/RedisLabs/spark-redis 【参考方案1】:在 Spark 中从 Redis Streams 中读取数据,我们需要确定如何连接到 Redis,以及 Redis Streams 中数据的 schema 结构。
要连接到 Redis,我们必须创建一个带有 Redis 连接参数的新 SparkSession:
import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis
object Samj45
def main(args: Array[String]): Unit =
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
val data_from_redis = spark
.readStream
.format("redis")
.option("stream.keys","data_clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
对于写作,您可以使用 ForeachWriter。让我知道这是否有帮助。
【讨论】:
以上是关于我们如何将 Spark 结构化流连接到 Redis?的主要内容,如果未能解决你的问题,请参考以下文章
我正在使用 php 通过 PHP 和客户端凭据流连接到 spotify 的 API
Reactive Stream: 如何将两个数据流接到一起,然后进行操作