Local Pubsub Emulator won't work with Dataflow


Posted: 2020-07-23 00:36:37

Question:

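I'm developing a Dataflow pipeline in Java that reads its input from Pubsub. I later found a guide here on how to use the local Pubsub emulator, so that I don't need to deploy to GCP to test.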

Here is my simple code:

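import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class PubsubEmulatorPipeline { // class name is illustrative

    private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {

        @Description("Pub/Sub topic to read messages from")
        String getTopic();
        void setTopic(String topic);

        @Description("Pub/Sub subscription to read messages from")
        String getSubscription();
        void setSubscription(String subscription);

        @Description("Local file output")
        String getOutput();
        void setOutput(String output);
    }

    public static void main(String[] args) {

        Options options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(Options.class);
        options.setStreaming(true);
        // Point PubsubIO at the local emulator instead of the real Pub/Sub service
        options.setPubsubRootUrl("localhost:8085");

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()));
            // other .apply's

        pipeline.run();
    }
}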


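I was able to follow the guide, including the part where I used the sample Python code to create a topic, a subscription and a publisher, and even to publish messages. When I used the Python code to interact with the Pubsub emulator, I noticed the message "Detected HTTP/2 connection." in the command line where the emulator was running: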

Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.

I compiled and ran the code in Eclipse using a Dataflow Pipeline Run Configuration, but ran into a problem:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription: 
...
Caused by: java.lang.RuntimeException: Failed to create subscription: 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost

When I tried adding http to the URL in the options.setPubsubRootUrl("localhost:8085") line, I got an endlessly repeating exception:

com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)

It seems to reach the Pubsub emulator but fails to connect, because the command line running the emulator also prints this endlessly:

[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.

How can I get my Dataflow pipeline to work with the Pubsub emulator?


Answer 1:

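You are trying to connect to the Pubsub emulator from the Beam Direct Runner using the Dataflow fork of the Beam 2.5 SDK. The Dataflow 2.5 SDK and the Eclipse plugin have been deprecated since June 6, 2019, but this should still work.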

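As you discovered, in Beam you need to prefix your PubsubRootUrl with "http://". The second problem you are seeing indicates that nothing is listening on localhost:8085. This is probably because there are actually two localhosts: IPv4 and IPv6. The Pubsub emulator listens only on IPv4, while Windows tries IPv6 first. Try replacing localhost with 127.0.0.1 to force IPv4. You should end up with this: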

options.setPubsubRootUrl("http://127.0.0.1:8085");
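A minimal sketch, using only the JDK, of the two failure modes described in this answer. urlError shows that java.net.URL parses "localhost:8085" as a URL whose scheme is "localhost", which reproduces the "unknown protocol: localhost" message from the question. emulatorRootUrl applies the suggested "http://" prefix and the 127.0.0.1 substitution. isListening opens a plain TCP connection to check whether anything accepts connections on a given loopback address. The class and method names are hypothetical, and the probe only shows that some process accepts TCP connections on that address, not that it is the Pubsub emulator.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URL;

// Hypothetical helper class for diagnosing the emulator root URL.
public class EmulatorUrlCheck {

    // Builds an emulator root URL with an explicit http:// scheme. "localhost" is
    // replaced with the IPv4 loopback literal so the client cannot try ::1 first.
    public static String emulatorRootUrl(String host, int port) {
        String h = host.equalsIgnoreCase("localhost") ? "127.0.0.1" : host;
        return "http://" + h + ":" + port;
    }

    // Returns the MalformedURLException message for a URL string, or null if it parses.
    @SuppressWarnings("deprecation")
    public static String urlError(String spec) {
        try {
            new URL(spec);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Without a scheme, "localhost" is taken as the protocol name.
        System.out.println(urlError("localhost:8085")); // unknown protocol: localhost
        System.out.println(urlError(emulatorRootUrl("localhost", 8085))); // null

        // Probe both loopback addresses on the emulator's port from the question.
        System.out.println("127.0.0.1:8085 listening? " + isListening("127.0.0.1", 8085, 500));
        System.out.println("[::1]:8085 listening? " + isListening("::1", 8085, 500));
    }
}
```

If the probe reports 127.0.0.1:8085 as listening but [::1]:8085 as not, that is consistent with the emulator accepting IPv4 connections only, as this answer suggests; if neither address is listening, the emulator is probably not running on that port at all.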

Comments:

I ran into the same problem (although I'm on OSX) and tried your solution, @andrew-pilloud, but it made no difference... Any other ideas?
