AWS EMR Airflow: PostgreSQL Connector

Posted: 2021-10-13 14:26:57

I am launching an AWS EMR job through Airflow that depends on saving data into a PostgreSQL database. Unfortunately, as far as I can tell, the connector is not available in EMR by default, so I get this error:

Traceback (most recent call last):
  File "my_emr_script.py", line 725, in <module>
    main()
  File "my_emr_script.py", line 718, in main
    .mode("overwrite") \
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1634133413183_0001/container_1634133413183_0001_01_000001/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1493.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

How can I make sure the PostgreSQL connector is included when the EMR cluster starts? I looked for a way to do this through a bootstrap action, but I found no answer; all the official documents refer only to the Presto version.

Edit:

Following @Emerson's suggestion, I downloaded the .jar into an S3 folder and passed it directly in the Airflow JOB_FLOW_OVERRIDES via Configurations:

"Configurations": [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jar": "s3://{{ var.value.s3_folder }}/scripts/postgresql-42.2.5.jar"
        }
    }
],

In Airflow:

instance_type: str = 'm5.xlarge'


SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                's3://{{ var.value.s3_folder }}/scripts/el_emr.py',
                '--execution_date',
                '{{ ds }}'
            ],
        },
    }
]


JOB_FLOW_OVERRIDES = {
    'Name': 'EMR Test',
    'ReleaseLabel': 'emr-6.4.0',
    'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
            {
                'Name': 'Core',
                'Market': 'SPOT',
                'InstanceRole': 'CORE',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
            },
        },
    ],
    'LogUri': 's3://{{ var.value.s3_folder }}/logs',
    'Configurations': [
        {
            'Classification': 'spark-defaults',
            'Properties': {
                'spark.jar': 's3://{{ var.value.s3_path }}/scripts/postgresql-42.2.5.jar'
            },
        }
    ],
}


emr_creator = EmrCreateJobFlowOperator(
    task_id='create_emr',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_conn',
    emr_conn_id='emr_conn',
    region_name='us-west-2',
)

Unfortunately, the problem persists.

I also tried modifying the bootstrap script to download the .jar:

cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar

and passing it through the configuration:

"Configurations": [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.extraClassPath": "org.postgresql:postgresql:42.2.5",
            "spark.driver.extraClassPath": "$HOME/postgresql-42.2.5.jar"
        }
    }
],
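A likely problem with this attempt: `org.postgresql:postgresql:42.2.5` is a Maven coordinate, which is the form `--packages` expects, while `spark.executor.extraClassPath` and `spark.driver.extraClassPath` expect filesystem paths on each node. A quick sketch of the distinction (the helper is mine for illustration, not a Spark API):

```python
# Hypothetical helper to tell a Maven coordinate (group:artifact:version,
# used with spark-submit --packages) apart from a filesystem path
# (used with extraClassPath). Illustration only, not part of Spark.
import re

def looks_like_maven_coordinate(value: str) -> bool:
    # Three colon-separated segments with no path separators.
    return re.fullmatch(r"[\w.\-]+:[\w.\-]+:[\w.\-]+", value) is not None

assert looks_like_maven_coordinate("org.postgresql:postgresql:42.2.5")
assert not looks_like_maven_coordinate("/home/hadoop/postgresql-42.2.5.jar")
```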

In Airflow:

instance_type: str = 'm5.xlarge'


SPARK_STEPS = [
    {
        'Name': 'emr_test',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                's3://{{ var.value.s3_folder }}/scripts/emr.py',
                '--execution_date',
                '{{ ds }}'
            ],
        },
    }
]


JOB_FLOW_OVERRIDES = {
    'Name': 'EMR Test',
    'ReleaseLabel': 'emr-6.4.0',
    'Applications': [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
            {
                'Name': 'Core',
                'Market': 'SPOT',
                'InstanceRole': 'CORE',
                'InstanceType': instance_type,
                'InstanceCount': 1,
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': SPARK_STEPS,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.s3_folder }}/scripts/emr_bootstrap.sh',
            },
        },
    ],
    'LogUri': 's3://{{ var.value.s3_folder }}/logs',
    'Configurations': [
        {
            'Classification': 'spark-defaults',
            'Properties': {
                'spark.executor.extraClassPath': 'org.postgresql:postgresql:42.2.5',
                'spark.driver.extraClassPath': '$HOME/postgresql-42.2.5.jar',
            },
        }
    ],
}


emr_creator = EmrCreateJobFlowOperator(
    task_id='create_emr',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_conn',
    emr_conn_id='emr_conn',
    region_name='us-west-2',
)

This in turn causes a new error: Spark can no longer read the JSON files, treating them as corrupt:

root
 |-- _corrupt_record: string (nullable = true)

Finally, the emr_bootstrap.sh shared by both attempts:

#!/bin/bash -xe

sudo pip3 install -U \
    boto3 \
    typing


cd $HOME && wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar

【Comments】:

【Answer 1】:

I'm not sure how your EMR cluster is provisioned, but below is how you would configure it.

First, upload the Postgres JDBC jar to an S3 location. Then reference it when you configure the cluster.

If you provision it through CloudFormation, you would do something like this:

  EMR:
    Type: AWS::EMR::Cluster
    Properties:
      Applications:
        - Name: Spark
      Configurations:
        - Classification: spark-defaults
          ConfigurationProperties:
            spark.jars: s3://path_to_jar/postgresql-42.2.11.jar

If it's a CLI command, it would look like this:

aws emr create-cluster ...... --configurations config.json

config.json might look like this:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
    }
  }
]
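As a sanity check, that file must parse as a JSON array of classification objects; a quick sketch (same content as above):

```python
import json

# Validate the shape of config.json as passed to `aws emr create-cluster`.
config_text = """
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://path_to_jar/postgresql-42.2.11.jar"
    }
  }
]
"""
config = json.loads(config_text)
assert config[0]["Classification"] == "spark-defaults"
assert config[0]["Properties"]["spark.jars"].endswith(".jar")
```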

Edit:

After seeing your edited question, I can see your spark-submit arguments (the SPARK_STEPS variable). In that section, just add two items like the ones below:

'--jars'
's3://pathtodriver/postgresdriver.jar'
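Concretely, the step's argument list would then look something like this (a sketch; the bucket and script paths are placeholders, not values from the question):

```python
# Sketch of the corrected spark-submit step args, passing the JDBC
# driver via --jars. "my-bucket" paths are hypothetical placeholders.
jdbc_jar = "s3://my-bucket/scripts/postgresql-42.2.5.jar"

spark_submit_args = [
    "spark-submit",
    "--deploy-mode", "cluster",
    "--master", "yarn",
    "--jars", jdbc_jar,  # spark-submit options must come before the app script
    "s3://my-bucket/scripts/el_emr.py",  # hypothetical application path
    "--execution_date", "{{ ds }}",
]

# Anything after the application script is passed to the script itself,
# so --jars has to precede it.
assert spark_submit_args.index("--jars") < spark_submit_args.index("s3://my-bucket/scripts/el_emr.py")
```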

【Discussion】:

Hi @Emerson, thanks for your reply! Unfortunately I tried following your steps, but the problem persists. I edited the original question to reflect your suggestion and the other things I tried.
If you are using the Airflow operator, you can pass the --jars flag with the S3 path to the JDBC driver as part of the arguments. It would be very helpful if you described how you provision the cluster and how you run the job.
Good to see your spark-submit. When you do the spark-submit, you can pass the jars.
Hi @Emerson, you're right, my apologies, I should have included everything. I've updated my post! As for passing the jars in spark-submit, is it via the "--packages" flag?
No. It's --jars. Adding your JDBC driver there will work.
