如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?
Posted
技术标签:
【中文标题】如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?【英文标题】:How to connect to CloudSQL from a Dataflow job in Python SDK? 【发布时间】:2021-11-06 22:30:32 【问题描述】:---------------> 阅读编辑第一
我正在尝试开发一个可以读取和写入 CloudSQL 的 Dataflow 管道,但我面临很多连接问题。
首先,没有本地模板/解决方案可以做到这一点,所以我使用社区开发的库 -> beam-nuggets 它为 apache beam python SDK 提供了一系列转换。
这是我到目前为止所做的:
模板
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from beam_nuggets.io import relational_db
def main():
# get the cmd args
db_args, pipeline_args = get_args()
# Create the pipeline
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=options) as p:
source_config = relational_db.SourceConfiguration(
drivername=db_args.drivername,
host=db_args.host,
port=db_args.port,
database=db_args.database,
username=db_args.username,
password=db_args.password,
)
data = p | "Reading records from db" >> relational_db.ReadFromDB(
source_config=source_config,
table_name=db_args.table
query='select name, num from months' # optional. When omitted, all table records are returned.
)
records | 'Writing to stdout' >> beam.Map(print)
def get_args():
parser = argparse.ArgumentParser()
# adding expected database args
parser.add_argument('--drivername', dest='drivername', default='mysql+pymysql')
parser.add_argument('--host', dest='host', default='cloudsql_instance_connection_name')
parser.add_argument('--port', type=int, dest='port', default=3307)
parser.add_argument('--database', dest='database', default='irmdb')
parser.add_argument('--username', dest='username', default='root')
parser.add_argument('--password', dest='password', default='****')
parser.add_argument('--table', dest='table', default="table_name")
parsed_db_args, pipeline_args = parser.parse_known_args()
return parsed_db_args, pipeline_args
if __name__ == '__main__':
main()
作业已在 Dataflow 中正确创建,但仍处于加载状态,不显示任何日志:
自从我停止工作后它显示为红色。
管道选项:
为什么我无法连接?我错过了什么?
提前感谢您的帮助。
------------------> 编辑
由于使用 beam-nugget 库没有得到任何结果,我已切换到由 Google 创建的 cloud-sql-python-connector 库。
让我们从头开始。
template.py
import argparse
from google.cloud.sql.connector import connector
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class ReadSQLTable(beam.DoFn):
"""
parDo class to get all table names of a given cloudSQL database.
It will return each table name.
"""
def __init__(self, hostaddr, host, username, password, dbname):
super(ReadSQLTable, self).__init__()
self.hostaddr = hostaddr
self.host = host
self.username = username
self.password = password
self.dbname = dbname
def process(self, element):
# Connect to database
conn = connector.connect(
self.hostaddr,
self.host,
user=self.username,
password=self.password,
db=self.dbname
)
# Execute a query
cursor = conn.cursor()
cursor.execute("SELECT * from table_name")
# Fetch the results
result = cursor.fetchall()
# Do something with the results
for row in result:
print(row)
def main(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--hostaddr',
dest='hostaddr',
default='project_name:region:instance_name',
help='Host Address')
parser.add_argument(
'--host',
dest='host',
default='pymysql',
help='Host')
parser.add_argument(
'--username',
dest='username',
default='root',
help='CloudSQL User')
parser.add_argument(
'--password',
dest='password',
default='password',
help='Host')
parser.add_argument(
'--dbname',
dest='dbname',
default='dbname',
help='Database name')
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection.
# Create a dummy initiator PCollection with one element
init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])
tables = init | 'Get table names' >> beam.ParDo(ReadSQLTable(
host=known_args.host,
hostaddr=known_args.hostaddr,
dbname=known_args.dbname,
username=known_args.username,
password=known_args.password))
if __name__ == '__main__':
# logging.getLogger().setLevel(logging.INFO)
main()
按照Apache Beam documentation,我们应该上传一个 requirements.txt 文件来获取必要的包。
requirements.txt
cloud-sql-python-connector==0.4.0
之后,我们应该可以创建数据流模板了。
python3 -m template --runner DataflowRunner /
--project project_name /
--staging_location gs://bucket_name/folder/staging /
--temp_location gs://bucket_name/folder/temp /
--template_location gs://bucket_name/folder//templates/template-df /
--region europe-west1 /
--requirements_file requirements.txt
但是当我尝试执行时,出现如下错误:
没有安装这些库... apache-beam 和 cloud-sql-python-connector 都没有
由于我在 Cloud shell 上遇到此错误,因此我尝试直接在 shell 上下载软件包(听起来很绝望,我很绝望。)
pip3 install -r requirements.txt
pip3 install wheel
pip3 install 'apache-beam[gcp]'
然后我再次执行该函数。现在模板已正确创建:
此外,我们应该创建一个模板元数据,其中包含一些有关参数的信息。我不知道我是否必须在这里添加任何其他内容,所以:
"description": "An example pipeline.",
"name": "Motor prueba",
"parameters": [
]
最后,我能够创建和执行管道,但作为最后一次,它仍然在加载而不显示任何日志:
有什么线索吗? :/
【问题讨论】:
警告:我没有使用过 nugget 库。在 Dataflow 中,我们通常建议使用我们(Google)提供的 jdbc 连接器,因为它可以处理很多奇怪的连接问题。话虽如此,您的默认端口显示为 3307,请尝试将端口 3306 作为快速“消除可能的问题”? 如果有帮助,请链接到使用 jdbc 库回答的 SO 问题:***.com/questions/44699643/… @GabeWeiss 据我所知,jdbc 连接器在 Python 上不可用,是吗?出于这个原因,我正在使用外部库... Derp,这将教会我在回答之前不要仔细查看完整的问题。 :) 我一直忘记 Dataflow 现在也有 Python(在此处插入“回到我的一天”评论)。然而!可能会有帮助,我们在这里也有一个 Python 连接器:github.com/GoogleCloudPlatform/cloud-sql-python-connector 它是由一个与我密切合作的同事编写的,所以如果你想走那条路,请告诉我进展如何,我可以通过反馈反馈并在需要时获得答案。 哇!极好的。让我检查一下,我很快就会回来。提前致谢! @GabeWeiss 【参考方案1】:我没有使用过 nugget.io 库,所以我不熟悉它如何处理连接。我建议试用 Google 维护的 Python 连接器:
https://github.com/GoogleCloudPlatform/cloud-sql-python-connector
看看它是否能够为你连接。
【讨论】:
谢谢,加布!主要问题是如何通过 Dataflow 连接它......我不是 Apache Beam 专家 ^^。我应该创建某种个性化函数来建立与 SQL 的连接吗? 哦,是的,我也对 Dataflow/Beam 不满意。话虽如此,我很确定您需要在 ParDo 中使用连接。检查这个答案(不要做它正在做的事情,但它应该为您提供创建连接部分的指南):***.com/questions/50701736/…。具体来说,您不想做它正在做的事情,因为它会将每个不好的连接都列入白名单。连接器应该能够在连接代码中使用,因此您不必打开实例。 嗨 Gabe,我已经更新了问题以添加更多信息 :) 嗯,这很奇怪。可悲的是我的数据流知识非常有限,让我戳一下同事,看看他是否有任何想法。 我将不胜感激!谢谢,加布!以上是关于如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?的主要内容,如果未能解决你的问题,请参考以下文章
如何从用于 vm 的 Azure Python SDK 获取 OS 磁盘相关的详细信息
如何从我在 Google App Engine SDK 上运行的 Python 应用程序访问本地 MySQL 实例?
如何通过手机sdk中的afnetworking从服务器获取数据