Request a JSON with pyspark

Posted 2018-03-23 09:52:55

I am trying to query a JSON file with a complex schema (from the Google Maps API) in order to get all the latlng values. Here is the schema of the JSON:

root                                                                            
 |-- geocoded_waypoints: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geocoder_status: string (nullable = true)
 |    |    |-- place_id: string (nullable = true)
 |    |    |-- types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- routes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bounds: struct (nullable = true)
 |    |    |    |-- northeast: struct (nullable = true)
 |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |-- southwest: struct (nullable = true)
 |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |-- lng: double (nullable = true)
 |    |    |-- copyrights: string (nullable = true)
 |    |    |-- legs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- distance: struct (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |-- value: long (nullable = true)
 |    |    |    |    |-- duration: struct (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |-- value: long (nullable = true)
 |    |    |    |    |-- end_address: string (nullable = true)
 |    |    |    |    |-- end_location: struct (nullable = true)
 |    |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |    |-- start_address: string (nullable = true)
 |    |    |    |    |-- start_location: struct (nullable = true)
 |    |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |    |-- steps: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- distance: struct (nullable = true)
 |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |    |    |-- value: long (nullable = true)
 |    |    |    |    |    |    |-- duration: struct (nullable = true)
 |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |    |    |-- value: long (nullable = true)
 |    |    |    |    |    |    |-- end_location: struct (nullable = true)
 |    |    |    |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |    |    |    |-- html_instructions: string (nullable = true)
 |    |    |    |    |    |    |-- maneuver: string (nullable = true)
 |    |    |    |    |    |    |-- polyline: struct (nullable = true)
 |    |    |    |    |    |    |    |-- points: string (nullable = true)
 |    |    |    |    |    |    |-- start_location: struct (nullable = true)
 |    |    |    |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |    |    |    |-- travel_mode: string (nullable = true)
 |    |    |    |    |-- traffic_speed_entry: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- via_waypoint: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- location: struct (nullable = true)
 |    |    |    |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |    |    |    |-- lng: double (nullable = true)
 |    |    |    |    |    |    |-- step_index: long (nullable = true)
 |    |    |    |    |    |    |-- step_interpolation: double (nullable = true)
 |    |    |-- overview_polyline: struct (nullable = true)
 |    |    |    |-- points: string (nullable = true)
 |    |    |-- summary: string (nullable = true)
 |    |    |-- warnings: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- waypoint_order: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- status: string (nullable = true)

Here is my function for getting the latlng data:

from pyspark.sql import SQLContext

def getTraceGps(json_file, spark):
    # Read the route file
    sqlContext = SQLContext(spark)
    df = sqlContext.read.json(json_file, multiLine=True)
    df.printSchema()
    df.createOrReplaceTempView("Maps")
    df.select(df["routes.bounds.northeast.lat"], df["routes.bounds.northeast.lng"]).show()  # IT WORKS
    results = df.select(df["routes.legs.steps.end_location.lat"], df["routes.legs.steps.end_location.lng"])  # WRONG
    results.show()

Here is the log:

py4j.protocol.Py4JJavaError: An error occurred while calling o53.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`routes`.`legs`['steps']' due to data type mismatch: argument 2 requires integral type, however, ''steps'' is of string type.;;
'Project [routes#1.legs[steps].end_location.lat AS lat#19, routes#1.legs[steps].end_location.lng AS lng#20]
+- AnalysisBarrier
  +- Relation[geocoded_waypoints#0,routes#1,status#2] json

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:120)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:120)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

I don't understand why the first df.select works while the second one doesn't. It is probably because steps contains multiple objects. I tried many queries before this, but I kept getting them wrong.

Where does the problem come from?

Thank you in advance.

Answer 1:

The error message is a bit cryptic, but note that legs is an array type. Because it is an array, you have to select a specific element using bracket notation (like legs[1]).
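
For instance, a minimal sketch of that bracket notation (the index 0 and the getItem calls are illustrative, not from the original answer), picking one element at each array level:

# Hedged sketch: getItem(0) indexes into each array level (routes, legs, steps);
# the remaining struct fields (end_location.lat/lng) then resolve normally.
df.select(
    df["routes"].getItem(0)["legs"].getItem(0)["steps"].getItem(0)["end_location"]["lat"].alias("lat"),
    df["routes"].getItem(0)["legs"].getItem(0)["steps"].getItem(0)["end_location"]["lng"].alias("lng")
).show()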

I haven't seen IntegralType in any documentation, but it is part of the spark.sql internals. It simply denotes the internal data type used for things like array indices; see:

https://github.com/apache/spark/blob/cba69aeb453d2489830f3e6e0473a64dee81989e/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
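
If the goal is every latlng rather than a single indexed element, exploding each array level is one option. A minimal sketch, assuming a DataFrame df loaded as in the question (this is not part of the original answer):

from pyspark.sql.functions import col, explode

# Each explode turns an array column into one row per element,
# so after three explodes there is one row per step.
steps = (df
    .select(explode("routes").alias("route"))
    .select(explode("route.legs").alias("leg"))
    .select(explode("leg.steps").alias("step"))
    .select(col("step.end_location.lat").alias("lat"),
            col("step.end_location.lng").alias("lng")))
steps.show()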

Comments:

But if I have to select an index to pick out a row, then there is no point in running a SQL query. Anyway, thank you very much for your answer.
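
As a hedged aside (not part of the original thread): the same flattening can be expressed in SQL without hard-coded indices, using LATERAL VIEW explode against the "Maps" temp view registered in the question. A sketch, assuming a SparkSession named spark:

# LATERAL VIEW explode unnests one array level per clause,
# so the query returns one row per step with its lat/lng.
spark.sql("""
    SELECT step.end_location.lat AS lat, step.end_location.lng AS lng
    FROM Maps
    LATERAL VIEW explode(routes) r AS route
    LATERAL VIEW explode(route.legs) l AS leg
    LATERAL VIEW explode(leg.steps) s AS step
""").show()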
