如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?

Posted

技术标签:

【中文标题】如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?【英文标题】:How to set up a SSH tunnel in Google Cloud Dataflow to an external database server? 【发布时间】:2021-01-15 11:26:11 【问题描述】:

我在使用 DataflowRunner 使我的 Apache Beam 管道在 Cloud Dataflow 上工作时遇到问题。

管道的第一步是连接到托管在 VM 上的外部 Postgresql 服务器,该服务器只能通过 SSH 外部访问,端口 22,并提取一些数据。我无法更改这些防火墙规则,因此我只能通过 SSH 隧道(即端口转发)连接到数据库服务器。

在我的代码中,我使用了 python 库 sshtunnel。当使用 DirectRunner 从我的开发计算机启动管道时,它可以完美运行:

from sshtunnel import open_tunnel

with open_tunnel(
        (user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
        ssh_username=user_options.ssh_tunnel_user,
        ssh_password=user_options.ssh_tunnel_password,
        remote_bind_address=(user_options.dbhost, user_options.dbport)
    ) as tunnel:
        with beam.Pipeline(options=pipeline_options) as p:
            (p | "Read data" >> ReadFromSQL(
                host=tunnel.local_bind_host,
                port=tunnel.local_bind_port,
                username=user_options.dbusername,
                password=user_options.dbpassword,
                database=user_options.dbname,
                wrapper=PostgresWrapper,
                query=select_query
            )
                | "Format CSV" >> DictToCSV(headers)
                | "Write CSV" >> WriteToText(user_options.export_location)
            )

在非默认 VPC 中使用 DataflowRunner 启动相同的代码,其中所有入口都被拒绝但没有出口限制,并且配置了 CloudNAT,但失败并显示以下消息:

psycopg2.OperationalError:无法连接到服务器:连接被拒绝服务器是否在主机“0.0.0.0”上运行并接受端口 41697 上的 TCP/IP 连接? [在运行“读取数据/读取”时]

所以,很明显我的隧道出了点问题,但我无法确定到底是什么。我开始怀疑是否可以通过 CloudNAT 直接设置 SSH 隧道,直到我发现这篇博文:https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 声明:

Cloud Dataflow 的核心优势在于您可以调用外部服务来丰富数据。例如,您可以调用微服务来获取元素的附加数据。 在 DoFn 中,调用服务(通常通过 HTTP 完成)。 您可以完全控制选择的任何类型的连接,只要您在项目/网络中设置的防火墙规则允许。

所以应该可以设置这条隧道!我不想放弃,但我不知道下一步该尝试什么。有什么想法吗?

感谢阅读

【问题讨论】:

【参考方案1】:

问题解决了!我不敢相信我已经花了整整两天的时间...我完全看错了方向。

问题不在于某些 Dataflow 或 GCP 网络配置,据我所知...

只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制选择的任何类型的连接

是真的。

问题当然出在我的代码中:只有在分布式环境中才发现问题。我犯了从主管道处理器而不是工人打开隧道的错误。所以 SSH 隧道启动了,但没有在工作人员和目标服务器之间,只是在主管道和目标服务器之间!

为了解决这个问题,我不得不更改我的请求 DoFn 以使用隧道包装查询执行:

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - ".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

如您所见,我不得不重写一些 pysql_beam 库。

最后,每个工作人员为每个请求打开自己的隧道。可能可以优化此行为,但足以满足我的需求。

【讨论】:

以上是关于如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Dataflow 和 Google Cloud Dataproc 有啥区别?

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

到 Cloud Bigtable 的 Google Cloud Dataflow 管道中的异常

Google Cloud DataFlow 作业尚不可用.. 在 Airflow

Google Cloud DataFlow 随机化 WritetoBigQuery

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表