使用 load_table_from_dataframe 时出错
Posted
技术标签:
【中文标题】使用 load_table_from_dataframe 时出错【英文标题】:Error while using load_table_from_dataframe 【发布时间】:2020-08-23 17:21:19 【问题描述】:我正在尝试将数据从 mysql 加载到 BigQuery。我正在使用熊猫、jaydebeapi 和 load_table_from_dataframe。 使用相同时,出现以下错误:
>>> job = client.load_table_from_dataframe(chunk, table_id)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 1993, in load_table_from_dataframe
parquet_compression=parquet_compression,
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 486, in dataframe_to_parquet
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 450, in dataframe_to_arrow
bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 224, in bq_to_arrow_array
return pyarrow.Array.from_pandas(series, type=arrow_type)
File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required
>>>
几点:
我的源表存在并且具有以下架构:
EMPID INTEGER,
EMPNAME VARCHAR,
STREETADRESS VARCHAR,
REGION VARCHAR,
STATE VARCHAR,
COUNTRY VARCHAR,
joining_date date,
last_update_date TIMESTAMP(6) -- to hold till millisecond
我的目标表也存在于 BigQuery 中,下面是架构:
create table if not exists `Project.dataset.table_name`
(EMPID INT64,
EMPNAME STRING,
STREETADRESS STRING,
REGION STRING,
STATE STRING,
COUNTRY STRING,
joining_date DATE,
last_update_date TIMESTAMP
);
下面是我正在使用的代码:
import datetime
from google.cloud import bigquery
import pandas as pd
import jaydebeapi
import os
client = bigquery.Client()
table_id = "<project_id>.<dataset>.<target_table>"
database_host='<IP Address>'
database_user='<user id>'
database_password='<password>'
database_port='<port>'
database_db='<database_name>'
jclassname = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://host:port/database".format(host=database_host, port=database_port, database=database_db)
driver_args = [database_user, database_password]
jars = ["/<Home_Dir>/script/jars/mysql-connector-java-5.1.45.jar"]
libs = None
cnx = jaydebeapi.connect(jclassname, url, driver_args, jars=jars, libs=libs)
query='select EMPID,EMPNAME,STREETADRESS,REGION,STATE,COUNTRY,joining_date,last_update_date from <table_name>'
cursor = cnx.cursor()
for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
job = client.load_table_from_dataframe(chunk, table_id)
job.result()
请帮助我解决问题。我也尝试使用 LoadJobConfig,但同样的错误即将到来。
【问题讨论】:
修复for chunk in pd.read_sql(...)
块中的缩进和语句。
如果您打印生成的 pandas 块的架构会很有帮助。该错误似乎表明某种模式不匹配。另外您使用的是什么版本的 Arrow 和 BQ 客户端?
嗨,我正在从 python 提示符运行它。提示输出如下: >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY join_date last_update_date 0 168050 Arpan Roy A 3/3 Anandanagar;Dakshin Behala Road Behala West Bengal India 2005-11-17 2020-08-23 08:57:30 = =========PyArrow ========== numpy>=1.14 enum34>=1.1.6 六>=1.0.0 如何查看BQ客户端版本?
大家好,我发现了这个问题。对于 DATE 和 TIMESTAMP 列,我们需要在 pd.read_sql 中使用 parse_dates=['joining_date','last_update_date']: targettabledttscols=['joining_date','last_update_date'] for chunk in pd.read_sql(query, cnx, coerce_float=True , params=None, parse_dates=targettabledttscols, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x) 现在我需要弄清楚为什么 pd.read_sql 的“块”中没有捕获毫秒。任何人都遇到过类似的问题并得到任何解决方案?请帮忙
【参考方案1】:
这与您复制的代码相同吗?我的意思是缩进语句呢?
for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):
chunk.apply(lambda x: x.replace(u'\r', u' ').replace(u'\n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
job = client.load_table_from_dataframe(chunk, table_id)
job.result()
修正以上行。
【讨论】:
嗨,我从 Python Prompt 运行它,因此缩进就是这样。我从数据框中得到了正确的输出。你提到的缩进,是有实际的python代码。 >>> EMPID EMPNAME STRETADRESS REGION STATE COUNTRY join_date last_update_date 0 168050 Arpan Roy A 3/3 Anandanagar;Dakshin Behala Road Behala West Bengal India 2005-11-17 2020-08-23 08:57:30以上是关于使用 load_table_from_dataframe 时出错的主要内容,如果未能解决你的问题,请参考以下文章
在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?
Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)