如何使用 Azure 存储目录作为流数据源执行 PySpark Stream
Posted
技术标签:
【中文标题】如何使用 Azure 存储目录作为流数据源执行 PySpark Stream【英文标题】:How can I execute PySpark Stream with Azure Storage directory as a streaming data source 【发布时间】:2022-01-13 06:31:09 【问题描述】:我想使用 Azure Blob 存储作为我的流的源来执行 Spark Stream 作业。我怎样才能使用 Python 来做到这一点
【问题讨论】:
请编辑您的问题以展示您的工作:您尝试过的内容、遇到的问题、错误、输出问题等。目前这太宽泛了,没有任何细节。 请提供足够的代码,以便其他人更好地理解或重现问题。 【参考方案1】:我们可以使用 Azure 容器和 blob 服务在 Azure 批处理上运行 Spark 作业。 Azure 批处理用于运行作业,因为它们成本低。
为此,我们需要一些必要的设置,例如存储帐户、容器注册表和 Azure 批处理来运行作业。
以下是运行简单 Spark 作业的示例 Python 代码:
import argparse
from pyspark.sql import SparkSession
import config
def get_azure_spark_connection(storage_account_name, storage_account_key):
spark = (
SparkSession.builder
.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')
.config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
.config("spark.hadoop.fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
storage_account_key)
.appName("AzureSparkDemo")
.getOrCreate())
(spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",
"org.apache.hadoop.fs.azure.NativeAzureFileSystem"))
return spark
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="input file to parse", type=str)
parser.add_argument("-o", "--output", help="result file to write", type=str)
args = parser.parse_args()
spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)
df = (spark.read.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.csv(args.input))
df.registerTempTable("airlines")
result = spark.sql("""
select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay
from airlines
group by Year, Month, DayofMonth
""")
result.repartition(1).write.mode("overwrite").parquet(args.output)
以下是使用的要求:
azure
azure-storage
azure-storage-blob
pyspark==2.4.0
您可以参考这些blogs 以了解有关使用 python 运行带有 Azure 存储的作业的更多信息。
【讨论】:
以上是关于如何使用 Azure 存储目录作为流数据源执行 PySpark Stream的主要内容,如果未能解决你的问题,请参考以下文章
如何将二进制数据作为图像写入 azure blob 存储 c#
需要使用 Azure 流分析和 IoT Hub 将数据存储到 Azure Data Lake Store:数据必须按 4MB 缓冲区存储