使用 load_table_from_dataframe 时出错

Posted

技术标签:

【中文标题】使用 load_table_from_dataframe 时出错【英文标题】:Error while using load_table_from_dataframe 【发布时间】:2020-08-23 17:21:19 【问题描述】:

我正在尝试将数据从 mysql 加载到 BigQuery。我正在使用熊猫、jaydebeapi 和 load_table_from_dataframe。 使用相同时,出现以下错误:

>>> job = client.load_table_from_dataframe(chunk, table_id)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 1993, in load_table_from_dataframe
    parquet_compression=parquet_compression,
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 486, in dataframe_to_parquet
    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 450, in dataframe_to_arrow
    bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 224, in bq_to_arrow_array
    return pyarrow.Array.from_pandas(series, type=arrow_type)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required
>>> 

几点:

    我的源表存在并且具有以下架构:

    EMPID INTEGER, 
    EMPNAME VARCHAR, 
    STREETADRESS VARCHAR, 
    REGION VARCHAR, 
    STATE VARCHAR, 
    COUNTRY VARCHAR, 
    joining_date date, 
    last_update_date TIMESTAMP(6) -- to hold till millisecond
    

    我的目标表也存在于 BigQuery 中,下面是架构:

    create table if not exists `Project.dataset.table_name`
    (EMPID INT64,
    EMPNAME STRING,
    STREETADRESS STRING,
    REGION STRING,
    STATE STRING,
    COUNTRY STRING,
    joining_date DATE,
    last_update_date TIMESTAMP
    );
    

    下面是我正在使用的代码:

    import datetime
    from google.cloud import bigquery
    import pandas as pd
    import jaydebeapi
    import os
    
    client = bigquery.Client()
    
    table_id = "<project_id>.<dataset>.<target_table>"
    database_host='<IP Address>'
    database_user='<user id>'
    database_password='<password>'
    database_port='<port>'
    database_db='<database_name>'
    
    jclassname = "com.mysql.jdbc.Driver"
          url = "jdbc:mysql://host:port/database".format(host=database_host, port=database_port, database=database_db)
          driver_args = [database_user, database_password]
          jars = ["/<Home_Dir>/script/jars/mysql-connector-java-5.1.45.jar"]
          libs = None
    
    cnx = jaydebeapi.connect(jclassname, url, driver_args, jars=jars, libs=libs)
    query='select EMPID,EMPNAME,STREETADRESS,REGION,STATE,COUNTRY,joining_date,last_update_date from <table_name>'
    
    cursor = cnx.cursor()
    for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
    
    job = client.load_table_from_dataframe(chunk, table_id)
    job.result()
    

请帮助我解决问题。我也尝试使用 LoadJobConfig,但同样的错误即将到来。

【问题讨论】:

修复for chunk in pd.read_sql(...)块中的缩进和语句。 如果您打印生成的 pandas 块的架构会很有帮助。该错误似乎表明某种模式不匹配。另外您使用的是什么版本的 Arrow 和 BQ 客户端? 嗨,我正在从 python 提示符运行它。提示输出如下: >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY join_date last_update_date 0 168050 Arpan Roy A 3/3 Anandanagar;Dakshin Behala Road Behala West Bengal India 2005-11-17 2020-08-23 08:57:30 = =========PyArrow ========== numpy>=1.14 enum34>=1.1.6 六>=1.0.0 如何查看BQ客户端版本? 大家好,我发现了这个问题。对于 DATE 和 TIMESTAMP 列,我们需要在 pd.read_sql 中使用 parse_dates=['joining_date','last_update_date']: targettabledttscols=['joining_date','last_update_date'] for chunk in pd.read_sql(query, cnx, coerce_float=True , params=None, parse_dates=targettabledttscols, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x) 现在我需要弄清楚为什么 pd.read_sql 的“块”中没有捕获毫秒。任何人都遇到过类似的问题并得到任何解决方案?请帮忙 【参考方案1】:

这与您复制的代码相同吗?我的意思是缩进语句呢?

for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):
    chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
    job = client.load_table_from_dataframe(chunk, table_id)
    job.result()

修正以上行。

【讨论】:

嗨,我从 Python Prompt 运行它,因此缩进就是这样。我从数据框中得到了正确的输出。你提到的缩进,是有实际的python代码。 >>> EMPID EMPNAME STRETADRESS REGION STATE COUNTRY join_date last_update_date 0 168050 Arpan Roy A 3/3 Anandanagar;Dakshin Behala Road Behala West Bengal India 2005-11-17 2020-08-23 08:57:30

以上是关于使用 load_table_from_dataframe 时出错的主要内容,如果未能解决你的问题,请参考以下文章

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇

Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)

MySQL db 在按日期排序时使用“使用位置;使用临时;使用文件排序”

使用“使用严格”作为“使用强”的备份

Kettle java脚本组件的使用说明(简单使用升级使用)