pandas_udf结果无法写入表

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pandas_udf结果无法写入表相关的知识,希望对你有一定的参考价值。

Pandas_udf结果无法写入表是一个比较常见的问题,原因可能有很多。首先,您可以检查您的表是否存在,如果不存在,您可以尝试创建表。其次,您可以检查您的pandas_udf函数是否正确,如果不正确,您可以尝试重新定义函数。最后,您可以检查您的表结构是否与pandas_udf结果匹配,如果不匹配,您可以尝试修改表结构。总之,您可以通过以上几个步骤来解决pandas_udf结果无法写入表的问题。 参考技术A 如果pandas使用UDF(User Defined Function)进行处理,且结果无法写入表,可能是UDF程序中存在错误,或者没有正确使用UDF结果。建议您检查UDF程序,核实结果数据类型是否正确,以及是否足够符合表的字段要求。如果pandas使用UDF(User Defined Function)进行处理,且结果无法写入表,可能是UDF程序中存在错误,或者没有正确使用UDF结果。建议您检查UDF程序,核实结果数据类型是否正确,以及是否足够符合表的字段要求。 参考技术B 如果不能将pandas_udf的结果写入表中,可能是因为pandas_udf的输出格式和表中字段类型不匹配,或者表中字段类型不支持pandas_udf结果的数据类型。因此,建议重新检查表中字段类型,并对pandas_udf进行必要的调整,以保证两者数据类型的一致性。 参考技术C 如果pandasudf的结果无法写入表,可能是由于表中字段的类型、长度不匹配,或者被写入的数据过大造成的。可以检查一下表的定义,并且确保传入pandasudf的参数类型有效,尝试修改写入的数据大小,以免出现问题。 参考技术D 可能是由于pandasUDF函数中操作的表格数据类型不支持写入表。建议你可以尝试修改输出的数据类型,以便可以正确写入表中。

将结果写入 bigquery 中的永久表

【中文标题】将结果写入 bigquery 中的永久表【英文标题】:Write results to permanent table in bigquery 【发布时间】:2017-03-27 07:47:15 【问题描述】:

我在 Bigquery SQL 中使用命名参数,并希望将结果写入永久表。我有两个函数 1 用于使用命名查询参数和 1 用于将查询结果写入表。如何将两者结合起来将查询结果写入表;具有命名参数的查询。

    这是使用参数化查询的函数:

    def sync_query_named_params(column_name,min_word_count,value):
    query = """with lsq_results as
    (select "%s" = @min_word_count)
    replace (%s  AS %s)
    from lsq.lsq_results
    """ % (min_word_count,value,column_name)
    
    client = bigquery.Client()
    
    query_results = client.run_sync_query(query
    ,
    query_parameters=(
        bigquery.ScalarQueryParameter('column_name', 'STRING', column_name),
        bigquery.ScalarQueryParameter(
            'min_word_count',
            'STRING',
            min_word_count),
        bigquery.ScalarQueryParameter('value','INT64',value)
        ))
    query_results.use_legacy_sql = False
    query_results.run()
    

    写入永久表的函数

    class BigQueryClient(object):
    
       def __init__(self, bq_service, project_id, swallow_results=True):
            self.bigquery = bq_service
            self.project_id = project_id
            self.swallow_results = swallow_results
            self.cache = 
       def write_to_table(
         self,
         query,
         dataset=None,
         table=None,
         external_udf_uris=None,
         allow_large_results=None,
         use_query_cache=None,
         priority=None,
         create_disposition=None,
         write_disposition=None,
         use_legacy_sql=None,
         maximum_billing_tier=None,
         flatten=None):
    
     configuration = 
        "query": query,
    
    
    if dataset and table:
        configuration['destinationTable'] = 
            "projectId": self.project_id,
            "tableId": table,
            "datasetId": dataset
        
    
    if allow_large_results is not None:
        configuration['allowLargeResults'] = allow_large_results
    
    if flatten is not None:
        configuration['flattenResults'] = flatten
    
    if maximum_billing_tier is not None:
        configuration['maximumBillingTier'] = maximum_billing_tier
    
    if use_query_cache is not None:
        configuration['useQueryCache'] = use_query_cache
    
    if use_legacy_sql is not None:
        configuration['useLegacySql'] = use_legacy_sql
    
    if priority:
        configuration['priority'] = priority
    
    if create_disposition:
        configuration['createDisposition'] = create_disposition
    
    if write_disposition:
        configuration['writeDisposition'] = write_disposition
    
    if external_udf_uris:
        configuration['userDefinedFunctionResources'] = \
            [ 'resourceUri': u for u in external_udf_uris ]
    
    body = 
        "configuration": 
            'query': configuration
        
    
    
    logger.info("Creating write to table job %s" % body)
    job_resource = self._insert_job(body)
    self._raise_insert_exception_if_error(job_resource)
    return job_resource
    

如何结合这两个函数来编写参数化查询并将结果写入永久表?或者如果有另一种更简单的方法。请提出建议。

【问题讨论】:

我的意思是问,如何在 write_to_table 函数中传递参数化查询?如,如果我需要传递参数化查询,我应该在 write_to_table() 中传递什么来代替查询? 【参考方案1】:

您似乎在使用两个不同的客户端库。

您的第一个代码示例使用了 BigQuery 客户端库的 Beta 版,但我暂时建议不要使用它,因为它需要大量修改才能被认为普遍可用。 (如果您确实使用它,我建议使用run_async_query() 使用所有可用参数创建作业,然后调用results() 以获取QueryResults 对象。)

您的第二个代码示例是直接创建作业资源,这是一个较低级别的接口。使用此方法时,您可以直接在查询配置中指定 configuration.query.queryParameters 字段。这是我现在推荐的方法。

【讨论】:

以上是关于pandas_udf结果无法写入表的主要内容,如果未能解决你的问题,请参考以下文章

pandas_udf 与 pyspark 3.0 的 scipiy.find_peaks 结果不一致

将结果写入 bigquery 中的永久表

写入表的 228 行结果查询作业在允许大结果为 True 时给出 0 行

带有 iloc 的 Pandas_UDF 连接循环

如何将多个表的结果写入配置单元中的单个表?

Spark SQL - 无法将所有记录写入配置单元表