Problem publishing a message to Pub/Sub using Python from Airflow
Posted: 2021-10-29 15:17:30
I'm trying to publish a message to Pub/Sub, but I get this error:
[2021-08-30 23:10:55,317] taskinstance.py:1049 ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_task
    task.on_success_callback(context)
  File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instance
    log_monitoring(DAG_STATE_SUCCESS, context=context)
  File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitoring
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wait
    with _AcquireFutures(fs):
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter__
    future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'
Here is the code:
# Publishes multiple messages to a Pub/Sub topic with an error handler.
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []

publisher = pubsub_v1.PublisherClient()


def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback


record = {
    'Key1': 'Value1',
    'Key2': 'Value2',
    'Key3': 'Value3'
}
data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")
By the way, I'm following this official tutorial: https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler
Do you know what's wrong?
Also, is there a different way to wait for the messages to be published?
Comments on the question:
This may be a version incompatibility with from concurrent import futures: the Python code implicitly tries to use future._condition, which doesn't exist.
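A minimal, self-contained sketch of the mechanism this comment describes: concurrent.futures.wait() reaches into private attributes of each future (such as _condition), so it only works with genuine concurrent.futures.Future objects. DuckTypedFuture is a hypothetical class that offers result(), done() and add_done_callback() but does not subclass concurrent.futures.Future. It stands in for a publish future from a library version whose Future class is not such a subclass; that this applies to the Composer environment here is an inference, not something the thread confirms.

```python
from concurrent import futures


class DuckTypedFuture:
    """Hypothetical future-like object that is NOT a concurrent.futures.Future."""

    def __init__(self, value):
        self._value = value

    def done(self):
        return True

    def result(self, timeout=None):
        return self._value

    def add_done_callback(self, fn):
        # Already done, so run the callback immediately.
        fn(self)


def try_wait(fs):
    """Call futures.wait() and report success or the AttributeError raised."""
    try:
        done, not_done = futures.wait(fs, return_when=futures.ALL_COMPLETED)
        return f"ok: {len(done)} done, {len(not_done)} pending"
    except AttributeError as exc:
        return f"AttributeError: {exc}"


# A real Future, resolved by hand (Future() may be created directly for testing).
real = futures.Future()
real.set_result("message-id-1")

print(try_wait([real]))  # ok: 1 done, 0 pending
print(try_wait([DuckTypedFuture("message-id-2")]))
# AttributeError: 'DuckTypedFuture' object has no attribute '_condition'
```

The second call fails with the same kind of AttributeError as in the question, while the real Future passes through wait() without trouble.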
I used your code and it worked fine for me; I was able to publish a message. The only small change I made was turning Value1... in record into strings. I tested with google-cloud-pubsub==2.7.1.
Answer 1:
I was able to reproduce your issue using the code snippet. I used Composer version 1.16.15 and Airflow version 1.10.15, without installing any additional Python libraries.
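To fix this, update the Pub/Sub library (google-cloud-pubsub) in your Cloud Composer environment to the latest version, 2.7.1. You can do this with the gcloud composer environments update command. For details, see Installing a Python dependency from PyPI.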
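To update the Pub/Sub library smoothly, explicitly list the Google libraries in your requirements.txt, because Google libraries depend on other Google libraries (see Pubsub library dependencies). You can find the preinstalled Google libraries in the Cloud Composer pre installed packages reference. If you have updated any Google library yourself, include the version you use in requirements.txt instead.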
requirements.txt
google-ads==4.0.0
google-api-core==1.26.1
google-api-python-client==1.12.8
google-apitools==0.5.31
google-auth==1.28.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.3
google-cloud-automl==2.2.0
google-cloud-bigquery==2.13.0
google-cloud-bigquery-datatransfer==3.1.0
google-cloud-bigquery-storage==2.1.0
google-cloud-bigtable==1.7.0
google-cloud-build==2.0.0
google-cloud-container==1.0.1
google-cloud-core==1.6.0
google-cloud-datacatalog==3.1.0
google-cloud-dataproc==2.3.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-kms==2.2.0
google-cloud-language==1.3.0
google-cloud-logging==2.2.0
google-cloud-memcache==0.3.0
google-cloud-monitoring==2.0.0
google-cloud-os-login==2.1.0
google-cloud-pubsub==2.7.1
google-cloud-pubsublite==0.3.0
google-cloud-redis==2.1.0
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.36.2
google-cloud-tasks==2.2.0
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==0.2.0
google-crc32c==1.1.2
google-pasta==0.2.0
google-resumable-media==1.2.0
googleapis-common-protos==1.53.0
graphviz==0.16
greenlet==1.0.0
grpc-google-iam-v1==0.12.3
grpcio==1.38.1
grpcio-gcp==0.2.2
Update command:
gcloud composer environments update your-environment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location
After installation, the command reports that it has finished. [screenshot]
Check the installed version in GCP Console -> Composer -> your-environment -> PYPI Packages. [screenshot]
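If you would rather check from code running inside the environment (for example from a PythonOperator task, whose printed output ends up in the task log), the sketch below tests whether the installed publisher Future class subclasses concurrent.futures.Future, which is what futures.wait() relies on. pick_wait_strategy and pubsub_wait_strategy are hypothetical helper names; the only library detail assumed is the pubsub_v1.publisher.futures.Future path that the question's code already uses.

```python
from concurrent import futures


def pick_wait_strategy(future_cls) -> str:
    """Suggest a waiting approach for futures of class future_cls (hypothetical helper)."""
    if isinstance(future_cls, type) and issubclass(future_cls, futures.Future):
        # A subclass is expected to carry the private state futures.wait() uses.
        return "concurrent.futures.wait"
    # Otherwise fall back to calling result() on each future.
    return "call result() on each future"


def pubsub_wait_strategy() -> str:
    """Inspect the google-cloud-pubsub publisher Future class, if installed."""
    try:
        from google.cloud.pubsub_v1.publisher import futures as pubsub_futures
    except ImportError:
        return "google-cloud-pubsub is not installed"
    return pick_wait_strategy(pubsub_futures.Future)


print(pubsub_wait_strategy())
```

The check inspects the class itself rather than comparing version strings, so it does not depend on knowing exactly which library release changed the Future's base class.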
Airflow test run: [screenshot]
Airflow logs: [screenshot]
DAG used:
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.operators import python_operator
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}


def publish_error_handle():
    topic_path = 'projects/your-project-id/topics/test-topic'
    publish_futures = []

    publisher = pubsub_v1.PublisherClient()

    def get_callback(
        publish_future: pubsub_v1.publisher.futures.Future, data: str
    ) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
        def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
            try:
                # Wait 60 seconds for the publish call to succeed.
                print(publish_future.result(timeout=60))
            except futures.TimeoutError:
                print(f"Publishing {data} timed out.")

        return callback

    record = {
        'Key1': 'Value1',
        'Key2': 'Value2',
        'Key3': 'Value3'
    }
    data = json.dumps(record).encode("utf-8")
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

    # Wait for all the publish futures to resolve before exiting.
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

    print(f"Published messages with error handler to {topic_path}.")


with airflow.DAG(
        'composer_sample_dag',
        # catchup must be a keyword argument; passed as the string
        # 'catchup=False' it would become the DAG's description instead.
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:
    publish_handle = python_operator.PythonOperator(
        task_id='publish_handle',
        python_callable=publish_error_handle
    )

    publish_handle
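On the question's last point, another way to wait is to call result() on each publish future in turn. This uses only the public result() method, so it does not depend on the future class subclassing concurrent.futures.Future. The sketch below is one possible approach, not the official sample's: wait_for_publishes is a hypothetical helper, and plain concurrent.futures.Future objects stand in for Pub/Sub publish futures in the demo. The timeout applies to each future separately, so the total wait can be up to len(publish_futures) times timeout.

```python
from concurrent import futures


def wait_for_publishes(publish_futures, timeout=60.0):
    """Wait for each publish future by calling its result() method.

    Only the public result() API is used, so this also works with future
    objects that are not concurrent.futures.Future subclasses.
    The timeout applies to each future separately.
    Returns a tuple (message_ids, errors).
    """
    message_ids = []
    errors = []
    for publish_future in publish_futures:
        try:
            # result() blocks until the publish finishes, then returns the
            # message ID or re-raises the publish error.
            message_ids.append(publish_future.result(timeout=timeout))
        except Exception as exc:  # timeouts and publish failures alike
            errors.append(exc)
    return message_ids, errors


# Demo with plain concurrent.futures.Future objects standing in for
# Pub/Sub publish futures.
succeeded = futures.Future()
succeeded.set_result("msg-1")
failed = futures.Future()
failed.set_exception(RuntimeError("publish failed"))

ids, errs = wait_for_publishes([succeeded, failed], timeout=1)
print(ids)        # ['msg-1']
print(len(errs))  # 1
```

In publish_error_handle, the futures.wait(...) line could then be replaced with message_ids, errors = wait_for_publishes(publish_futures).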