如何将 pyspark-dataframe 写入红移?

Posted

技术标签:

【中文标题】如何将 pyspark-dataframe 写入红移?【英文标题】:How to write a pyspark-dataframe to redshift? 【发布时间】:2019-09-22 16:29:58 【问题描述】:

我正在尝试将 pyspark DataFrame 写入 Redshift,但它会导致错误:-

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: 提供者 org.apache.spark.sql.avro.AvroFileFormat 无法实例化

引起:java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V

Spark 版本:2.4.1

Spark-submit 命令:spark-submit --master local[*] --jars ~/Downloads/spark-avro_2.12-2.4.0.jar,~/Downloads/aws-java-sdk-1.7.4 .jar,~/Downloads/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar,~/Downloads/hadoop-aws-2.7.3.jar,~/Downloads/hadoop-common-2.7.3.jar --packages com .databricks:spark-redshift_2.11:2.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3,org.apache.hadoop:hadoop-common :2.7.3,org.apache.spark:spark-avro_2.12:2.4.0 script.py

from pyspark.sql import DataFrameReader
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

import sys
import os

pe_dl_dbname            = os.environ.get("REDSHIFT_DL_DBNAME")
pe_dl_host              = os.environ.get("REDSHIFT_DL_HOST")
pe_dl_port              = os.environ.get("REDSHIFT_DL_PORT")
pe_dl_user              = os.environ.get("REDSHIFT_DL_USER")
pe_dl_password          = os.environ.get("REDSHIFT_DL_PASSWORD")

s3_bucket_path = "s3-bucket-name/sub-folder/sub-sub-folder"
tempdir = "s3a://".format(s3_bucket_path)

driver = "com.databricks.spark.redshift"
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

sc._jsc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")

datalake_jdbc_url = 'jdbc:redshift://:/?user=&password='.format(pe_dl_host, pe_dl_port, pe_dl_dbname, pe_dl_user, pe_dl_password)

"""
The table is created in Redshift as follows:
create table adhoc_analytics.testing (name varchar(255), age integer);
"""
l = [('Alice', 1)]
df = spark.createDataFrame(l, ['name', 'age'])
df.show()
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", datalake_jdbc_url) \
  .option("dbtable", "adhoc_analytics.testing") \
  .option("tempdir", tempdir) \
  .option("tempformat", "CSV") \
  .save()

【问题讨论】:

如果您有一个相当大的文件,最好将其拆分为与您的 Redshift 集群大小相对应的较小部分,以便您可以利用 Redshift 的并行处理 【参考方案1】:

Databricks Spark-Redshift 不适用于 Spark 2.4.1 版, 这是我为使其与 Spark 2.4.1 一起使用而维护的版本 https://github.com/goibibo/spark-redshift

使用方法:

pyspark --packages "com.github.goibibo:spark-redshift:v4.1.0" --repositories "https://jitpack.io"

【讨论】:

以上是关于如何将 pyspark-dataframe 写入红移?的主要内容,如果未能解决你的问题,请参考以下文章

卸载红移:追加

如何将时间插入红移

如何将纪元转换为日期时间红移?

aws 胶水中的 catalog_connection 参数是啥?

MATLAB如何将k线图设置为经典红绿配色?

MATLAB如何将k线图设置为经典红绿配色?