在 Spark 中将 Json 键转换为列
Posted
技术标签:
【中文标题】在 Spark 中将 Json 键转换为列【英文标题】:Convert Json keys into Columns in Spark 【发布时间】:2017-11-14 10:30:03 【问题描述】:我编写了一个代码,它读取数据并从元组中挑选第二个元素。第二个元素恰好是一个 JSON。 获取 JSON 的代码:
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configuration;
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestFileSystem;
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import amazon.emr.utils.manifest.input.ManifestItemFileSystem;
import amazon.emr.utils.manifest.input.ManifestInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat ;
import scala.Tuple2;
val configuration = new Configuration(sc.hadoopConfiguration);
ManifestItemFileSystem.setImplementation(configuration);
ManifestInputFormat.setInputFormatImpl(configuration, classOf[TextInputFormat]);
val linesRdd1 = sc.newAPIHadoopFile("location", classOf[ManifestInputFormat[LongWritable,Text]], classOf[LongWritable], classOf[Text], configuration).map(tuple2 => tuple2._2.toString());
下面是一个例子:
"data": "marketplaceId":7,"customerId":123,"eventTime":1471206800000,"asin":"4567","type":"OWN","region":"NA","uploadedDate":1471338703958
现在,我想创建一个数据框,其中包含像 marketplaceId、customerId 等 json 键作为列和具有其值的行。我不确定如何进行此操作?有人可以帮我用指针来帮助我实现同样的目标吗?
【问题讨论】:
【参考方案1】:您可以使用此链接创建一个用于编组/解组 JSON 的 scala 对象 https://coderwall.com/p/o--apg/easy-json-un-marshalling-in-scala-with-jackson
然后使用该对象在 scala 中使用案例类读取 JSON 数据:
import org.apache.spark.SparkConf, SparkContext
object stackover
case class Data(
marketplaceId: Double,
customerId: Double,
eventTime: Double,
asin: String,
`type`: String,
region: String
)
case class R00tJsonObject(
data: Data,
uploadedDate: Double
)
def main(args: Array[String]): Unit =
val conf = new SparkConf(true)
conf.setAppName("example");
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val data = sc.textFile("test1.json")
val parsed = data.map(row => JsonUtil.readValue[R00tJsonObject](row))
parsed.map(rec => (rec.data, rec.uploadedDate, rec.data.customerId,
rec.data.marketplaceId)).collect.foreach(println)
输出:
(Data(7.0,123.0,1.4712068E12,4567,OWN,NA),1.471338703958E12,123.0,7.0)
【讨论】:
以上是关于在 Spark 中将 Json 键转换为列的主要内容,如果未能解决你的问题,请参考以下文章
如何解压缩数据框列中存在的 json 的键,值将转换为键作为列,而使用 python 将其值转换为列?