如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?
Posted
技术标签:
【中文标题】如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?【英文标题】:How to set up a SSH tunnel in Google Cloud Dataflow to an external database server? 【发布时间】:2021-01-15 11:26:11 【问题描述】:我在使用 DataflowRunner 使我的 Apache Beam 管道在 Cloud Dataflow 上工作时遇到问题。
管道的第一步是连接到托管在 VM 上的外部 Postgresql 服务器,该服务器只能通过 SSH 外部访问,端口 22,并提取一些数据。我无法更改这些防火墙规则,因此我只能通过 SSH 隧道(即端口转发)连接到数据库服务器。
在我的代码中,我使用了 python 库 sshtunnel。当使用 DirectRunner 从我的开发计算机启动管道时,它可以完美运行:
from sshtunnel import open_tunnel
with open_tunnel(
(user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
ssh_username=user_options.ssh_tunnel_user,
ssh_password=user_options.ssh_tunnel_password,
remote_bind_address=(user_options.dbhost, user_options.dbport)
) as tunnel:
with beam.Pipeline(options=pipeline_options) as p:
(p | "Read data" >> ReadFromSQL(
host=tunnel.local_bind_host,
port=tunnel.local_bind_port,
username=user_options.dbusername,
password=user_options.dbpassword,
database=user_options.dbname,
wrapper=PostgresWrapper,
query=select_query
)
| "Format CSV" >> DictToCSV(headers)
| "Write CSV" >> WriteToText(user_options.export_location)
)
在非默认 VPC 中使用 DataflowRunner 启动相同的代码,其中所有入口都被拒绝但没有出口限制,并且配置了 CloudNAT,但失败并显示以下消息:
psycopg2.OperationalError:无法连接到服务器:连接被拒绝服务器是否在主机“0.0.0.0”上运行并接受端口 41697 上的 TCP/IP 连接? [在运行“读取数据/读取”时]
所以,很明显我的隧道出了点问题,但我无法确定到底是什么。我开始怀疑是否可以通过 CloudNAT 直接设置 SSH 隧道,直到我发现这篇博文:https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 声明:
Cloud Dataflow 的核心优势在于您可以调用外部服务来丰富数据。例如,您可以调用微服务来获取元素的附加数据。 在 DoFn 中,调用服务(通常通过 HTTP 完成)。 您可以完全控制选择的任何类型的连接,只要您在项目/网络中设置的防火墙规则允许。
所以应该可以设置这条隧道!我不想放弃,但我不知道下一步该尝试什么。有什么想法吗?
感谢阅读
【问题讨论】:
【参考方案1】:问题解决了!我不敢相信我已经花了整整两天的时间...我完全看错了方向。
问题不在于某些 Dataflow 或 GCP 网络配置,据我所知...
只要您在项目/网络中设置的防火墙规则允许,您就可以完全控制选择的任何类型的连接
是真的。
问题当然出在我的代码中:只有在分布式环境中才发现问题。我犯了从主管道处理器而不是工人打开隧道的错误。所以 SSH 隧道启动了,但没有在工作人员和目标服务器之间,只是在主管道和目标服务器之间!
为了解决这个问题,我不得不更改我的请求 DoFn 以使用隧道包装查询执行:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - ".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
如您所见,我不得不重写一些 pysql_beam 库。
最后,每个工作人员为每个请求打开自己的隧道。可能可以优化此行为,但足以满足我的需求。
【讨论】:
以上是关于如何在 Google Cloud Dataflow 中设置到外部数据库服务器的 SSH 隧道?的主要内容,如果未能解决你的问题,请参考以下文章
Google Cloud Dataflow 和 Google Cloud Dataproc 有啥区别?
当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub
到 Cloud Bigtable 的 Google Cloud Dataflow 管道中的异常
Google Cloud DataFlow 作业尚不可用.. 在 Airflow