如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?

Posted

技术标签:

【中文标题】如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?【英文标题】:How to connect to CloudSQL from a Dataflow job in Python SDK? 【发布时间】:2021-11-06 22:30:32 【问题描述】:

---------------> 阅读编辑第一

我正在尝试开发一个可以读取和写入 CloudSQL 的 Dataflow 管道,但我面临很多连接问题。

首先,没有本地模板/解决方案可以做到这一点,所以我使用社区开发的库 -> beam-nuggets 它为 apache beam python SDK 提供了一系列转换。

这是我到目前为止所做的:

模板

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from beam_nuggets.io import relational_db


def main():
    # get the cmd args
    db_args, pipeline_args = get_args()

    # Create the pipeline
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        source_config = relational_db.SourceConfiguration(
            drivername=db_args.drivername,
            host=db_args.host,
            port=db_args.port,
            database=db_args.database,
            username=db_args.username,
            password=db_args.password,
        )

        data = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name=db_args.table
            query='select name, num from months'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)



def get_args():
    parser = argparse.ArgumentParser()
    # adding expected database args
    parser.add_argument('--drivername', dest='drivername', default='mysql+pymysql')
    parser.add_argument('--host', dest='host', default='cloudsql_instance_connection_name')
    parser.add_argument('--port', type=int, dest='port', default=3307)
    parser.add_argument('--database', dest='database', default='irmdb')
    parser.add_argument('--username', dest='username', default='root')
    parser.add_argument('--password', dest='password', default='****')
    parser.add_argument('--table', dest='table', default="table_name")

    parsed_db_args, pipeline_args = parser.parse_known_args()

    return parsed_db_args, pipeline_args


if __name__ == '__main__':
    main()

作业已在 Dataflow 中正确创建,但仍处于加载状态,不显示任何日志:

自从我停止工作后它显示为红色。

管道选项:

为什么我无法连接?我错过了什么?

提前感谢您的帮助。


------------------> 编辑

由于使用 beam-nugget 库没有得到任何结果,我已切换到由 Google 创建的 cloud-sql-python-connector 库。

让我们从头开始。

template.py

import argparse

from google.cloud.sql.connector import connector

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadSQLTable(beam.DoFn):
    """
    parDo class to get all table names of a given cloudSQL database.
    It will return each table name.
    """

    def __init__(self, hostaddr, host, username, password, dbname):
        super(ReadSQLTable, self).__init__()

        self.hostaddr = hostaddr
        self.host = host
        self.username = username
        self.password = password
        self.dbname = dbname

    def process(self, element):
        # Connect to database

        conn = connector.connect(
            self.hostaddr,
            self.host,
            user=self.username,
            password=self.password,
            db=self.dbname
        )

        # Execute a query
        cursor = conn.cursor()
        cursor.execute("SELECT * from table_name")

        # Fetch the results
        result = cursor.fetchall()

        # Do something with the results
        for row in result:
            print(row)


def main(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--hostaddr',
        dest='hostaddr',
        default='project_name:region:instance_name',
        help='Host Address')
    parser.add_argument(
        '--host',
        dest='host',
        default='pymysql',
        help='Host')
    parser.add_argument(
        '--username',
        dest='username',
        default='root',
        help='CloudSQL User')
    parser.add_argument(
        '--password',
        dest='password',
        default='password',
        help='Host')
    parser.add_argument(
        '--dbname',
        dest='dbname',
        default='dbname',
        help='Database name')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.

        # Create a dummy initiator PCollection with one element
        init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])
        tables = init | 'Get table names' >> beam.ParDo(ReadSQLTable(
            host=known_args.host,
            hostaddr=known_args.hostaddr,
            dbname=known_args.dbname,
            username=known_args.username,
            password=known_args.password))


if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    main()

按照Apache Beam documentation,我们应该上传一个 requirements.txt 文件来获取必要的包。

requirements.txt

cloud-sql-python-connector==0.4.0

