PYSPARK org.apache.spark.sql.AnalysisException: cannot resolve '`INPUT__FILE__NAME`' given input columns


Posted: 2020-07-12 16:14:07

Question:

The configuration file I am using looks like this:

"trial":
            "stage_table": "trial_stg",
            "folder_location": "Trial",
            "column_mapping": [
            
              "source_column": "(split(INPUT__FILE__NAME, '\\/')[11])",
              "source_datatype": "text",
              "target_column": "indication",
              "target_datatype": "text",
              "transform_type": "expression",
              "validate": false
            

I am trying to get the file name using INPUT__FILE__NAME in PySpark, but I am running into a problem. Below is the code that runs after reading this configuration file:

def query_expression_builder(mapping):
    print("Inside query_expression_builder")
    print("mapping :", mapping)

    def match_transform_type(map_col):
        print("Inside match_transform_type")

        if map_col.get('transform_type') is None:
            print("transform_type is", map_col.get('transform_type'))
            print("map_col inside if :", map_col)
            return f"`{map_col['source_column']}` AS {map_col['target_column']}"

        elif str(map_col.get('transform_type')).__eq__('expression'):
            print("transform_type is", map_col.get('transform_type'))
            print("map_col inside elif :", map_col)
            return f"{map_col['source_column']} AS {map_col['target_column']}"

        else:
            print("transform_type is", map_col.get('transform_type'))
            print("map_col inside else :", map_col)
            return f"`{map_col['source_column']}` AS {map_col['target_column']}"

    if mapping is None:
        print("Check for mapping is None")
        return []
    else:
        print("Mapping is not None")
        return list(map(lambda col_mapping: match_transform_type(map_col=col_mapping), mapping))
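
For the "trial" mapping above, the builder produces one selectExpr string per mapping entry. A quick sketch of what it returns when run by hand, outside the job:

mapping = [{
    "source_column": "(split(INPUT__FILE__NAME, '\\/')[11])",
    "source_datatype": "text",
    "target_column": "indication",
    "target_datatype": "text",
    "transform_type": "expression",
    "validate": False
}]
# transform_type is 'expression', so the source column is emitted unquoted:
print(query_expression_builder(mapping)[0])
# (split(INPUT__FILE__NAME, '\/')[11]) AS indication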



def main():
    query = query_expression_builder(
        mapping=config['file_table_mapping'][tbl]['column_mapping'])
    print(f"Table = {tbl} Executing query {query}")
    file_path = (f"s3://{config['raw_bucket']}/{config['landing_directory']}/"
                 f"{config['file_table_mapping'][tbl]['folder_location']}/"
                 f"{config_audit['watermark_timestamp']}*.csv")
    write_df = spark.read.csv(path=file_path, header=True, inferSchema=False) \
        .selectExpr(query) \
        .withColumn("prcs_run_id", func.lit(config_audit['prcs_run_id'])) \
        .withColumn("job_run_id", func.lit(config_audit['job_run_id'])) \
        .withColumn("ins_ts", func.lit(ins_ts)) \
        .withColumn("rec_crt_user", func.lit(config["username"])) 
    write_df.show()
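
Stripped of the config plumbing, the failing step reduces to something like this (a minimal sketch; the bucket path and SparkSession setup are assumptions, not the original code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.read.csv("s3://some-bucket/landing/Trial/", header=True, inferSchema=False)

# INPUT__FILE__NAME is a Hive virtual column. Spark's CSV reader does not
# expose it, so the analyzer cannot resolve it and raises AnalysisException:
df.selectExpr("split(INPUT__FILE__NAME, '/')[11] AS indication")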

Below is the error I get:

"cannot resolve '`INPUT__FILE__NAME`' given input columns: [Pediatric Patients included (Y/N), Trial registry number, Number of patients, Sponsor, Number of treatment arms, Multicenter study, Trial Conclusion, Clinical Phase, Study Population, Country Codes, Exclusion criteria, Trial ID, Trial AcronymDerived, Comments, Countries, Trial registry name, Sample size calculation details, Randomisation, Blinding, Trial Comments, Trial start year, Trial end year, Inclusion criteria, Study treatment, Trial design, Controlled trial, Trial Acronym, Trial Control, Asymptomatic patients, Analysis method details]; line 1 pos 7;\n'Project ['split('INPUT__FILE__NAME, /)[11] AS indication#4346, Trial ID#4286 AS trial_id#4347, Trial Acronym#4287 AS trial_acronym#4348, Trial AcronymDerived#4288 AS trial_acronym_derived#4349, Sponsor#4289 AS sponsor#4350, Asymptomatic patients#4290 AS asymptomatic_patients#4351, Pediatric Patients included (Y/N)#4291 AS pediatric_patients_included#4352, Number of patients#4292 AS num_of_patients#4353, Number of treatment arms#4293 AS num_of_treatment_arms#4354, Trial start year#4294 AS trial_strt_yr#4355, Trial end year#4295 AS trial_end_yr#4356, Clinical Phase#4296 AS clinical_phase#4357, Study Population#4297 AS study_population#4358, Study treatment#4298 AS study_treatment#4359, Randomisation#4299 AS randomization#4360, Controlled trial#4300 AS controlled_trial#4361, Trial Control#4301 AS trial_control#4362, Blinding#4302 AS blinding#4363, Trial registry name#4303 AS trial_registry_name#4364, Trial registry number#4304 AS trial_registry_num#4365, Countries#4305 AS countries#4366, Country Codes#4306 AS country_codes#4367, Trial design#4307 AS trial_design#4368, Multicenter study#4308 AS multicenter_study#4369, ... 7 more fields]\n+- Relation[Trial ID#4286,Trial Acronym#4287,Trial AcronymDerived#4288,Sponsor#4289,Asymptomatic patients#4290,Pediatric Patients included (Y/N)#4291,Number of patients#4292,Number of treatment arms#4293,Trial start year#4294,Trial end year#4295,Clinical Phase#4296,Study Population#4297,Study treatment#4298,Randomisation#4299,Controlled trial#4300,Trial Control#4301,Blinding#4302,Trial registry name#4303,Trial registry number#4304,Countries#4305,Country Codes#4306,Trial design#4307,Multicenter study#4308,Inclusion criteria#4309,... 6 more fields] csv\n"
Traceback (most recent call last):
  File "/mnt/yarn/usercache/root/appcache/application_1594568207850_0001/container_1594568207850_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/mnt/yarn/usercache/root/appcache/application_1594568207850_0001/container_1594568207850_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o529.selectExpr.
: org.apache.spark.sql.AnalysisException: cannot resolve '`INPUT__FILE__NAME`' given input columns:

How can I use the INPUT__FILE__NAME function? I have already enabled Hive support in my code. Or is there another way to do this? I could not find anything online about how to use this function.

Comments:

Answer 1:

Use single underscores: Spark exposes the current file path through the input_file_name() function, not through Hive's INPUT__FILE__NAME virtual column (double underscores), which Spark's CSV reader does not provide.

Example:

from pyspark.sql.functions import input_file_name

spark.sql("select *, input_file_name() from tmp").show()
# or, with the DataFrame API:
df.withColumn("filename", input_file_name()).show()

Discussion:
