pyspark logical conjunction for vertica sql

Posted: 2019-02-13 19:48:20

Question:

Spark 1.6, retrieving data from my Vertica database in order to process it. The following query runs fine on the Vertica DB but does not work through pyspark. Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in the strict SQL meaning: it covers only the WHERE clause, and it appears to be limited to logical conjunctions (no IN and no OR, I'm afraid) and simple predicates. Running the code below produces this error: java.lang.RuntimeException: Option 'dbtable' not specified

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
    .setAppName("hivereader")
    .setMaster("yarn-client")
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.io.compression.codec", "snappy")
    .set("spark.rdd.compress", "true")
    .set("spark.executor.cores", 7)
    .set("spark.sql.inMemoryStorage.compressed", "true")
    .set("spark.sql.shuffle.partitions", 2000)
    .set("spark.sql.tungsten.enabled", "true")
    .set("spark.port.maxRetries", 200))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

url = "*******"
properties = {"user": "*****", "password": "*******", "driver": "com.vertica.jdbc.Driver"}

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()
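
For context on the pushdown limitation described above: with a JDBC source, Spark can only translate simple DataFrame filters into the WHERE clause of the SQL it sends to the database. A minimal sketch of what does push down, assuming the same url and properties as above and the column names from the query:

# Load the table itself via dbtable, then filter in Spark; a simple
# conjunction of comparisons like this is eligible for pushdown, so it
# becomes a WHERE clause in the query Spark sends to Vertica.
df = sqlContext.read.format("jdbc").options(
    url=url,
    dbtable="traffic.stats",
    **properties
).load()

pushed = df.filter((df.time_stamp >= "2019-01-25") & (df.time_stamp <= "2019-01-29"))
pushed.show()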

Question comments:

Answer 1:

The problem is that, even though you say this query works on Vertica, it is not written in SQL syntax that Vertica recognizes. Your query should be rewritten as:

SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK, (connections_out) AS CONNECTION
FROM traffic.stats
WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'

With all of those errors fixed, your sqlContext.read call should look like this:

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

Alternatively, you can alias the query as a subquery and use dbtable instead of query:

df = sqlContext.read.format("JDBC").options(
    url = url,
    dbtable = "(SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29') temp",
    **properties
).load()

df.show()
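
This works because Spark splices the dbtable value directly into the FROM clause of the statement it generates, roughly SELECT <columns> FROM (your subquery) temp WHERE <pushed filters>, which is why the parenthesized subquery must carry an alias (temp here) to be valid SQL.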

Comments:

Thanks, but after running it I get this error: "java.lang.RuntimeException: Option 'dbtable' not specified"

@MahmoudOdeh What versions of Spark and Python are you running? My test was on Spark 2.4.0 and Python 2.7.10.

That error is a bit confusing. query and dbtable are mutually exclusive, and if you use query in options it should not complain about a missing dbtable. Could you post the exact code you ran that produces this error?

I tried query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK, (connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-27' AND '2019-01-29' limit 10000000" and then df = sqlContext.read.format("JDBC").options(url = url, dbtable="( " + query + " ) as temp", **properties).load() followed by df.show(), on pyspark 1.6 and python 3.4.

Also, the docs say: "The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses."
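
A note on the version mismatch surfacing in these comments: the query option for the JDBC source only exists in Spark 2.4.0 and later, so on Spark 1.6 (which the asker is running) the reader insists on dbtable and fails with exactly this "Option 'dbtable' not specified" error. A minimal sketch of the two forms, reusing url, properties, and sqlContext from above:

select_stmt = ("SELECT date(time_stamp) AS DATE, subscriber AS IMSI "
               "FROM traffic.stats "
               "WHERE date(time_stamp) BETWEEN '2019-01-25' AND '2019-01-29'")

# Spark 1.6: only dbtable is recognized; wrap the statement as an aliased subquery.
df = sqlContext.read.format("jdbc").options(
    url=url,
    dbtable="(" + select_stmt + ") temp",
    **properties
).load()

# Spark 2.4+ only (for comparison): a dedicated query option takes the bare
# SELECT, with no alias needed.
# df = spark.read.format("jdbc").option("url", url) \
#     .option("query", select_stmt) \
#     .option("user", "*****").option("password", "*******") \
#     .option("driver", "com.vertica.jdbc.Driver").load()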
