Airflow mysql to gcp Dag error

Posted: 2017-07-20 23:09:47

Question:

I recently started using Airflow. I am working on a DAG that:

1. Queries a MySQL database
2. Extracts the results and stores them as a JSON file in a Cloud Storage bucket
3. Uploads the stored JSON file to BigQuery

The DAG imports three operators: `MySqlOperator`, `MySqlToGoogleCloudStorageOperator` and `GoogleCloudStorageToBigQueryOperator`.

I am using Airflow 1.8.0, Python 3 and Pandas 0.19.0.

Here is my DAG code:

sql2gcp_csv = MySqlToGoogleCloudStorageOperator(
    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    bucket='gs://{{ var.value.gcs_bucket }}/{{ ds_nodash }}/',
    filename='{{ ds_nodash }}-account-*.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',
)
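For context, `var.value.gcs_bucket` and `ds_nodash` are rendered by Airflow's Jinja templating at run time; `ds_nodash` is the execution date formatted without dashes. A minimal sketch of the equivalent formatting in plain Python (an illustration, not Airflow's actual macro implementation):

```python
from datetime import date

# ds_nodash is the execution date formatted as YYYYMMDD (no dashes),
# e.g. for an execution date of 2017-07-20:
execution_date = date(2017, 7, 20)
ds_nodash = execution_date.strftime('%Y%m%d')
print(ds_nodash)  # 20170720
```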

However, when I run it I get the following error:

[2017-07-20 22:38:07,478] models.py:1441 INFO - Marking task as FAILED. 

[2017-07-20 22:38:07,490] models.py:1462 ERROR - a bytes-like object is required, not 'str'

/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: 'database': 'test'
category=PendingDeprecationWarning

/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/ti_deps/deps/base_ti_dep.py:94: PendingDeprecationWarning: generator '_get_dep_statuses' raised StopIteration

for dep_status in self._get_dep_statuses(ti, session, dep_context):
Traceback (most recent call last):

File "/home/User/airflow/workspace/env/bin/airflow", line 28, in <module> args.func(args)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/bin/cli.py", line 422, in run pool=args.pool,

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/utils/db.py", line 53, in wrapper result = func(*args, **kwargs)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/models.py", line 1374, in run result = task_copy.execute(context=context)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute files_to_upload = self._write_local_data_files(cursor)

File "/home/User/airflow/workspace/env/lib/python3.5/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files 
json.dump(row_dict, tmp_file_handle)

File "/usr/lib/python3.5/json/__init__.py", line 179, in dump 

TypeError: a bytes-like object is required, not 'str'

Does anyone know why this exception is thrown?


Answer 1:

According to your traceback, your code breaks at this point. As you can see, it executes:

json.dump(row_dict, tmp_file_handle)

`tmp_file_handle` is a `NamedTemporaryFile` initialized with the default arguments, which means it behaves like a file opened in `w+b` mode (and therefore accepts only bytes-like input).

The problem is that in Python 2 all strings are bytes, whereas in Python 3 strings are text (encoded by default as UTF-8).

If you open a Python 2 shell and run this code:

In [1]: from tempfile import NamedTemporaryFile
In [2]: tmp_f = NamedTemporaryFile(delete=True)
In [3]: import json
In [4]: json.dump({'1': 1}, tmp_f)

it works fine.

But if you open a Python 3 shell and run the same code:

In [54]: from tempfile import NamedTemporaryFile
In [55]: tmp_f = NamedTemporaryFile(delete=True)
In [56]: import json
In [57]: json.dump({'1': 1}, tmp_f)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-57-81743b9013c4> in <module>()
----> 1 json.dump({'1': 1}, tmp_f)

/usr/local/lib/python3.6/json/__init__.py in dump(obj, fp, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    178     # a debuggability cost
    179     for chunk in iterable:
--> 180         fp.write(chunk)
    181 
    182 

/usr/local/lib/python3.6/tempfile.py in func_wrapper(*args, **kwargs)
    481             @_functools.wraps(func)
    482             def func_wrapper(*args, **kwargs):
--> 483                 return func(*args, **kwargs)
    484             # Avoid closing the file as long as the wrapper is alive,
    485             # see issue #18879.

TypeError: a bytes-like object is required, not 'str'

we get the same error you got.
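The contrast can be demonstrated in a single Python 3 script; a minimal standalone sketch (not the operator's code), showing that a text-mode handle sidesteps the error:

```python
import json
from tempfile import NamedTemporaryFile

# Default mode is 'w+b' (binary): json.dump writes str chunks and fails.
binary_f = NamedTemporaryFile(delete=True)
try:
    json.dump({'1': 1}, binary_f)
    raised = False
except TypeError:
    raised = True
binary_f.close()

# A text-mode handle (mode='w') accepts the str output of json.dump.
text_f = NamedTemporaryFile(mode='w', delete=False)
json.dump({'1': 1}, text_f)
text_f.close()

with open(text_f.name) as f:
    content = f.read()  # '{"1": 1}'
```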

This means that Airflow still does not fully support Python 3 (as you can see in the test coverage, the module airflow/contrib/operators/mysql_to_gcs.py is not yet tested on either Python 2 or Python 3). One way to confirm this is to run your code with Python 2 and see whether it works.

I suggest opening an issue on their JIRA requesting portability to both Python versions.
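Until the operator is ported, one possible workaround is to serialize to a str and encode it to UTF-8 bytes before writing, which satisfies the binary-mode handle. A sketch with a hypothetical helper (not part of Airflow's API):

```python
import json
from tempfile import NamedTemporaryFile

def dump_as_bytes(obj, fh):
    """Serialize obj to JSON and write it as UTF-8 bytes, so the write
    succeeds on a binary-mode file handle under Python 3."""
    fh.write(json.dumps(obj).encode('utf-8'))

tmp_f = NamedTemporaryFile(delete=False)  # default 'w+b' binary mode
dump_as_bytes({'1': 1}, tmp_f)
tmp_f.close()

with open(tmp_f.name, 'rb') as f:
    data = f.read()  # b'{"1": 1}'
```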

