A Powerful Tool for Spatial Data Analysis: GeoSpark

Posted by 横竖乌托邦


Preface


Conventional data-processing tools struggle with geographic information: they have no native support for point, line, and polygon data formats, so this work has traditionally required specialized GIS software, which carries its own learning curve. To bring GIS spatial analysis to large datasets with low-latency processing, a cluster computing system for large-scale spatial data emerged: GeoSpark. It is a third-party Apache Spark project that extends Apache Spark with a family of novel Spatial Resilient Distributed Datasets (SRDDs), including PointRDD, RectangleRDD, PolygonRDD, and LineStringRDD.


Environment setup:

Scala

<!-- in <properties> -->
<geospark.version>1.3.1</geospark.version>

<!-- in <dependencies> -->
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geospark</artifactId>
    <version>${geospark.version}</version>
</dependency>
<dependency>
    <groupId>org.datasyslab</groupId>
    <artifactId>geospark-sql_2.3</artifactId>
    <version>${geospark.version}</version>
</dependency>
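For sbt users, a rough equivalent of the Maven coordinates above might look like the following sketch (same GeoSpark 1.3.1 artifacts; note the _2.3 suffix on the SQL module tracks the targeted Spark version, not the Scala version):

// build.sbt sketch, assuming the same GeoSpark 1.3.1 artifacts as the Maven snippet above
val geosparkVersion = "1.3.1"

libraryDependencies ++= Seq(
  "org.datasyslab" % "geospark" % geosparkVersion,
  // the _2.3 suffix tracks the Spark version, not the Scala version
  "org.datasyslab" % "geospark-sql_2.3" % geosparkVersion
)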

Python

1. Install the geospark module: pip install geospark

2. Copy the jars shipped inside the geospark package into $SPARK_HOME/jars; with a Windows Python install they can be found under a path like:

C:\Users\wzj\AppData\Local\Programs\Python\Python37\Lib\site-packages\geospark\jars\2_3



Let's Get Started

Spatial RDD

Creating points, lines, and polygons from CSV files

Test data for creating points:

-88.331492,32.324142,hotel
-84.01,34.01,gas
-99.388954,32.357073,bar
-88.221102,32.35078,restaurant
2,2,test
1,3,ce
def createPointRDD(sc: SparkContext): Unit = {
  val pointRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\test.csv"
  // This offset says which column the longitude/latitude pair starts at;
  // here the coordinates sit in columns 0 and 1, so the offset is 0
  val pointRDDOffset = 0
  val pointRDDSplitter = FileDataSplitter.CSV
  // This flag lets each record carry extra user-defined attributes besides the coordinates
  val carryOtherAttributes = true
  val objectRDD = new PointRDD(sc, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, carryOtherAttributes)
  // Collect the raw RDD and print every element
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}

Test data for creating polygons:

-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,-88.331492,32.324142,hotel
-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,-88.175933,32.360763,gas
-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,-88.388954,32.357073,bar
-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,-88.221102,32.35078,restaurant
def createPolygonRDD(sc: SparkContext): Unit = {
  val polygonRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\polygon.csv"
  val polygonRDDStartOffset = 0 // the coordinates start at column 0
  val polygonRDDEndOffset = 9   // the coordinates end at column 9 (five lon/lat pairs)
  val polygonRDDSplitter = FileDataSplitter.CSV // or use FileDataSplitter.TSV
  val carryOtherAttributes = true
  val objectRDD = new PolygonRDD(sc, polygonRDDInputLocation, polygonRDDStartOffset, polygonRDDEndOffset, polygonRDDSplitter, carryOtherAttributes)
  // Collect the raw RDD and print every element
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}
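The section heading also promises lines. GeoSpark's LineStringRDD follows the same start/end-offset pattern as PolygonRDD, except the coordinate sequence does not need to close back on itself. A minimal sketch, assuming a hypothetical linestring.csv holding four lon/lat pairs (columns 0 through 7) plus an attribute column:

def createLineStringRDD(sc: SparkContext): Unit = {
  // Hypothetical input: 4 lon/lat pairs per line (columns 0-7), then an attribute column
  val lineStringRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\linestring.csv"
  val lineStringRDDStartOffset = 0 // the coordinates start at column 0
  val lineStringRDDEndOffset = 7   // the coordinates end at column 7
  val lineStringRDDSplitter = FileDataSplitter.CSV
  val carryOtherAttributes = true
  val objectRDD = new LineStringRDD(sc, lineStringRDDInputLocation, lineStringRDDStartOffset, lineStringRDDEndOffset, lineStringRDDSplitter, carryOtherAttributes)
  objectRDD.rawSpatialRDD.rdd.collect().foreach(println)
}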

Creating points, lines, and polygons with the GeometryFactory class

def geometryFactoryCreate(): Unit = {
  // Create a coordinate
  val coord = new Coordinate(-84.01, 34.01)
  // Instantiate the Geometry factory
  val factory = new GeometryFactory()

  // Create a Point
  val pointObject = factory.createPoint(coord)
  println(pointObject)

  // Create a Polygon
  val coordinates = new Array[Coordinate](5)
  coordinates(0) = new Coordinate(0, 0)
  coordinates(1) = new Coordinate(0, 4)
  coordinates(2) = new Coordinate(4, 4)
  coordinates(3) = new Coordinate(4, 0)
  // A polygon ring must be closed, so the last point equals the first
  coordinates(4) = coordinates(0)
  val polygonObject = factory.createPolygon(coordinates)
  println(polygonObject)

  // Create a LineString
  val coordinates2 = new Array[Coordinate](4)
  coordinates2(0) = new Coordinate(0, 0)
  coordinates2(1) = new Coordinate(0, 4)
  coordinates2(2) = new Coordinate(4, 4)
  coordinates2(3) = new Coordinate(4, 0)
  val linestringObject = factory.createLineString(coordinates2)
  println(linestringObject)
}
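Geometries built this way do not have to pass through a file to become an SRDD. As a minimal sketch (assuming GeoSpark 1.3.x, where JTS lives under org.locationtech.jts and PointRDD has a constructor accepting a JavaRDD of points; verify both against your version), locally constructed points can be parallelized and wrapped directly:

import org.apache.spark.SparkContext
// GeoSpark 1.3.x bundles JTS under org.locationtech.jts (older releases used com.vividsolutions.jts)
import org.locationtech.jts.geom.{Coordinate, GeometryFactory, Point}
import org.datasyslab.geospark.spatialRDD.PointRDD

def pointRDDFromGeometries(sc: SparkContext): PointRDD = {
  val factory = new GeometryFactory()
  val points: Seq[Point] = Seq(
    new Coordinate(-84.01, 34.01),
    new Coordinate(-88.331492, 32.324142)
  ).map(factory.createPoint)
  // Assumed constructor PointRDD(JavaRDD[Point]); check your GeoSpark version
  new PointRDD(sc.parallelize(points).toJavaRDD())
}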

RDD==>DataFrame

-88.331492,32.324142,1.hotel
-88.175933,32.360763,1.gas
-88.388954,32.357073,1.bar
-88.588954,32.357073,1.spark
// Convert an RDD to a DataFrame
// sc here is the SparkSession built in the Spatial SQL section below
val pointRDDInputLocation = "D:\\bigdata\\gis_spark\\geospark\\data\\checkinone.csv"
val pointRDDOffset = 0
val pointRDDSplitter = FileDataSplitter.CSV
val carryOtherAttributes = true
val objectRDD = new PointRDD(sc.sparkContext, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, carryOtherAttributes)
val frame = Adapter.toDf(objectRDD.asInstanceOf[SpatialRDD[Geometry]], sc)
frame.show()

Output:

+--------------------+-------+
|            geometry|    _c1|
+--------------------+-------+
|POINT (-88.331492...|1.hotel|
|POINT (-88.175933...|  1.gas|
|POINT (-88.388954...|  1.bar|
|POINT (-88.588954...|1.spark|
+--------------------+-------+
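The Adapter also works in the opposite direction: Adapter.toSpatialRdd can turn a DataFrame with a geometry column back into a SpatialRDD. A brief sketch (the column-name overload is assumed from the GeoSpark 1.3.x Adapter API):

// Going back the other way: DataFrame ==> SpatialRDD, keyed on the "geometry" column
val spatialRDD = Adapter.toSpatialRdd(frame, "geometry")
spatialRDD.analyze() // computes the dataset's envelope and record count
spatialRDD.rawSpatialRDD.rdd.collect().foreach(println)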

Spatial SQL

We can also use Spark SQL to read and transform the data more conveniently.

-88.331492,32.324142,hotel
-84.01,34.01,gas
-99.388954,32.357073,bar
-88.221102,32.35078,restaurant
2,2,test
1,3,ce
val sc = SparkSession.builder()
  .master("local[*]") // Delete this if run in cluster mode
  .appName("readTestScala") // Change this to a proper name
  // Enable the GeoSpark custom Kryo serializer
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
  .getOrCreate()
GeoSparkSQLRegistrator.registerAll(sc)

val rawDf = sc.read.format("csv")
  .option("delimiter", ",")
  .option("header", "false")
  .load("D:\\bigdata\\gis_spark\\geospark\\data\\test.csv")
rawDf.createOrReplaceTempView("rawdf")

sc.sql("SELECT ST_Point(CAST(_c0 AS Decimal(24,20)), CAST(_c1 AS Decimal(24,20))) AS point, * FROM rawdf").show()

Output: the raw columns are turned into geometry values (commonly used spatial functions will get a dedicated post later).

+--------------------+----------+---------+----------+
|               point|       _c0|      _c1|       _c2|
+--------------------+----------+---------+----------+
|POINT (-88.331492...|-88.331492|32.324142|     hotel|
|POINT (-84.01 34.01)|    -84.01|    34.01|       gas|
|POINT (-99.388954...|-99.388954|32.357073|       bar|
|POINT (-88.221102...|-88.221102| 32.35078|restaurant|
|         POINT (2 2)|         2|        2|      test|
|         POINT (1 3)|         1|        3|        ce|
+--------------------+----------+---------+----------+
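Once the geometry column exists, the same temp view accepts spatial predicates. As a hedged preview of the range queries mentioned at the end of this post, ST_PolygonFromEnvelope and ST_Contains (both registered by GeoSparkSQLRegistrator) can keep only the points inside a bounding box:

// Filter rawdf down to points that fall inside a lon/lat bounding box
sc.sql(
  """SELECT _c2,
    |       ST_Point(CAST(_c0 AS Decimal(24,20)), CAST(_c1 AS Decimal(24,20))) AS point
    |FROM rawdf
    |WHERE ST_Contains(
    |        ST_PolygonFromEnvelope(-90.0, 30.0, -80.0, 35.0),
    |        ST_Point(CAST(_c0 AS Decimal(24,20)), CAST(_c1 AS Decimal(24,20))))
  """.stripMargin).show()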


Reading shapefiles


Scala

val shapefileInputLocation = "file:///D:\\transactiondata"
val value = ShapefileReader.readToGeometryRDD(sc.sparkContext, shapefileInputLocation)
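To mirror what the Python version below does, the geometry RDD can then be converted to a DataFrame with the same Adapter used in the RDD==>DataFrame section (a brief sketch, with sc again being the SparkSession):

// Shapefile-backed SpatialRDD ==> DataFrame, then preview the first rows
val df = Adapter.toDf(value, sc)
df.show(10)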

Python

import findspark
findspark.init()
from geospark.register import GeoSparkRegistrator
from geospark.core.formatMapper.shapefileParser import ShapefileReader
from geospark.utils.adapter import Adapter
from pyspark.sql import SparkSession
from geospark.utils import KryoSerializer, GeoSparkKryoRegistrator

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName) \
    .getOrCreate()
GeoSparkRegistrator.registerAll(spark)

shape_file_location = "D:\\newYork"
value = ShapefileReader.readToGeometryRDD(spark.sparkContext, shape_file_location)
df = Adapter.toDf(value, spark)
df.show(10)
spark.stop()

>> The next post will cover spatial join queries, spatial k-nearest-neighbor queries, and spatial range queries.



Reposting is encouragement; sharing is a virtue.
