如何使用 Azure 存储目录作为流数据源执行 PySpark Stream

Posted

技术标签:

【中文标题】如何使用 Azure 存储目录作为流数据源执行 PySpark Stream【英文标题】:How can I execute PySpark Stream with Azure Storage directory as a streaming data source 【发布时间】:2022-01-13 06:31:09 【问题描述】:

我想使用 Azure Blob 存储作为我的流的源来执行 Spark Stream 作业。我怎样才能使用 Python 来做到这一点

【问题讨论】:

请编辑您的问题以展示您的工作:您尝试过的内容、遇到的问题、错误、输出问题等。目前这太宽泛了,没有任何细节。 请提供足够的代码,以便其他人更好地理解或重现问题。 【参考方案1】:

我们可以使用 Azure 容器和 blob 服务在 Azure 批处理上运行 Spark 作业。 Azure 批处理用于运行作业,因为它们成本低。

为此,我们需要一些必要的设置,例如存储帐户、容器注册表和 Azure 批处理来运行作业。

以下是运行简单 Spark 作业的示例 Python 代码:

import argparse  
  
from pyspark.sql import SparkSession  
  
import config  
  
  
def get_azure_spark_connection(storage_account_name, storage_account_key):  
    spark = (  
        SparkSession.builder  
            .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')  
            .config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")  
            .config("spark.hadoop.fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",  
                    storage_account_key)  
            .appName("AzureSparkDemo")  
            .getOrCreate())  
  
    (spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",  
                                                       "org.apache.hadoop.fs.azure.NativeAzureFileSystem"))  
    return spark  
  
  
if __name__ == '__main__':  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-i", "--input", help="input file to parse", type=str)  
    parser.add_argument("-o", "--output", help="result file to write", type=str)  
    args = parser.parse_args()  
    spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)  
    df = (spark.read.option("header", "true")  
          .option("delimiter", ",")  
          .option("inferSchema", "true")  
          .csv(args.input))  
    df.registerTempTable("airlines")  
    result = spark.sql("""  
      select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay  
      from airlines   
      group by Year, Month, DayofMonth  
""")  
    result.repartition(1).write.mode("overwrite").parquet(args.output)

以下是使用的要求:

azure  
azure-storage  
azure-storage-blob  
pyspark==2.4.0

您可以参考这些blogs 以了解有关使用 python 运行带有 Azure 存储的作业的更多信息。

【讨论】:

以上是关于如何使用 Azure 存储目录作为流数据源执行 PySpark Stream的主要内容,如果未能解决你的问题,请参考以下文章

如何将二进制数据作为图像写入 azure blob 存储 c#

Azure Blob 存储和流分析

如何使用 Azure.Storage.Blobs 上传流

需要使用 Azure 流分析和 IoT Hub 将数据存储到 Azure Data Lake Store:数据必须按 4MB 缓冲区存储

如何使用 Azure 流分析强制一个空的输出文件

如何直接在 Azure Blob 存储上存储火花作业(结构化流)的检查点?