How do I run Apache Beam integration tests?


Posted: 2021-06-16 02:16:20

Question:

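I am trying to run the game stats example pipeline and integration tests found at https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game, but I am not sure what the correct way to set up my local environment is.

My main goal is to learn how to use TestDataflowRunner so that I can implement integration tests for existing pipelines I have written.

[UPDATE] I have written a basic Dataflow pipeline that reads messages from PubSub and writes them to a different topic. I have an integration test that passes using TestDirectRunner, but I get errors when trying to use TestDataflowRunner.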

pipeline.py

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def run(argv=None):
  """Build and run the pipeline."""
  parser = argparse.ArgumentParser()  
  parser.add_argument('--output_topic', required=True)
  parser.add_argument('--input_subscription', required=True)

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=pipeline_options) as p:
    # Read from PubSub into a PCollection.
    messages = (
        p |
        beam.io.ReadFromPubSub(subscription=known_args.input_subscription).
        with_output_types(bytes)
    )

    lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

    def format_pubsub(msg):
        # Log the actual message; the original logged the literal text 'msg'.
        logging.info('Format PubSub: %s', msg)
        return str(msg)

    output = (
        lines
        | 'format' >> beam.Map(format_pubsub)
        | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))

    output | beam.io.WriteToPubSub(known_args.output_topic)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

pubsub_it_test.py

from __future__ import absolute_import

import logging
import os
import time
import unittest
import uuid

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

import pipeline


INPUT_TOPIC = 'wordcount-input'
OUTPUT_TOPIC = 'wordcount-output'
INPUT_SUB = 'wordcount-input-sub'
OUTPUT_SUB = 'wordcount-output-sub'

DEFAULT_INPUT_NUMBERS = 1
WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000  # in milliseconds


class TestIT(unittest.TestCase):
    def setUp(self):
        self.test_pipeline = TestPipeline(is_integration_test=True)
        self.project = self.test_pipeline.get_option('project')
        self.uuid = str(uuid.uuid4())

        # Set up PubSub environment.
        from google.cloud import pubsub
        self.pub_client = pubsub.PublisherClient()
        self.input_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
        self.output_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

        self.sub_client = pubsub.SubscriberClient()
        self.input_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
            self.input_topic.name)
        self.output_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
            self.output_topic.name,
            ack_deadline_seconds=60)
    
    def _inject_numbers(self, topic, num_messages):
        """Inject numbers as test data to PubSub."""
        logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
        for n in range(num_messages):
            self.pub_client.publish(self.input_topic.name, str(n).encode('utf-8'))

    def tearDown(self):
        test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub, self.output_sub])
        test_utils.cleanup_topics(self.pub_client, [self.input_topic, self.output_topic])
  
    @attr('IT')
    def test_pubsub_pipe_it(self):
        # Build expected dataset.
        expected_msg = [('%d' % num).encode('utf-8') for num in range(DEFAULT_INPUT_NUMBERS)]

        # Set extra options to the pipeline for test purpose
        state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
        pubsub_msg_verifier = PubSubMessageMatcher(self.project, self.output_sub.name, expected_msg, timeout=400)
        extra_opts = {
            'input_subscription': self.input_sub.name,
            'output_topic': self.output_topic.name,
            'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
            'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
        }

        # Generate input data and inject to PubSub.
        self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

        # Get pipeline options from command argument: --test-pipeline-options,
        # and start pipeline job by calling pipeline main function.
        pipeline.run(self.test_pipeline.get_full_options_as_args(**extra_opts))

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    unittest.main()

I get this error in the Dataflow logs:

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
    instruction_id, request.process_bundle_descriptor_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
    self.data_channel_factory)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1546, in create_par_do
    parameter)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1582, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 294, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631 

The command I am running is:

pytest --log-cli-level=INFO pubsub_it_test.py --test-pipeline-options="--runner=TestDataflowRunner \
    --project=$PROJECT --region=europe-west1 \
    --staging_location=gs://$BUCKET/staging \
    --temp_location=gs://$BUCKET/temp \
    --job_name=it-test-pipeline \
    --setup_file ./setup.py"

The repo for this pipeline can be found here: https://github.com/tunnelWithAC/apache-beam-integration-test

Thanks

Comments:

You could try starting with the Dataflow quick start, which shows how to run Dataflow both locally and via the Dataflow service.

Answer 1:

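Beam's integration tests are meant to be run by Beam's CI/CD infrastructure. They are based on nose and need a custom plugin to understand the --test-pipeline-options flag. I would not recommend going down that route.

I would follow the quick start guide that Ricco D suggested for the environment. You can run integration tests with pytest. To use the same --test-pipeline-options flag, you need this definition. Otherwise, the wordcount example shows how to set up your own command-line flags.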


Update:

I used this to set up the virtualenv:

pip install apache-beam[gcp,test]

The test extra pulls in pytest, but you do not need it if pytest is already installed.

Then I created this conftest.py file to configure pytest (based on Beam's own conftest.py):

def pytest_addoption(parser):
  parser.addoption('--test-pipeline-options',
                   help='Options to use in test pipelines. NOTE: Tests may '
                        'ignore some or all of these options.')

Run the test:

pytest --log-cli-level=INFO pipeline_it_test.py --test-pipeline-options="--runner=TestDataflowRunner --project=PROJECT --region=us-west1 --staging_location=gs://BUCKET/staging --temp_location=gs://BUCKET/temp --output=gs://BUCKET/output "

Your test may need all of the options in --test-pipeline-options.
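To make the flow of that single quoted flag more concrete, here is a minimal sketch using only the standard library. It is not Beam's implementation; it only illustrates how one option string can be split into an argv-style list and then divided into the pipeline's own flags and everything else, the way run() in pipeline.py does with parse_known_args. The function names and the flag values (my-project, my-sub, my-topic) are made up for the example.

```python
import argparse
import shlex


def split_test_pipeline_options(option_string):
    """Split one quoted option string into an argv-style list."""
    return shlex.split(option_string)


def parse_pipeline_args(argv):
    """Separate the pipeline's own flags from the remaining runner flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_topic', required=True)
    parser.add_argument('--input_subscription', required=True)
    # Flags the parser does not know are returned untouched in the second item.
    return parser.parse_known_args(argv)


if __name__ == '__main__':
    # Placeholder values, not taken from the question.
    argv = split_test_pipeline_options(
        '--runner=TestDataflowRunner --project=my-project '
        '--input_subscription=my-sub --output_topic=my-topic')
    known_args, pipeline_args = parse_pipeline_args(argv)
    print(known_args.input_subscription, known_args.output_topic)
    print(pipeline_args)
```

In the question's test, the subscription and topic are not typed on the command line; they are added through extra_opts before the combined arguments are handed to pipeline.run().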

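Comments:

Could you share an example of running the integration tests with pytest?

I have updated my answer based on your example code.

Thanks for all your help so far. I can run the test successfully using TestDirectRunner, but I get an error with TestDataflowRunner. I have added my latest code and the error above; if you know what is causing the problem, could you let me know?

Beam is not finding pipeline.py. Judging from the setup.py in your repo, I think putting pipeline.py under porter/ would help. See also: ***.com/a/58845832/63302

Thanks, everything works now. Thanks a lot for your help.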
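For anyone wondering why a missing pipeline.py shows up as a ModuleNotFoundError during unpickling, here is a small standard-library sketch of the same class of failure. It uses pickle rather than dill (which is what appears in the traceback), and the module name demo_pipeline_mod is made up, so treat it as an analogy rather than a reproduction of what the Dataflow worker does. A function pickled by reference only stores its module and name, so whoever loads it must be able to import that module.

```python
import importlib
import os
import pickle
import sys
import tempfile


def simulate_missing_module():
    """Pickle a function, then unpickle it where its module is not importable."""
    module_name = 'demo_pipeline_mod'  # hypothetical module name
    with tempfile.TemporaryDirectory() as tmp_dir:
        with open(os.path.join(tmp_dir, module_name + '.py'), 'w') as f:
            f.write('def format_msg(msg):\n    return str(msg)\n')

        # "Launcher" side: the module is importable, so pickling succeeds.
        sys.path.insert(0, tmp_dir)
        importlib.invalidate_caches()
        module = importlib.import_module(module_name)
        payload = pickle.dumps(module.format_msg)

        # "Worker" side: forget the module and its location.
        sys.path.remove(tmp_dir)
        del sys.modules[module_name]
        importlib.invalidate_caches()

    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return exc.name  # name of the module that could not be imported
    return None


if __name__ == '__main__':
    print(simulate_missing_module())
```

Moving pipeline.py into the porter/ package, as suggested above, presumably works because the package built from setup.py is installed on the workers, which makes the module importable there.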
