Problem publishing a message to Pub/Sub using Python from Airflow

Posted: 2021-10-29 15:17:30

Question:

I am trying to publish a message to Pub/Sub, but I get this error:

[2021-08-30 23:10:55,317] taskinstance.py:1049 ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_task
    task.on_success_callback(context)
  File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instance
    log_monitoring(DAG_STATE_SUCCESS, context=context)
  File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitoring
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wait
    with _AcquireFutures(fs):
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter__
    future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'
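The last frames of the traceback show where the error is raised: `concurrent.futures.wait()` locks every future it receives through the private `_condition` attribute, which only exists on `concurrent.futures.Future` instances. The standard-library-only sketch below reproduces the same `AttributeError` with a hypothetical `DuckFuture` class that offers `result()` but does not subclass `concurrent.futures.Future`. The assumption (consistent with the traceback, not confirmed by the post) is that the publisher futures of the installed google-cloud-pubsub version behave like `DuckFuture`.

```python
from concurrent import futures


class DuckFuture:
    """Hypothetical stand-in for a future that does NOT subclass
    concurrent.futures.Future; it only offers result()."""

    def result(self, timeout=None):
        return "message-id"


def try_wait(fs):
    """Call futures.wait() and return None on success, or the error text."""
    try:
        futures.wait(fs, return_when=futures.ALL_COMPLETED)
        return None
    except AttributeError as exc:
        return str(exc)


# A real, already-resolved stdlib future works fine with futures.wait().
ok = futures.Future()
ok.set_result("message-id")
print(try_wait([ok]))            # None

# The duck-typed future fails inside _AcquireFutures, as in the traceback.
print(try_wait([DuckFuture()]))  # 'DuckFuture' object has no attribute '_condition'
```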

Here is the code:

# Publishes multiple messages to a Pub/Sub topic with an error handler. 
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []

publisher = pubsub_v1.PublisherClient()

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

record = {
    'Key1': 'Value1',
    'Key2': 'Value2',
    'Key3': 'Value3'
}

data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

By the way, I am following this official tutorial: https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler

Do you know what is going wrong?

Finally, is there a different way to wait for the messages to be published?
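One alternative that does not rely on `concurrent.futures.wait()` is to block on each future's own `result()` method, which the question's callback already calls on the publisher future. The sketch below uses a hypothetical helper, `wait_for_publishes`, that collects results and errors. It is demonstrated with standard-library futures so it runs without Pub/Sub; in the task you would pass the list filled by `publisher.publish(...)` instead (an assumption about how it would be wired in, not part of the original code).

```python
from concurrent import futures
from typing import Any, List, Sequence, Tuple


def wait_for_publishes(
    publish_futures: Sequence[Any], timeout: float = 60.0
) -> Tuple[List[Any], List[BaseException]]:
    """Hypothetical helper: block on each future's result() in turn.

    Works with any object exposing result(timeout=...), so it does not
    depend on the future subclassing concurrent.futures.Future.
    Returns (results, errors).
    """
    results: List[Any] = []
    errors: List[BaseException] = []
    for publish_future in publish_futures:
        try:
            # result() returns the value, re-raises the stored error,
            # or raises a timeout error if the future is still pending.
            results.append(publish_future.result(timeout=timeout))
        except Exception as exc:  # collect failures instead of aborting
            errors.append(exc)
    return results, errors


# Demo with stdlib futures standing in for publisher futures.
good = futures.Future()
good.set_result("1234567890")
bad = futures.Future()
bad.set_exception(RuntimeError("publish failed"))

ids, failures = wait_for_publishes([good, bad], timeout=1.0)
print(ids)       # ['1234567890']
print(failures)  # [RuntimeError('publish failed')]
```

Because each `result()` call may wait up to `timeout` seconds, the worst case is roughly the number of futures times the timeout. Raising an exception when `failures` is non-empty would make Airflow mark the task as failed; that policy is a suggestion, not something the original answer prescribes.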

Comments:

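This may be a version incompatibility with `from concurrent import futures`: the Python code implicitly tries to use future._condition, but that attribute does not exist.

I used your code and it works fine for me; I was able to publish a message. The one small thing I changed was making Value1... in record strings. I was using google-cloud-pubsub==2.7.1 when testing.

Answer 1: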

I was able to reproduce your issue using your code snippet. I used Composer version 1.16.15 and Airflow version 1.10.15, without installing any additional Python libraries.

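To fix the issue, update the Pub/Sub library in your Cloud Composer instance to the latest version (2.7.1 at the time of writing). You can do this with the gcloud composer environments update command. For details, see Installing a Python dependency from PyPI.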

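To update the Pub/Sub library without conflicts, explicitly list the Google libraries in your requirements.txt, because Google libraries depend on other Google libraries (see Pubsub library dependencies). You can find the preinstalled Google libraries in the Cloud Composer pre installed packages reference. If you have already updated any Google libraries, list the versions you actually use in requirements.txt instead.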

requirements.txt

google-ads==4.0.0
google-api-core==1.26.1
google-api-python-client==1.12.8
google-apitools==0.5.31
google-auth==1.28.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.3
google-cloud-automl==2.2.0
google-cloud-bigquery==2.13.0
google-cloud-bigquery-datatransfer==3.1.0
google-cloud-bigquery-storage==2.1.0
google-cloud-bigtable==1.7.0
google-cloud-build==2.0.0
google-cloud-container==1.0.1
google-cloud-core==1.6.0
google-cloud-datacatalog==3.1.0
google-cloud-dataproc==2.3.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-kms==2.2.0
google-cloud-language==1.3.0
google-cloud-logging==2.2.0
google-cloud-memcache==0.3.0
google-cloud-monitoring==2.0.0
google-cloud-os-login==2.1.0
google-cloud-pubsub==2.7.1
google-cloud-pubsublite==0.3.0
google-cloud-redis==2.1.0
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.36.2
google-cloud-tasks==2.2.0
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==0.2.0
google-crc32c==1.1.2
google-pasta==0.2.0
google-resumable-media==1.2.0
googleapis-common-protos==1.53.0
graphviz==0.16
greenlet==1.0.0
grpc-google-iam-v1==0.12.3
grpcio==1.38.1
grpcio-gcp==0.2.2

Update command:

gcloud composer environments update your-environment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location

Once the installation finishes, the command reports that the update is done.

Check the installed version in GCP Console -> Composer -> your-environment -> PyPI Packages.
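To confirm the same thing from inside a task (for example from a PythonOperator callable), a sketch like the following reads the installed distribution version and checks whether the publisher future class subclasses `concurrent.futures.Future`, which is what `futures.wait()` needs. Both helper names are hypothetical. `importlib.metadata` requires Python 3.8+, so the sketch falls back to `pkg_resources` on older runtimes such as the Python 3.6 shown in the traceback.

```python
import concurrent.futures
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if missing."""
    try:
        from importlib.metadata import PackageNotFoundError, version  # Python 3.8+
    except ImportError:  # older runtimes such as Python 3.6
        import pkg_resources

        try:
            return pkg_resources.get_distribution(dist_name).version
        except pkg_resources.DistributionNotFound:
            return None
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def publisher_future_supports_wait() -> Optional[bool]:
    """True if Pub/Sub publisher futures subclass concurrent.futures.Future,
    False if they do not, None if google-cloud-pubsub is not importable."""
    try:
        from google.cloud import pubsub_v1
    except ImportError:
        return None
    return issubclass(pubsub_v1.publisher.futures.Future, concurrent.futures.Future)


print("google-cloud-pubsub:", installed_version("google-cloud-pubsub"))
print("futures.wait() compatible:", publisher_future_supports_wait())
```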

Airflow test run:

Airflow logs:

DAG used:

import datetime

import airflow
from airflow.operators import bash_operator
from airflow.operators import python_operator
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

def publish_error_handle():
    topic_path = 'projects/your-project-id/topics/test-topic'
    publish_futures = []

    publisher = pubsub_v1.PublisherClient()

    def get_callback(
        publish_future: pubsub_v1.publisher.futures.Future, data: str
    ) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
        def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
            try:
                # Wait 60 seconds for the publish call to succeed.
                print(publish_future.result(timeout=60))
            except futures.TimeoutError:
                print(f"Publishing {data} timed out.")

        return callback

    record = {
        'Key1': 'Value1',
        'Key2': 'Value2',
        'Key3': 'Value3'
    }

    data = json.dumps(record).encode("utf-8")
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

    # Wait for all the publish futures to resolve before exiting.
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

    print(f"Published messages with error handler to {topic_path}.")

with airflow.DAG(
        'composer_sample_dag',
        # Keyword argument; as a quoted positional string it would be taken as the DAG description.
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    publish_handle = python_operator.PythonOperator(
        task_id='publish_handle',
        python_callable=publish_error_handle
    )

    publish_handle    
