GeoSpark: A Powerful Tool for Spatial Data Analysis
Posted by 横竖乌托邦
Scala
Add the GeoSpark dependencies to the Maven pom.xml (the geospark-sql artifact suffix tracks the Spark version, here 2.3):
<geospark.version>1.3.1</geospark.version>

<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geospark</artifactId>
    <version>${geospark.version}</version>
</dependency>
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geospark-sql_2.3</artifactId>
    <version>${geospark.version}</version>
</dependency>
Python
1. Install the geospark module: pip install geospark
2. Copy the jars bundled with the geospark package into $SPARK_HOME/jars; they live under the package's install directory, e.g.:
C:\Users\wzj\AppData\Local\Programs\Python\Python37\Lib\site-packages\geospark\jars\2_3
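If copying the jars by hand is a hassle, the geospark package also ships an upload_jars() helper in geospark.register (used in the package's own examples) that makes the bundled jars available to the active Spark installation. A minimal sketch, assuming that helper behaves the same in your geospark version:

import findspark
findspark.init()

# upload_jars() registers the GeoSpark jars bundled with the pip package
# with the local Spark installation (helper from geospark.register)
from geospark.register import upload_jars

upload_jars()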
Spatial RDD
Creating points, lines, and polygons from CSV files
Test data for points:
-88.331492,32.324142,hotel
-84.01,34.01,gas
-99.388954,32.357073,bar
-88.221102,32.35078,restaurant
2,2,test
1,3,ce
// Imports shared by the point and polygon examples
import org.apache.spark.SparkContext
import org.datasyslab.geospark.enums.FileDataSplitter
import org.datasyslab.geospark.spatialRDD.{PointRDD, PolygonRDD}

def createPointRDD(sc: SparkContext): Unit = {
  val pointRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\test.csv"
  // Offset marks the column where the coordinates start; longitude and
  // latitude sit in columns 0 and 1 here, so the offset is 0
  val pointRDDOffset = 0
  val pointRDDSplitter = FileDataSplitter.CSV
  // Carry the remaining columns (the name here) along with each geometry
  val carryOtherAttributes = true
  val objectRDD = new PointRDD(sc, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, carryOtherAttributes)
  // Collect the raw spatial RDD and print every record
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}
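These RDD snippets assume a plain SparkContext. A minimal driver sketch, assuming the createPointRDD function above is in scope (the GeoSparkKryoRegistrator lines mirror the Spatial SQL setup further down and are recommended for serializing geometries efficiently):

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

val conf = new SparkConf()
  .setMaster("local[*]") // delete this if running in cluster mode
  .setAppName("spatialRDDDemo")
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
val sc = new SparkContext(conf)
createPointRDD(sc)
sc.stop()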
Test data for polygons (five x,y coordinate pairs per row, then an attribute column):
-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,hotel
-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,gas
-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,bar
-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,restaurant
def createPolygonRDD(sc: SparkContext): Unit = {
  val polygonRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\polygon.csv"
  val polygonRDDStartOffset = 0 // the coordinates start at column 0
  val polygonRDDEndOffset = 9   // the coordinates end at column 9 (five x,y pairs)
  val polygonRDDSplitter = FileDataSplitter.CSV // or use FileDataSplitter.TSV
  val carryOtherAttributes = true
  val objectRDD = new PolygonRDD(sc, polygonRDDInputLocation, polygonRDDStartOffset, polygonRDDEndOffset, polygonRDDSplitter, carryOtherAttributes)
  // Collect the raw spatial RDD and print every record
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}
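For the line case promised above, GeoSpark offers a LineStringRDD with the same start/end-offset constructor as PolygonRDD. A sketch, assuming a hypothetical linestring.csv whose rows hold four x,y pairs in columns 0-7 plus an attribute column:

import org.datasyslab.geospark.spatialRDD.LineStringRDD

def createLineStringRDD(sc: SparkContext): Unit = {
  // Hypothetical input file, laid out like polygon.csv but with four pairs
  val lineStringRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\linestring.csv"
  val lineStringRDDStartOffset = 0 // the coordinates start at column 0
  val lineStringRDDEndOffset = 7   // the coordinates end at column 7
  val lineStringRDDSplitter = FileDataSplitter.CSV
  val carryOtherAttributes = true
  val objectRDD = new LineStringRDD(sc, lineStringRDDInputLocation, lineStringRDDStartOffset, lineStringRDDEndOffset, lineStringRDDSplitter, carryOtherAttributes)
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}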
Creating points, lines, and polygons with the GeometryFactory class
// JTS geometry classes (GeoSpark 1.3.x bundles LocationTech JTS;
// older releases used the com.vividsolutions.jts packages)
import org.locationtech.jts.geom.{Coordinate, GeometryFactory}

def geometryFactoryCreate(): Unit = {
  // Create a coordinate
  val coord = new Coordinate(-84.01, 34.01)
  // Instantiate the Geometry factory
  val factory = new GeometryFactory()
  // Create a Point
  val pointObject = factory.createPoint(coord)
  println(pointObject)
  // Create a Polygon
  val coordinates = new Array[Coordinate](5)
  coordinates(0) = new Coordinate(0, 0)
  coordinates(1) = new Coordinate(0, 4)
  coordinates(2) = new Coordinate(4, 4)
  coordinates(3) = new Coordinate(4, 0)
  // A polygon ring is closed, so the last point repeats the first
  coordinates(4) = coordinates(0)
  val polygonObject = factory.createPolygon(coordinates)
  println(polygonObject)
  // Create a LineString
  val coordinates2 = new Array[Coordinate](4)
  coordinates2(0) = new Coordinate(0, 0)
  coordinates2(1) = new Coordinate(0, 4)
  coordinates2(2) = new Coordinate(4, 4)
  coordinates2(3) = new Coordinate(4, 0)
  val linestringObject = factory.createLineString(coordinates2)
  println(linestringObject)
}
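Geometries built by hand like this can be handed to GeoSpark by parallelizing them into a generic SpatialRDD. A sketch, assuming an existing SparkContext sc and the JTS package noted above:

import org.apache.spark.SparkContext
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.locationtech.jts.geom.Geometry

def wrapGeometries(sc: SparkContext, geometries: Seq[Geometry]): SpatialRDD[Geometry] = {
  val spatialRDD = new SpatialRDD[Geometry]()
  // rawSpatialRDD is the JavaRDD[Geometry] field the other examples read from
  spatialRDD.rawSpatialRDD = sc.parallelize(geometries).toJavaRDD()
  spatialRDD
}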
RDD ==> DataFrame
Test data (checkinone.csv):
-88.331492,32.324142,1.hotel
-88.175933,32.360763,1.gas
-88.388954,32.357073,1.bar
-88.588954,32.357073,1.spark
// Convert the RDD to a DataFrame; `sc` here is the SparkSession built in
// the Spatial SQL section below
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geosparksql.utils.Adapter
import org.locationtech.jts.geom.Geometry

val pointRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\checkinone.csv"
val pointRDDOffset = 0
val pointRDDSplitter = FileDataSplitter.CSV
val carryOtherAttributes = true
val objectRDD = new PointRDD(sc.sparkContext, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, carryOtherAttributes)
val frame = Adapter.toDf(objectRDD.asInstanceOf[SpatialRDD[Geometry]], sc)
frame.show()
Output:
+--------------------+-------+
|            geometry|    _c1|
+--------------------+-------+
|POINT (-88.331492...|1.hotel|
|POINT (-88.175933...|  1.gas|
|POINT (-88.388954...|  1.bar|
|POINT (-88.588954...|1.spark|
+--------------------+-------+
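The Adapter also works in the opposite direction: naming the geometry column turns the DataFrame back into a SpatialRDD. A sketch using the toSpatialRdd method of the GeoSpark 1.3.x adapter:

import org.datasyslab.geosparksql.utils.Adapter

// "geometry" is the column name that Adapter.toDf produced above
val backToRdd = Adapter.toSpatialRdd(frame, "geometry")
backToRdd.rawSpatialRDD.rdd.collect().foreach(println)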
Spatial SQL
We can also use Spark SQL to read and transform the data more conveniently. The input is the same test.csv used earlier:
-88.331492,32.324142,hotel
-84.01,34.01,gas
-99.388954,32.357073,bar
-88.221102,32.35078,restaurant
2,2,test
1,3,ce
val sc = SparkSession.builder()
  .master("local[*]") // delete this if running in cluster mode
  .appName("readTestScala") // change this to a proper name
  // Enable GeoSpark's custom Kryo serializer
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
  .getOrCreate()
// Register the GeoSpark SQL functions (ST_Point, ST_Contains, ...) on this session
GeoSparkSQLRegistrator.registerAll(sc)

val rawDf = sc.read.format("csv")
  .option("delimiter", ",")
  .option("header", "false")
  .load("D:\\bigdata\\gis_spark\\geospark\\data\\test.csv")
rawDf.createOrReplaceTempView("rawdf")

sc.sql("SELECT ST_Point(CAST(_c0 AS Decimal(24,20)), CAST(_c1 AS Decimal(24,20))) AS point, * FROM rawdf").show()
Output: the raw columns are now converted into geometry points (a separate post will cover the commonly used spatial functions):
+--------------------+----------+---------+----------+
|               point|       _c0|      _c1|       _c2|
+--------------------+----------+---------+----------+
|POINT (-88.331492...|-88.331492|32.324142|     hotel|
|POINT (-84.01 34.01)|    -84.01|    34.01|       gas|
|POINT (-99.388954...|-99.388954|32.357073|       bar|
|POINT (-88.221102...|-88.221102| 32.35078|restaurant|
|         POINT (2 2)|         2|        2|      test|
|         POINT (1 3)|         1|        3|        ce|
+--------------------+----------+---------+----------+
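As a small taste before that separate post: a hedged sketch that filters the points with a bounding box, using GeoSpark SQL's ST_PolygonFromEnvelope and ST_Contains (the envelope values are made up for illustration):

val pointDf = sc.sql("SELECT ST_Point(CAST(_c0 AS Decimal(24,20)), CAST(_c1 AS Decimal(24,20))) AS point, _c2 FROM rawdf")
pointDf.createOrReplaceTempView("pointdf")

// Keep only the points inside the envelope (minX, minY, maxX, maxY)
sc.sql("SELECT * FROM pointdf WHERE ST_Contains(ST_PolygonFromEnvelope(-90.0, 30.0, -80.0, 35.0), point)").show()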
Reading shapefiles
Scala
// Read all shapefiles under the directory into a generic geometry RDD
val shapefileInputLocation = "file:///D:\\transactiondata"
val value = ShapefileReader.readToGeometryRDD(sc.sparkContext, shapefileInputLocation)
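As in the Python version below, the geometry RDD can then be flattened into a DataFrame (assuming sc is the SparkSession from the Spatial SQL section):

import org.datasyslab.geosparksql.utils.Adapter

val df = Adapter.toDf(value, sc)
df.show(10)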
Python
import findspark
findspark.init()

from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator
from geospark.core.formatMapper.shapefileParser import ShapefileReader
from geospark.utils.adapter import Adapter
from geospark.utils import KryoSerializer, GeoSparkKryoRegistrator

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName) \
    .getOrCreate()

# Register the GeoSpark SQL functions on this session
GeoSparkRegistrator.registerAll(spark)

# Read every shapefile under the directory and convert it to a DataFrame
shape_file_location = "D:\\newYork"
value = ShapefileReader.readToGeometryRDD(spark.sparkContext, shape_file_location)
df = Adapter.toDf(value, spark)
df.show(10)

spark.stop()
>> The next post will cover spatial join queries, spatial nearest-neighbor queries, and spatial range queries.