在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型

Posted

技术标签:

【中文标题】在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型【英文标题】:Using Postgis geometry type in Apache Spark JDBC DataFrame 【发布时间】:2016-03-03 13:46:05 【问题描述】:

我想知道我是否可以在 Apache Spark 的 SQL 和 DataFrames 中使用 Postgis 几何类型。

我做到了这一点:我首先注意到我可以编写一个 Postgis 方言和一个用户定义的类型,我称之为 PostgisDialectGeometryType。这是我的代码:

object PostgisDialect extends JdbcDialect 

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  override def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = 
    if (sqlType == Types.OTHER) 
      toCatalystType(typeName)
     else None
  

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match 
    case "geometry" => Some(GeometryType)
    case _ => None
  

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match 
    case GeometryType => Some(JdbcType("geometry", Types.OTHER))
    case _ => None
  

  override def getTableExistsQuery(table: String): String = 
    s"SELECT 1 FROM $table LIMIT 1"
  

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = 
    super.beforeFetch(connection, properties)

    if (properties.getOrElse("fetchsize", "0").toInt > 0) 
      connection.setAutoCommit(false)
    

  


class GeometryType private() extends UserDefinedType[Geometry] 

  override def sqlType: DataType = BinaryType

  override def pyUDT: String = "my.types.GeometryType"

  override def serialize(obj: Any): GenericArrayData = 
    obj match 
      case p: Geometry =>
        val output = (new WKBWriter).write(p)
        new GenericArrayData(output)
    
  

  override def deserialize(datum: Any): Geometry = 
    datum match 
      case values: Array[Byte] => (new WKBReader).read(values)
    
  

  override def userClass: Class[Geometry] = classOf[Geometry]

  override def asNullable: GeometryType = this


case object GeometryType extends GeometryType

到目前为止一切顺利,但是当JDBCRDD调用方法getConversions时:

/**
 * Maps a StructType to a type tag list.
 */
def getConversions(schema: StructType): Array[JDBCConversion] =
    schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))

private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match 
    case BooleanType => BooleanConversion
    case DateType => DateConversion
    case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
    case DoubleType => DoubleConversion
    case FloatType => FloatConversion
    case IntegerType => IntegerConversion
    case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion
    case StringType => StringConversion
    case TimestampType => TimestampConversion
    case BinaryType => BinaryConversion
    case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
    case _ => throw new IllegalArgumentException(s"Unsupported type $dt.simpleString")
  

当然,我的自定义类型没有转换。

Caused by: java.lang.IllegalArgumentException: Unsupported type geometry
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConversions(JDBCRDD.scala:351)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.getConversions(JDBCRDD.scala:337)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:385)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:359)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

有没有办法为我的自定义类型注册转换?

【问题讨论】:

【参考方案1】:

Spark 没有处理几何数据的内置函数,但创建了以下库来处理几何数据类型。这两个库都提供了可用于处理几何数据类型的函数。

    塞多纳链接 - https://sedona.apache.org/tutorial/sql/ GeoMesa 链接 - https://www.geomesa.org/documentation/stable/user/spark/index.html

我最近使用这两个库来处理几何数据类型,以简化和合并几何并将其加载到 postgress。

以下是一些建议-

1.浏览文档,查看其中任何一个中是否存在您需要的功能,并一次使用一个 libaray,因为将两者安装在同一个集群上可能会导致一些问题。

2.有不同版本兼容不同的spark版本,您可以通过此链接找到您正在使用的spark版本和兼容性的详细信息 https://sedona.apache.org/download/overview/

3.请按照本教程中给出的步骤 - https://sedona.apache.org/tutorial/sql/

在“Register SedonaSQL”步骤之后,您可以通过运行以下命令来交叉检查是否所有功能都可以使用。

spark.catalog.listFunctions().show()

谢谢。

【讨论】:

以上是关于在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型的主要内容,如果未能解决你的问题,请参考以下文章

Apache spark JDBC连接读写驱动程序丢失

在 Apache Spark JDBC DataFrame 中使用 Postgis 几何类型

sparksql 自定义用户函数(UDF)

Apache Spark JDBC SQL 注入 (pyspark)

是否可以使用 spark 的 jdbc 驱动程序将 apache spark 与 jasper 集成?

Spark Jdbc 连接 JDBCOptions