之后,我们应该可以创建数据流模板了。

python3 -m template --runner DataflowRunner /
                    --project project_name /
                    --staging_location gs://bucket_name/folder/staging /
                    --temp_location gs://bucket_name/folder/temp /
                    --template_location gs://bucket_name/folder//templates/template-df /
                    --region europe-west1 /
                    --requirements_file requirements.txt

但是当我尝试执行时,出现如下错误:

没有安装这些库... apache-beam 和 cloud-sql-python-connector 都没有

由于我在 Cloud shell 上遇到此错误,因此我尝试直接在 shell 上下载软件包(听起来很绝望,我很绝望。)

pip3 install -r requirements.txt
pip3 install wheel
pip3 install 'apache-beam[gcp]'

然后我再次执行该函数。现在模板已正确创建:

此外,我们应该创建一个模板元数据,其中包含一些有关参数的信息。我不知道我是否必须在这里添加任何其他内容,所以:


  "description": "An example pipeline.",
  "name": "Motor prueba",
  "parameters": [
  ]

最后,我能够创建和执行管道,但作为最后一次,它仍然在加载而不显示任何日志:

有什么线索吗? :/

【问题讨论】:

警告:我没有使用过 nugget 库。在 Dataflow 中,我们通常建议使用我们(Google)提供的 jdbc 连接器,因为它可以处理很多奇怪的连接问题。话虽如此,您的默认端口显示为 3307,请尝试将端口 3306 作为快速“消除可能的问题”? 如果有帮助,请链接到使用 jdbc 库回答的 SO 问题:***.com/questions/44699643/… @GabeWeiss 据我所知,jdbc 连接器在 Python 上不可用,是吗?出于这个原因,我正在使用外部库... Derp,这将教会我在回答之前不要仔细查看完整的问题。 :) 我一直忘记 Dataflow 现在也有 Python(在此处插入“回到我的一天”评论)。然而!可能会有帮助,我们在这里也有一个 Python 连接器:github.com/GoogleCloudPlatform/cloud-sql-python-connector 它是由一个与我密切合作的同事编写的,所以如果你想走那条路,请告诉我进展如何,我可以通过反馈反馈并在需要时获得答案。 哇!极好的。让我检查一下,我很快就会回来。提前致谢! @GabeWeiss 【参考方案1】:

我没有使用过 nugget.io 库,所以我不熟悉它如何处理连接。我建议试用 Google 维护的 Python 连接器:

https://github.com/GoogleCloudPlatform/cloud-sql-python-connector

看看它是否能够为你连接。

【讨论】:

谢谢,加布!主要问题是如何通过 Dataflow 连接它......我不是 Apache Beam 专家 ^^。我应该创建某种个性化函数来建立与 SQL 的连接吗? 哦,是的,我也对 Dataflow/Beam 不满意。话虽如此,我很确定您需要在 ParDo 中使用连接。检查这个答案(不要做它正在做的事情,但它应该为您提供创建连接部分的指南):***.com/questions/50701736/…。具体来说,您不想做它正在做的事情,因为它会将每个不好的连接都列入白名单。连接器应该能够在连接代码中使用,因此您不必打开实例。 嗨 Gabe,我已经更新了问题以添加更多信息 :) 嗯,这很奇怪。可悲的是我的数据流知识非常有限,让我戳一下同事,看看他是否有任何想法。 我将不胜感激!谢谢,加布!

以上是关于如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?的主要内容,如果未能解决你的问题,请参考以下文章

如何从用于 vm 的 Azure Python SDK 获取 OS 磁盘相关的详细信息

如何从我在 Google App Engine SDK 上运行的 Python 应用程序访问本地 MySQL 实例?

如何通过手机sdk中的afnetworking从服务器获取数据

如何使环境变量作为python sdk中的环境变量到达Dataflow工作人员

python中如何从字符串中提取数字?

如何从 iOS 中的 Facebook SDK 获取用户信息?