Request a JSON with pyspark

Posted: 2018-03-23 09:52:55

Problem description:
I am trying to query a JSON file with a complex schema (a Google Maps API response) in order to extract all the lat and lng values. Here is the schema of the JSON:
root
|-- geocoded_waypoints: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- geocoder_status: string (nullable = true)
| | |-- place_id: string (nullable = true)
| | |-- types: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- routes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- bounds: struct (nullable = true)
| | | |-- northeast: struct (nullable = true)
| | | | |-- lat: double (nullable = true)
| | | | |-- lng: double (nullable = true)
| | | |-- southwest: struct (nullable = true)
| | | | |-- lat: double (nullable = true)
| | | | |-- lng: double (nullable = true)
| | |-- copyrights: string (nullable = true)
| | |-- legs: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- distance: struct (nullable = true)
| | | | | |-- text: string (nullable = true)
| | | | | |-- value: long (nullable = true)
| | | | |-- duration: struct (nullable = true)
| | | | | |-- text: string (nullable = true)
| | | | | |-- value: long (nullable = true)
| | | | |-- end_address: string (nullable = true)
| | | | |-- end_location: struct (nullable = true)
| | | | | |-- lat: double (nullable = true)
| | | | | |-- lng: double (nullable = true)
| | | | |-- start_address: string (nullable = true)
| | | | |-- start_location: struct (nullable = true)
| | | | | |-- lat: double (nullable = true)
| | | | | |-- lng: double (nullable = true)
| | | | |-- steps: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- distance: struct (nullable = true)
| | | | | | | |-- text: string (nullable = true)
| | | | | | | |-- value: long (nullable = true)
| | | | | | |-- duration: struct (nullable = true)
| | | | | | | |-- text: string (nullable = true)
| | | | | | | |-- value: long (nullable = true)
| | | | | | |-- end_location: struct (nullable = true)
| | | | | | | |-- lat: double (nullable = true)
| | | | | | | |-- lng: double (nullable = true)
| | | | | | |-- html_instructions: string (nullable = true)
| | | | | | |-- maneuver: string (nullable = true)
| | | | | | |-- polyline: struct (nullable = true)
| | | | | | | |-- points: string (nullable = true)
| | | | | | |-- start_location: struct (nullable = true)
| | | | | | | |-- lat: double (nullable = true)
| | | | | | | |-- lng: double (nullable = true)
| | | | | | |-- travel_mode: string (nullable = true)
| | | | |-- traffic_speed_entry: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- via_waypoint: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- location: struct (nullable = true)
| | | | | | | |-- lat: double (nullable = true)
| | | | | | | |-- lng: double (nullable = true)
| | | | | | |-- step_index: long (nullable = true)
| | | | | | |-- step_interpolation: double (nullable = true)
| | |-- overview_polyline: struct (nullable = true)
| | | |-- points: string (nullable = true)
| | |-- summary: string (nullable = true)
| | |-- warnings: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- waypoint_order: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- status: string (nullable = true)
Here is my function for extracting the lat and lng data:
from pyspark.sql import SQLContext

def getTraceGps(json_file, spark):
    # Read the route file
    sqlContext = SQLContext(spark)
    df = sqlContext.read.json(json_file, multiLine=True)
    df.printSchema()
    df.createOrReplaceTempView("Maps")
    df.select(df["routes.bounds.northeast.lat"], df["routes.bounds.northeast.lng"]).show()  # IT WORKS
    results = df.select(df["routes.legs.steps.end_location.lat"], df["routes.legs.steps.end_location.lng"])  # WRONG
    results.show()
Here is the log:
py4j.protocol.Py4JJavaError: An error occurred while calling o53.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`routes`.`legs`['steps']' due to data type mismatch: argument 2 requires integral type, however, ''steps'' is of string type.;;
'Project [routes#1.legs[steps].end_location.lat AS lat#19, routes#1.legs[steps].end_location.lng AS lng#20]
+- AnalysisBarrier
+- Relation[geocoded_waypoints#0,routes#1,status#2] json
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:120)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:120)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
I do not understand why the first df.select works while the second one does not. Maybe it is because steps contains several objects. I tried many other queries before, but without success. Where does the problem come from? Thank you in advance.
Answer 1:
The error message is a bit cryptic, but note that legs is an array type. Because it is an array, you have to use bracket notation to select a specific element (e.g. legs[1]), as in the sketch below.
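For illustration, a minimal sketch assuming the same df loaded in the question; the zero indices are only an example (a Directions-style response usually holds a single route and leg):

# Sketch only: pick the first route, first leg and first step explicitly.
first_step_end = df.select(
    df["routes"][0]["legs"][0]["steps"][0]["end_location"]["lat"].alias("lat"),
    df["routes"][0]["legs"][0]["steps"][0]["end_location"]["lng"].alias("lng"),
)
first_step_end.show()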
I have not seen IntegralType in any documentation, but it is part of the spark.sql internals. It simply denotes the integer types expected where things like array indices are used; see
https://github.com/apache/spark/blob/cba69aeb453d2489830f3e6e0473a64dee81989e/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
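If the goal is every step's lat/lng rather than a single element, a sketch based on explode (still assuming the question's df and the schema printed above) could look like this:

from pyspark.sql.functions import explode, col

# Unnest routes -> legs -> steps, then pull lat/lng out of each step.
steps = (df.select(explode("routes").alias("route"))
           .select(explode("route.legs").alias("leg"))
           .select(explode("leg.steps").alias("step")))
steps.select(col("step.end_location.lat").alias("lat"),
             col("step.end_location.lng").alias("lng")).show()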
Comments:
But if I have to select an index just to pick a row, then running an SQL query does not make much sense. Anyway, thank you very much for your answer.