在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型
Posted
技术标签:
【中文标题】在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型【英文标题】:Using Postgis geometry type in Apache Spark JDBC DataFrame 【发布时间】:2016-03-03 13:46:05 【问题描述】:我想知道我是否可以在 Apache Spark 的 SQL 和 DataFrames 中使用 Postgis 几何类型。
我做到了这一点:我首先注意到我可以编写一个 Postgis 方言和一个用户定义的类型,我称之为 PostgisDialect
和 GeometryType
。这是我的代码:
object PostgisDialect extends JdbcDialect
override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
if (sqlType == Types.OTHER)
toCatalystType(typeName)
else None
// TODO: support more type names.
private def toCatalystType(typeName: String): Option[DataType] = typeName match
case "geometry" => Some(GeometryType)
case _ => None
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match
case GeometryType => Some(JdbcType("geometry", Types.OTHER))
case _ => None
override def getTableExistsQuery(table: String): String =
s"SELECT 1 FROM $table LIMIT 1"
override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit =
super.beforeFetch(connection, properties)
if (properties.getOrElse("fetchsize", "0").toInt > 0)
connection.setAutoCommit(false)
class GeometryType private() extends UserDefinedType[Geometry]
override def sqlType: DataType = BinaryType
override def pyUDT: String = "my.types.GeometryType"
override def serialize(obj: Any): GenericArrayData =
obj match
case p: Geometry =>
val output = (new WKBWriter).write(p)
new GenericArrayData(output)
override def deserialize(datum: Any): Geometry =
datum match
case values: Array[Byte] => (new WKBReader).read(values)
override def userClass: Class[Geometry] = classOf[Geometry]
override def asNullable: GeometryType = this
case object GeometryType extends GeometryType
到目前为止一切顺利,但是当JDBCRDD调用方法getConversions
时:
/**
* Maps a StructType to a type tag list.
*/
def getConversions(schema: StructType): Array[JDBCConversion] =
schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))
private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match
case BooleanType => BooleanConversion
case DateType => DateConversion
case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
case DoubleType => DoubleConversion
case FloatType => FloatConversion
case IntegerType => IntegerConversion
case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion
case StringType => StringConversion
case TimestampType => TimestampConversion
case BinaryType => BinaryConversion
case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
case _ => throw new IllegalArgumentException(s"Unsupported type $dt.simpleString")
当然,我的自定义类型没有转换。
Caused by: java.lang.IllegalArgumentException: Unsupported type geometry
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConversions(JDBCRDD.scala:351)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.getConversions(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:385)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:359)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
有没有办法为我的自定义类型注册转换?
【问题讨论】:
【参考方案1】:Spark 没有处理几何数据的内置函数,但创建了以下库来处理几何数据类型。这两个库都提供了可用于处理几何数据类型的函数。
-
塞多纳链接 - https://sedona.apache.org/tutorial/sql/
GeoMesa 链接 - https://www.geomesa.org/documentation/stable/user/spark/index.html
我最近使用这两个库来处理几何数据类型,以简化和合并几何并将其加载到 postgress。
以下是一些建议-
1.浏览文档,查看其中任何一个中是否存在您需要的功能,并一次使用一个 libaray,因为将两者安装在同一个集群上可能会导致一些问题。
2.有不同版本兼容不同的spark版本,您可以通过此链接找到您正在使用的spark版本和兼容性的详细信息 https://sedona.apache.org/download/overview/
3.请按照本教程中给出的步骤 - https://sedona.apache.org/tutorial/sql/
在“Register SedonaSQL”步骤之后,您可以通过运行以下命令来交叉检查是否所有功能都可以使用。
spark.catalog.listFunctions().show()
谢谢。
【讨论】:
以上是关于在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型的主要内容,如果未能解决你的问题,请参考以下文章
在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型
Apache Spark JDBC SQL 注入 (pyspark)