PySpark:如何使用带有 JDBC 连接的 MySQL 函数?

Posted

技术标签:

【中文标题】PySpark:如何使用带有 JDBC 连接的 MySQL 函数?【英文标题】:PySpark: how to use a MySQL function with JDBC connection? 【发布时间】:2019-09-20 12:22:31 【问题描述】:

我在 mysql 数据库中使用此查询

select *,
    UNIX_TIMESTAMP(CONVERT_TZ(
        SUBSTRING(input_date, 1, 19),
        SUBSTRING(input_date, 20),
        'SYSTEM'
    )) as timestamp
from my_table

用于将带有 UTC 偏移量的样本 input_date 时间戳(例如 2018-12-15T13:48:16-08:00)转换为纪元时间。

我现在需要使用 PySpark 做同样的事情并通过 JDBC 连接访问该表,但是当我尝试时出现以下错误

Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.sql.AnalysisException: Undefined function: 'CONVERT_TZ'. This function is neither a registered temporary function nor a permanent function registered in the database ...

我做错了什么?在 PySpark 中是否有更好的方法?

谢谢

【问题讨论】:

【参考方案1】:

您可以使用此功能连接到 MySQL db:

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    jdbc_url = "jdbc:mysql://0:1/2".format(jdbc_hostname, jdbc_port, database)

    connection_details = 
        "user": username,
        "password": password,
        "driver": "com.mysql.cj.jdbc.Driver",
    

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df

关于时区转换,这个问题会对你有所帮助:

How to convert a Date String from UTC to Specific TimeZone in HIVE?

【讨论】:

感谢 pissal,这就是我正在使用的,但我的问题与使用函数 CONVERT_TZ 有关,该函数显然在 Hive 中不可用,因此我收到了上述错误。所以我的问题是如何避免这种情况。

以上是关于PySpark:如何使用带有 JDBC 连接的 MySQL 函数?的主要内容,如果未能解决你的问题,请参考以下文章

我们如何使用 jdbc 执行连接查询,而不是使用 pyspark 获取多个表

如何使用 JDBC 源在 (Pyspark?

如何使用 JDBC 源在 (Pyspark?

使用 pyspark 对 SQL Server JDBC 使用 Windows 身份验证

通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?

DSX PySpark 使用自定义 JDBC 方言将数据写入 dashDB